You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/22 01:12:56 UTC

incubator-kylin git commit: KYLIN-942 more to go, need a generic IMeasure interface

Repository: incubator-kylin
Updated Branches:
  refs/heads/KYLIN-942-review ba484d9eb -> d55da4073


KYLIN-942 more to go, need a generic IMeasure interface


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/d55da407
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/d55da407
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/d55da407

Branch: refs/heads/KYLIN-942-review
Commit: d55da40732f9e41ce35dfc37dc3324e870c217fd
Parents: ba484d9
Author: Yang Li <li...@apache.org>
Authored: Sun Nov 22 08:11:50 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Nov 22 08:11:50 2015 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/cube/model/CubeDesc.java   |   7 -
 .../mr/steps/MergeCuboidFromStorageMapper.java  | 129 ++++++++---------
 .../engine/mr/steps/MergeCuboidMapper.java      | 144 +++++++++----------
 .../storage/hbase/cube/v1/CubeStorageQuery.java |   6 +-
 4 files changed, 131 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d55da407/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 2fd560b..ef563ed 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -823,11 +823,4 @@ public class CubeDesc extends RootPersistentEntity {
         return result;
     }
 
-    public boolean hasMeasureUsingDictionary() {
-        for (MeasureDesc measureDesc : this.getMeasures()) {
-            if (measureDesc.getColumnsNeedDictionary().size() > 0)
-                return true;
-        }
-        return false;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d55da407/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index 286ff02..fc616fa 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -23,9 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.topn.Counter;
 import org.apache.kylin.common.topn.TopNCounter;
@@ -51,7 +48,6 @@ import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -59,6 +55,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /**
  * @author shaoshi
@@ -83,29 +80,14 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
     private RowKeySplitter rowKeySplitter;
     private RowKeyEncoderProvider rowKeyEncoderProvider;
 
-    private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
+    private HashMap<TblColRef, Boolean> dimensionsNeedDict = new HashMap<TblColRef, Boolean>();
 
+    private List<MeasureDesc> measureDescs;
     private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
     private MeasureCodec codec;
     private ByteArrayWritable outputValue = new ByteArrayWritable();
 
-    private List<MeasureDesc> measuresDescs;
-    private Integer[] measureIdxUsingDict;
-    
-    private Boolean checkNeedMerging(TblColRef col) throws IOException {
-        Boolean ret = dictsNeedMerging.get(col);
-        if (ret != null)
-            return ret;
-        else {
-            ret = cubeDesc.getRowkey().isUseDictionary(col);
-            if (ret) {
-                String dictTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable();
-                ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
-            }
-            dictsNeedMerging.put(col, ret);
-            return ret;
-        }
-    }
+    private List<Integer> topNMeasureIdx;
 
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
@@ -130,17 +112,14 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
         rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
         rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
 
-        measuresDescs = cubeDesc.getMeasures();
-        codec = new MeasureCodec(cubeDesc.getMeasures());
-        if (cubeDesc.hasMeasureUsingDictionary()) {
-            List<Integer> measuresUsingDict = Lists.newArrayList();
-            for (int i = 0; i < measuresDescs.size(); i++) {
-                if (measuresDescs.get(i).getFunction().isTopN()) {
-                    // so far only TopN uses dic
-                    measuresUsingDict.add(i);
-                }
+        measureDescs = cubeDesc.getMeasures();
+        codec = new MeasureCodec(measureDescs);
+
+        topNMeasureIdx = Lists.newArrayList();
+        for (int i = 0; i < measureDescs.size(); i++) {
+            if (measureDescs.get(i).getFunction().isTopN()) {
+                topNMeasureIdx.add(i);
             }
-            measureIdxUsingDict = measuresUsingDict.toArray(new Integer[measuresUsingDict.size()]);
         }
     }
 
@@ -213,11 +192,9 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
         rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf);
         outputKey.set(newKeyBuf.array(), 0, fullKeySize);
 
-        // encode measure if it uses dictionary 
-        if (cubeDesc.hasMeasureUsingDictionary()) {
-            reEncodeMeasure(value);
-        } 
-        
+        // re-encode measures if dictionary is used
+        reEncodeMeasure(value);
+
         valueBuf.clear();
         codec.encode(value, valueBuf);
         outputValue.set(valueBuf.array(), 0, valueBuf.position());
@@ -225,46 +202,60 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
         context.write(outputKey, outputValue);
     }
 
+    private Boolean checkNeedMerging(TblColRef col) throws IOException {
+        Boolean ret = dimensionsNeedDict.get(col);
+        if (ret != null)
+            return ret;
+        else {
+            ret = cubeDesc.getRowkey().isUseDictionary(col);
+            if (ret) {
+                String dictTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable();
+                ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
+            }
+            dimensionsNeedDict.put(col, ret);
+            return ret;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
     private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException {
+        // currently only topN uses dictionary in measure obj
+        if (topNMeasureIdx.isEmpty())
+            return;
+
         int bufOffset = 0;
-        for (int idx : measureIdxUsingDict) {
-            // only TopN measure uses dic
+        for (int idx : topNMeasureIdx) {
             TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx];
 
-            MeasureDesc measureDesc = measuresDescs.get(idx);
-            String displayCol = measureDesc.getFunction().getParameter().getDisplayColumn().toUpperCase();
-            if (StringUtils.isNotEmpty(displayCol)) {
-                ColumnDesc sourceColumn = cubeDesc.getFactTableDesc().findColumnByName(displayCol);
-                TblColRef colRef = new TblColRef(sourceColumn);
-                DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-                Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
-                Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
+            MeasureDesc measureDesc = measureDescs.get(idx);
+            TblColRef colRef = measureDesc.getFunction().getTopNLiteralColumn();
+            DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+            Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
+            Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
+
+            int topNSize = topNCounters.size();
+            while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+                    mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+                    mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
+                byte[] oldBuf = newKeyBodyBuf;
+                newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
+                System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
+            }
 
-                int topNSize = topNCounters.size();
-                while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
-                        mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
-                        mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBodyBuf;
-                    newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
+            for (Counter<ByteArray> c : topNCounters) {
+                int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
+                int idInMergedDict;
+                int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
+                if (size < 0) {
+                    idInMergedDict = mergedDict.nullId();
+                } else {
+                    idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
                 }
 
-                for (Counter<ByteArray> c : topNCounters) {
-                    int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
-                    int idInMergedDict;
-                    int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
-                    if (size < 0) {
-                        idInMergedDict = mergedDict.nullId();
-                    } else {
-                        idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
-                    }
-
-                    BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-                    c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-                    bufOffset += mergedDict.getSizeOfId();
-                }
+                BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
+                c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
+                bufOffset += mergedDict.getSizeOfId();
             }
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d55da407/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 68d1481..6c2679e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -18,8 +18,13 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.kylin.common.KylinConfig;
@@ -43,17 +48,11 @@ import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import com.google.common.collect.Lists;
 
 /**
  * @author ysong1, honma
@@ -76,29 +75,16 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
     private RowKeySplitter rowKeySplitter;
     private RowKeyEncoderProvider rowKeyEncoderProvider;
 
-    private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
-    private List<MeasureDesc> measuresDescs;
+    private HashMap<TblColRef, Boolean> dimensionsNeedDict = new HashMap<TblColRef, Boolean>();
+
+    // for re-encode measures that use dictionary
+    private List<Integer> topNMeasureIdx;
+    private List<MeasureDesc> measureDescs;
     private MeasureCodec codec;
     private Object[] measureObjs;
-    private Integer[] measureIdxUsingDict;
     private ByteBuffer valueBuf;
     private Text outputValue;
 
-    private Boolean checkNeedMerging(TblColRef col) throws IOException {
-        Boolean ret = dictsNeedMerging.get(col);
-        if (ret != null)
-            return ret;
-        else {
-            ret = cubeDesc.getRowkey().isUseDictionary(col);
-            if (ret) {
-                String dictTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable();
-                ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
-            }
-            dictsNeedMerging.put(col, ret);
-            return ret;
-        }
-    }
-
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
         super.bindCurrentConfiguration(context.getConfiguration());
@@ -124,20 +110,16 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
         rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
 
-        if (cubeDesc.hasMeasureUsingDictionary()) {
-            measuresDescs = cubeDesc.getMeasures();
-            codec = new MeasureCodec(measuresDescs);
-            measureObjs = new Object[measuresDescs.size()];
-            List<Integer> measuresUsingDict = Lists.newArrayList();
-            for (int i = 0; i < measuresDescs.size(); i++) {
-                if (measuresDescs.get(i).getFunction().isTopN()) {
-                    // so far only TopN uses dic
-                    measuresUsingDict.add(i);
-                }
+        measureDescs = cubeDesc.getMeasures();
+        codec = new MeasureCodec(measureDescs);
+        measureObjs = new Object[measureDescs.size()];
+        valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+        outputValue = new Text();
+        topNMeasureIdx = Lists.newArrayList();
+        for (int i = 0; i < measureDescs.size(); i++) {
+            if (measureDescs.get(i).getFunction().isTopN()) {
+                topNMeasureIdx.add(i);
             }
-            measureIdxUsingDict = measuresUsingDict.toArray(new Integer[measuresUsingDict.size()]);
-            valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-            outputValue = new Text();
         }
     }
 
@@ -231,60 +213,70 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf);
         outputKey.set(newKeyBuf.array(), 0, fullKeySize);
 
-        // encode measure if it uses dictionary 
-        if (cubeDesc.hasMeasureUsingDictionary()) {
+        // re-encode measures if dictionary is used
+        if (topNMeasureIdx.size() > 0) {
             codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
             reEncodeMeasure(measureObjs);
             valueBuf.clear();
             codec.encode(measureObjs, valueBuf);
             outputValue.set(valueBuf.array(), 0, valueBuf.position());
             value = outputValue;
-        } 
-            
+        }
+
         context.write(outputKey, value);
     }
 
+    private Boolean checkNeedMerging(TblColRef col) throws IOException {
+        Boolean ret = dimensionsNeedDict.get(col);
+        if (ret != null)
+            return ret;
+        else {
+            ret = cubeDesc.getRowkey().isUseDictionary(col);
+            if (ret) {
+                String dictTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable();
+                ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
+            }
+            dimensionsNeedDict.put(col, ret);
+            return ret;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
     private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException {
         int bufOffset = 0;
-        for (int idx : measureIdxUsingDict) {
+        for (int idx : topNMeasureIdx) {
             // only TopN measure uses dic
             TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx];
 
-            MeasureDesc measureDesc = measuresDescs.get(idx);
-            String displayCol = measureDesc.getFunction().getParameter().getDisplayColumn().toUpperCase();
-            if (StringUtils.isNotEmpty(displayCol)) {
-                ColumnDesc sourceColumn = cubeDesc.getFactTableDesc().findColumnByName(displayCol);
-                TblColRef colRef = new TblColRef(sourceColumn);
-                DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-                Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
-                Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
+            MeasureDesc measureDesc = measureDescs.get(idx);
+            TblColRef colRef = measureDesc.getFunction().getTopNLiteralColumn();
+            DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+            Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
+            Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
+
+            int topNSize = topNCounters.size();
+            while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+                    mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+                    mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
+                byte[] oldBuf = newKeyBodyBuf;
+                newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
+                System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
+            }
 
-                int topNSize = topNCounters.size();
-                while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
-                        mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
-                        mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBodyBuf;
-                    newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
+            for (Counter<ByteArray> c : topNCounters) {
+                int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
+                int idInMergedDict;
+                int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
+                if (size < 0) {
+                    idInMergedDict = mergedDict.nullId();
+                } else {
+                    idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
                 }
 
-                for (Counter<ByteArray> c : topNCounters) {
-                    int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
-                    int idInMergedDict;
-                    int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
-                    if (size < 0) {
-                        idInMergedDict = mergedDict.nullId();
-                    } else {
-                        idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
-                    }
-
-                    BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-                    c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-                    bufOffset += mergedDict.getSizeOfId();
-                }
+                BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
+                c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
+                bufOffset += mergedDict.getSizeOfId();
             }
         }
-
     }
-    
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/d55da407/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index cf4fd46..8379572 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -144,15 +144,15 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         // check involved measures, build value decoder for each each family:column
         List<RowValueDecoder> valueDecoders = translateAggregation(cubeDesc.getHBaseMapping(), metrics, context);
 
-        //memory hungry distinct count are pushed down to coprocessor, no need to set threshold any more
-        //setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
+        // memory hungry distinct count are pushed down to coprocessor, no need to set threshold any more
+        // setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
         setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
         setLimit(filter, context);
 
         HConnection conn = HBaseConnection.get(context.getConnUrl());
 
+        // notice we're passing filterD down to storage instead of flatFilter
         return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, topNCol, valueDecoders, context, returnTupleInfo);
-        //Notice we're passing filterD down to storage instead of flatFilter
     }
 
     @Override