You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/23 10:25:04 UTC

[1/2] incubator-kylin git commit: KYLIN-942 Code review

Repository: incubator-kylin
Updated Branches:
  refs/heads/2.x-staging b52c1bba8 -> e62e0b3ef


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
index 4ced852..0c8c3bd 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v2/CubeStorageQuery.java
@@ -37,7 +37,6 @@ import com.google.common.collect.Lists;
 import com.google.common.collect.Range;
 import com.google.common.collect.Sets;
 
-@SuppressWarnings("unused")
 public class CubeStorageQuery implements ICachableStorageQuery {
 
     private static final Logger logger = LoggerFactory.getLogger(CubeStorageQuery.class);
@@ -46,30 +45,18 @@ public class CubeStorageQuery implements ICachableStorageQuery {
 
     private final CubeInstance cubeInstance;
     private final CubeDesc cubeDesc;
-    private Collection<TblColRef> topNColumns;
 
     public CubeStorageQuery(CubeInstance cube) {
         this.cubeInstance = cube;
         this.cubeDesc = cube.getDescriptor();
-        this.topNColumns = Lists.newArrayList();
-        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
-            if (measureDesc.getFunction().isTopN()) {
-                List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs();
-                topNColumns.add(colRefs.get(colRefs.size() - 1));
-            }
-        }
     }
 
     @Override
     public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-        // check whether this is a TopN query;
-        checkAndRewriteTopN(context, sqlDigest, returnTupleInfo);
+        // check whether this is a TopN query
+        checkAndRewriteTopN(sqlDigest);
 
         Collection<TblColRef> groups = sqlDigest.groupbyColumns;
-        TblColRef topNCol = extractTopNCol(groups);
-        if (topNCol != null)
-            groups.remove(topNCol);
-
         TupleFilter filter = sqlDigest.filter;
 
         // build dimension & metrics
@@ -125,10 +112,22 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         if (scanners.isEmpty())
             return ITupleIterator.EMPTY_TUPLE_ITERATOR;
 
+        return newSequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context);
+    }
+
+    private ITupleIterator newSequentialCubeTupleIterator(List<CubeSegmentScanner> scanners, Cuboid cuboid, Set<TblColRef> dimensionsD, Set<FunctionDesc> metrics, TupleInfo returnTupleInfo, StorageContext context) {
+        TblColRef topNCol = null;
+        for (FunctionDesc func : metrics) {
+            if (func.isTopN()) {
+                topNCol = func.getTopNLiteralColumn();
+                break;
+            }
+        }
+
         if (topNCol != null)
             return new SequentialCubeTopNTupleIterator(scanners, cuboid, dimensionsD, topNCol, metrics, returnTupleInfo, context);
-        
-        return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context);
+        else
+            return new SequentialCubeTupleIterator(scanners, cuboid, dimensionsD, metrics, returnTupleInfo, context);
     }
 
     private void buildDimensionsAndMetrics(SQLDigest sqlDigest, Collection<TblColRef> dimensions, Collection<FunctionDesc> metrics) {
@@ -144,11 +143,7 @@ public class CubeStorageQuery implements ICachableStorageQuery {
             if (sqlDigest.metricColumns.contains(column)) {
                 continue;
             }
-            
-            // skip topN display col
-            if (topNColumns.contains(column)) {
-                continue;
-            }
+
             dimensions.add(column);
         }
     }
@@ -398,48 +393,33 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         return false;
     }
 
+    private void checkAndRewriteTopN(SQLDigest sqlDigest) {
+        FunctionDesc topnFunc = null;
+        TblColRef topnLiteralCol = null;
+        for (MeasureDesc measure : cubeDesc.getMeasures()) {
+            FunctionDesc func = measure.getFunction();
+            if (func.isTopN() && sqlDigest.groupbyColumns.contains(func.getTopNLiteralColumn())) {
+                topnFunc = func;
+                topnLiteralCol = func.getTopNLiteralColumn();
+            }
+        }
 
-    private void checkAndRewriteTopN(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-        Collection<TblColRef> groups = sqlDigest.groupbyColumns;
-        TblColRef topNDisplayCol = extractTopNCol(groups);
-        boolean hasTopN = topNDisplayCol != null;
-
-        if (hasTopN == false)
+        // if TopN is not involved
+        if (topnFunc == null)
             return;
 
         if (sqlDigest.aggregations.size() != 1) {
             throw new IllegalStateException("When query with topN, only one metrics is allowed.");
         }
 
-        FunctionDesc functionDesc = sqlDigest.aggregations.iterator().next();
-        if (functionDesc.isSum() == false) {
+        FunctionDesc origFunc = sqlDigest.aggregations.iterator().next();
+        if (origFunc.isSum() == false) {
             throw new IllegalStateException("When query with topN, only SUM function is allowed.");
         }
 
-        FunctionDesc rewriteFunction = null;
-        // replace the SUM to the TopN function
-        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
-            if (measureDesc.getFunction().isCompatible(functionDesc) && topNDisplayCol.getName().equalsIgnoreCase(measureDesc.getFunction().getParameter().getDisplayColumn())) {
-                rewriteFunction = measureDesc.getFunction();
-                break;
-            }
-        }
-
-        if (rewriteFunction == null) {
-            throw new IllegalStateException("Didn't find topN measure for " + functionDesc);
-        }
-
-        sqlDigest.aggregations = Lists.newArrayList(rewriteFunction);
-        logger.info("Rewrite function " + functionDesc + " to " + rewriteFunction);
-    }
-
-    private TblColRef extractTopNCol(Collection<TblColRef> colRefs) {
-        for (TblColRef colRef : colRefs) {
-            if (topNColumns.contains(colRef)) {
-                return colRef;
-            }
-        }
-
-        return null;
+        sqlDigest.aggregations = Lists.newArrayList(topnFunc);
+        sqlDigest.groupbyColumns.remove(topnLiteralCol);
+        sqlDigest.metricColumns.add(topnLiteralCol);
+        logger.info("Rewrite function " + origFunc + " to " + topnFunc);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
index 99db123..2fd0b4f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/ii/coprocessor/endpoint/EndpointTupleIterator.java
@@ -29,7 +29,6 @@ import java.util.Map;
 
 import javax.annotation.Nullable;
 
-import com.google.protobuf.ByteString;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.hadoop.hbase.client.HConnection;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
index 7c1a19f..50c2fac 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/RowValueDecoder.java
@@ -140,4 +140,16 @@ public class RowValueDecoder implements Cloneable {
         return false;
     }
 
+    public static MeasureDesc findTopN(Collection<RowValueDecoder> rowValueDecoders) {
+        for (RowValueDecoder decoder : rowValueDecoders) {
+            for (int i = decoder.projectionIndex.nextSetBit(0); i >= 0; i = decoder.projectionIndex.nextSetBit(i + 1)) {
+                MeasureDesc measure = decoder.measures[i];
+                FunctionDesc func = measure.getFunction();
+                if (func.isTopN())
+                    return measure;
+            }
+        }
+        return null;
+    }
+
 }


[2/2] incubator-kylin git commit: KYLIN-942 Code review

Posted by li...@apache.org.
KYLIN-942 Code review


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/e62e0b3e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/e62e0b3e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/e62e0b3e

Branch: refs/heads/2.x-staging
Commit: e62e0b3ef19622c35b70840d652baf49ce97e1ac
Parents: b52c1bb
Author: Li, Yang <ya...@ebay.com>
Authored: Fri Nov 20 11:24:47 2015 +0800
Committer: Li, Yang <ya...@ebay.com>
Committed: Mon Nov 23 17:23:50 2015 +0800

----------------------------------------------------------------------
 .../kylin/cube/CubeCapabilityChecker.java       |  10 +-
 .../cube/inmemcubing/InMemCubeBuilder.java      |  15 +-
 .../InMemCubeBuilderInputConverter.java         |  14 +-
 .../cube/inmemcubing/InMemCubeBuilderUtils.java |  10 +-
 .../org/apache/kylin/cube/model/CubeDesc.java   |  55 +------
 .../model/validation/rule/FunctionRule.java     |   2 +-
 .../cube/inmemcubing/InMemCubeBuilderTest.java  |  12 +-
 .../apache/kylin/metadata/model/ColumnDesc.java |   1 +
 .../kylin/metadata/model/DatabaseDesc.java      |   1 +
 .../kylin/metadata/model/FunctionDesc.java      |  58 ++++++--
 .../kylin/metadata/model/MeasureDesc.java       |  12 ++
 .../kylin/metadata/model/ParameterDesc.java     |  41 ++---
 .../kylin/storage/cache/StorageMockUtils.java   |   2 -
 .../engine/mr/steps/BaseCuboidMapperBase.java   |  24 +--
 .../mr/steps/MergeCuboidFromStorageMapper.java  | 129 ++++++++--------
 .../engine/mr/steps/MergeCuboidMapper.java      | 144 +++++++++---------
 .../localmeta/cube/test_kylin_cube_topn.json    |  10 --
 .../cube/test_kylin_cube_topn_left_join.json    |  10 --
 .../cube_desc/test_kylin_cube_topn_desc.json    | 148 ------------------
 .../test_kylin_cube_topn_left_join_desc.json    | 149 -------------------
 .../test_kylin_cube_without_slr_desc.json       |  11 +-
 ...t_kylin_cube_without_slr_left_join_desc.json |  11 +-
 .../kylin/invertedindex/model/IIDesc.java       |  14 +-
 .../invertedindex/model/IIKeyValueCodec.java    |   1 -
 .../kylin/query/relnode/OLAPAggregateRel.java   |   2 +-
 .../storage/hbase/cube/v1/CubeStorageQuery.java |  82 ++++------
 .../cube/v1/SerializedHBaseTupleIterator.java   |  22 ++-
 .../storage/hbase/cube/v2/CubeStorageQuery.java |  90 +++++------
 .../endpoint/EndpointTupleIterator.java         |   1 -
 .../storage/hbase/steps/RowValueDecoder.java    |  12 ++
 30 files changed, 369 insertions(+), 724 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
index 3bb246a..343bf11 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeCapabilityChecker.java
@@ -132,15 +132,15 @@ public class CubeCapabilityChecker {
         for (MeasureDesc measure : cubeDesc.getMeasures()) {
             if (measure.getFunction().isTopN()) {
                 List<TblColRef> cols = measure.getFunction().getParameter().getColRefs();
-                TblColRef displayCol = cols.get(cols.size() - 1);
-                if (digest.groupbyColumns.contains(displayCol)) {
-                    dimensionColumnsCopy.remove(displayCol);
+                TblColRef literalCol = cols.get(cols.size() - 1);
+                if (digest.groupbyColumns.contains(literalCol)) {
+                    dimensionColumnsCopy.remove(literalCol);
                     if (isMatchedWithDimensions(dimensionColumnsCopy, cube)) {
-                        if (measure.getFunction().isCompatible(onlyFunction)) {
+                        if (measure.getFunction().isTopNCompatibleSum(onlyFunction)) {
                             return true;
                         }
                     }
-                    dimensionColumnsCopy.add(displayCol);
+                    dimensionColumnsCopy.add(literalCol);
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index a179d70..e9d940a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -17,7 +17,12 @@
 package org.apache.kylin.cube.inmemcubing;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -25,7 +30,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.kylin.common.topn.Counter;
 import org.apache.kylin.common.topn.TopNCounter;
-import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.common.util.Pair;
@@ -43,8 +47,6 @@ import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.gridtable.IGTScanner;
 import org.apache.kylin.metadata.measure.DoubleMutable;
-import org.apache.kylin.metadata.measure.LongMutable;
-import org.apache.kylin.metadata.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
@@ -59,14 +61,12 @@ import com.google.common.collect.Lists;
 public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
 
     private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
-    private static final LongMutable ONE = new LongMutable(1l);
     static final double BASE_CUBOID_CACHE_OVERSIZE_FACTOR = 0.1;
 
     private final CuboidScheduler cuboidScheduler;
     private final long baseCuboidId;
     private final int totalCuboidCount;
     private final CubeJoinedFlatTableDesc intermediateTableDesc;
-    private final MeasureCodec measureCodec;
     private final String[] metricsAggrFuncs;
     private final MeasureDesc[] measureDescs;
     private final int measureCount;
@@ -87,7 +87,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
         this.totalCuboidCount = cuboidScheduler.getCuboidCount();
         this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
-        this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
 
         this.measureCount = cubeDesc.getMeasures().size();
         this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
@@ -510,7 +509,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
             this.input = input;
             this.record = new GTRecord(info);
             this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc, 
-                    InMemCubeBuilderUtils.createTopNDisplayColDictionaryMap(cubeDesc, intermediateTableDesc, dictionaryMap), 
+                    InMemCubeBuilderUtils.createTopNLiteralColDictionaryMap(cubeDesc, intermediateTableDesc, dictionaryMap), 
                     info);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
index d9099ce..69a9fc9 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
@@ -47,18 +47,18 @@ public class InMemCubeBuilderInputConverter {
     private final MeasureCodec measureCodec;
     private final int measureCount;
     private final ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-    private final Map<Integer, Dictionary<String>> topNDisplayColDictMap;
+    private final Map<Integer, Dictionary<String>> topNLiteralColDictMap;
     private final GTInfo gtInfo;
     
 
-    public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, Map<Integer, Dictionary<String>> topNDisplayColDictMap, GTInfo gtInfo) {
+    public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, Map<Integer, Dictionary<String>> topNLiteralColDictMap, GTInfo gtInfo) {
         this.cubeDesc = cubeDesc;
         this.gtInfo = gtInfo;
         this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
         this.measureCount = cubeDesc.getMeasures().size();
         this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
         this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
-        this.topNDisplayColDictMap = Preconditions.checkNotNull(topNDisplayColDictMap, "topNDisplayColDictMap cannot be null");
+        this.topNLiteralColDictMap = Preconditions.checkNotNull(topNLiteralColDictMap, "topNLiteralColDictMap cannot be null");
     }
     
     public final GTRecord convert(List<String> row) {
@@ -102,13 +102,13 @@ public class InMemCubeBuilderInputConverter {
             } else if (function.isTopN()) {
                 // encode the key column with dict, and get the counter column;
                 int keyColIndex = flatTableIdx[flatTableIdx.length - 1];
-                Dictionary<String> displayColDict = topNDisplayColDictMap.get(keyColIndex);
-                int keyColEncoded = displayColDict.getIdFromValue(row.get(keyColIndex));
+                Dictionary<String> literalColDict = topNLiteralColDictMap.get(keyColIndex);
+                int keyColEncoded = literalColDict.getIdFromValue(row.get(keyColIndex));
                 valueBuf.clear();
-                valueBuf.putInt(displayColDict.getSizeOfId());
+                valueBuf.putInt(literalColDict.getSizeOfId());
                 valueBuf.putInt(keyColEncoded);
                 if (flatTableIdx.length == 1) {
-                    // only displayCol, use 1.0 as counter
+                    // only literalCol, use 1.0 as counter
                     valueBuf.putDouble(1.0);
                 } else {
                     // get the counter column value

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
index f0ee372..9d819a4 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
@@ -35,19 +35,19 @@ import java.util.Map;
  */
 public final class InMemCubeBuilderUtils {
     
-    public static final HashMap<Integer, Dictionary<String>> createTopNDisplayColDictionaryMap(CubeDesc cubeDesc, CubeJoinedFlatTableDesc intermediateTableDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+    public static final HashMap<Integer, Dictionary<String>> createTopNLiteralColDictionaryMap(CubeDesc cubeDesc, CubeJoinedFlatTableDesc intermediateTableDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
         HashMap<Integer, Dictionary<String>> result = Maps.newHashMap();
         for (int measureIdx = 0; measureIdx < cubeDesc.getMeasures().size(); measureIdx++) {
             MeasureDesc measureDesc = cubeDesc.getMeasures().get(measureIdx);
             FunctionDesc func = measureDesc.getFunction();
             if (func.isTopN()) {
                 int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
-                int displayColIdx = flatTableIdx[flatTableIdx.length - 1];
-                TblColRef displayCol = func.getParameter().getColRefs().get(flatTableIdx.length - 1);
+                int literalColIdx = flatTableIdx[flatTableIdx.length - 1];
+                TblColRef literalCol = func.getTopNLiteralColumn();
                 @SuppressWarnings("unchecked")
-                Dictionary<String> dictionary = (Dictionary<String>) dictionaryMap.get(displayCol);
+                Dictionary<String> dictionary = (Dictionary<String>) dictionaryMap.get(literalCol);
                 //Preconditions.checkNotNull(dictionary);//FIXME disable check since dictionary is null when building empty segment
-                result.put(displayColIdx, dictionary);
+                result.put(literalColIdx, dictionary);
             }
         }
         return result;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 2250945..ef563ed 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -50,7 +50,6 @@ import org.apache.kylin.metadata.model.IEngineAware;
 import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.model.JoinDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.ParameterDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
@@ -130,7 +129,6 @@ public class CubeDesc extends RootPersistentEntity {
     private LinkedHashSet<TblColRef> allColumns = new LinkedHashSet<TblColRef>();
     private LinkedHashSet<TblColRef> dimensionColumns = new LinkedHashSet<TblColRef>();
 
-    private LinkedHashSet<TblColRef> measureDisplayColumns = new LinkedHashSet<TblColRef>();
     private Map<TblColRef, DeriveInfo> derivedToHostMap = Maps.newHashMap();
     private Map<Array<TblColRef>, List<DeriveInfo>> hostToDerivedMap = Maps.newHashMap();
 
@@ -414,7 +412,7 @@ public class CubeDesc extends RootPersistentEntity {
         }
         return calculateSignature().equals(getSignature());
     }
-    
+
     public String calculateSignature() {
         MessageDigest md = null;
         try {
@@ -646,37 +644,12 @@ public class CubeDesc extends RootPersistentEntity {
                 m.setDependentMeasureRef(m.getDependentMeasureRef().toUpperCase());
             }
 
-            FunctionDesc f = m.getFunction();
-            f.setExpression(f.getExpression().toUpperCase());
-            f.initReturnDataType();
-
-            ParameterDesc p = f.getParameter();
-            p.normalizeColumnValue();
-
-            ArrayList<TblColRef> colRefs = Lists.newArrayList();
-            if (p.isColumnType()) {
-                for (String cName : p.getValue().split("\\s*,\\s*")) {
-                    ColumnDesc sourceColumn = factTable.findColumnByName(cName);
-                    TblColRef colRef = new TblColRef(sourceColumn);
-                    colRefs.add(colRef);
-                    allColumns.add(colRef);
-                }
-            }
-
-            // for topN
-            if (StringUtils.isNotEmpty(p.getDisplayColumn())) {
-                ColumnDesc sourceColumn = factTable.findColumnByName(p.getDisplayColumn());
-                TblColRef colRef = new TblColRef(sourceColumn);
-                colRefs.add(colRef);
-                measureDisplayColumns.add(colRef);
-                allColumns.add(colRef);
-            }
-
-            if (colRefs.isEmpty() == false)
-                p.setColRefs(colRefs);
+            FunctionDesc func = m.getFunction();
+            func.init(factTable);
+            allColumns.addAll(func.getParameter().getColRefs());
 
             // verify holistic count distinct as a dependent measure
-            if (m.getFunction().isHolisticCountDistinct() && StringUtils.isBlank(m.getDependentMeasureRef())) {
+            if (func.isHolisticCountDistinct() && StringUtils.isBlank(m.getDependentMeasureRef())) {
                 throw new IllegalStateException(m + " is a holistic count distinct but it has no DependentMeasureRef defined!");
             }
         }
@@ -844,24 +817,10 @@ public class CubeDesc extends RootPersistentEntity {
             }
         }
 
-        for (TblColRef colRef : measureDisplayColumns) {
-            if (!result.contains(colRef))
-                result.add(colRef);
+        for (MeasureDesc measure : measures) {
+            result.addAll(measure.getColumnsNeedDictionary());
         }
         return result;
     }
 
-    public LinkedHashSet<TblColRef> getMeasureDisplayColumns() {
-        return measureDisplayColumns;
-    }
-
-
-    public boolean hasMeasureUsingDictionary() {
-        for (MeasureDesc measureDesc : this.getMeasures()) {
-            if (measureDesc.getFunction().isTopN())
-                return true;
-        }
-
-        return false;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
index 80bd2f7..1920fc7 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
@@ -91,7 +91,7 @@ public class FunctionRule implements IValidatorRule<CubeDesc> {
 
             if (StringUtils.equalsIgnoreCase(FunctionDesc.PARAMETER_TYPE_COLUMN, type)) {
                 validateColumnParameter(context, cube, value);
-            } else if (StringUtils.equals(FunctionDesc.PARAMTER_TYPE_CONSTANT, type)) {
+            } else if (StringUtils.equals(FunctionDesc.PARAMETER_TYPE_CONSTANT, type)) {
                 validateCostantParameter(context, cube, value);
             }
             validateReturnType(context, cube, func);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
index e3fb30e..f853b08 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderTest.java
@@ -188,13 +188,13 @@ public class InMemCubeBuilderTest extends LocalFileMetadataTestCase {
             FunctionDesc func = measureDesc.getFunction();
             if (func.isTopN()) {
                 int[] flatTableIdx = flatTableDesc.getMeasureColumnIndexes()[measureIdx];
-                int displayColIdx = flatTableIdx[flatTableIdx.length - 1];
-                TblColRef displayCol = func.getParameter().getColRefs().get(flatTableIdx.length - 1);
-                logger.info("Building dictionary for " + displayCol);
-                List<byte[]> valueList = readValueList(flatTable, nColumns, displayColIdx);
-                Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(displayCol.getType(), new IterableDictionaryValueEnumerator(valueList));
+                int literalColIdx = flatTableIdx[flatTableIdx.length - 1];
+                TblColRef literalCol = func.getTopNLiteralColumn();
+                logger.info("Building dictionary for " + literalCol);
+                List<byte[]> valueList = readValueList(flatTable, nColumns, literalColIdx);
+                Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(literalCol.getType(), new IterableDictionaryValueEnumerator(valueList));
 
-                result.put(displayCol, dict);
+                result.put(literalCol, dict);
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
index 12371ce..6162477 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ColumnDesc.java
@@ -30,6 +30,7 @@ import java.io.Serializable;
  * Column Metadata from Source. All name should be uppercase.
  * <p/>
  */
+@SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class ColumnDesc implements Serializable {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java
index 215e86c..6b8447d 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/DatabaseDesc.java
@@ -27,6 +27,7 @@ import java.util.Set;
 /**
  * @author xjiang
  */
+@SuppressWarnings("serial")
 public class DatabaseDesc implements Serializable {
     private String name;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index d10f395..b8cefa2 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -18,11 +18,13 @@
 
 package org.apache.kylin.metadata.model;
 
+import java.util.ArrayList;
 import java.util.Collection;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Lists;
 
 /**
  */
@@ -36,9 +38,9 @@ public class FunctionDesc {
     public static final String FUNC_COUNT_DISTINCT = "COUNT_DISTINCT";
     public static final String FUNC_TOP_N = "TOP_N";
 
-    public static final String PARAMTER_TYPE_CONSTANT = "constant";
+    public static final String PARAMETER_TYPE_CONSTANT = "constant";
     public static final String PARAMETER_TYPE_COLUMN = "column";
-    
+
     @JsonProperty("expression")
     private String expression;
     @JsonProperty("parameter")
@@ -49,6 +51,26 @@ public class FunctionDesc {
     private DataType returnDataType;
     private boolean isDimensionAsMetric = false;
 
+    public void init(TableDesc factTable) {
+        expression = expression.toUpperCase();
+        returnDataType = DataType.getInstance(returnType);
+
+        for (ParameterDesc p = parameter; p != null; p = p.getNextParameter()) {
+            p.setValue(p.getValue().toUpperCase());
+        }
+
+        ArrayList<TblColRef> colRefs = Lists.newArrayList();
+        for (ParameterDesc p = parameter; p != null; p = p.getNextParameter()) {
+            if (p.isColumnType()) {
+                ColumnDesc sourceColumn = factTable.findColumnByName(p.getValue());
+                TblColRef colRef = new TblColRef(sourceColumn);
+                colRefs.add(colRef);
+            }
+        }
+
+        parameter.setColRefs(colRefs);
+    }
+
     public String getRewriteFieldName() {
         if (isSum()) {
             return getParameter().getValue();
@@ -161,11 +183,6 @@ public class FunctionDesc {
 
     public void setReturnType(String returnType) {
         this.returnType = returnType;
-        this.initReturnDataType();
-    }
-
-    // Jackson does not provide object post-processing currently
-    public void initReturnDataType() {
         this.returnDataType = DataType.getInstance(returnType);
     }
 
@@ -225,13 +242,32 @@ public class FunctionDesc {
         return "FunctionDesc [expression=" + expression + ", parameter=" + parameter + ", returnType=" + returnType + "]";
     }
 
-    public boolean isCompatible(FunctionDesc another) {
-        if (another == null) {
+    // parameter.getColRefs(): index 0 is the numeric column (e.g. GMV), index 1 is the literal column (e.g. SELLER)
+    public TblColRef getTopNNumericColumn() {
+        if (isTopN() == false)
+            throw new IllegalStateException();
+
+        return parameter.getColRefs().get(0);
+    }
+
+    // parameter.getColRefs(): index 0 is the numeric column (e.g. GMV), index 1 is the literal column (e.g. SELLER)
+    public TblColRef getTopNLiteralColumn() {
+        if (isTopN() == false)
+            throw new IllegalStateException();
+
+        return parameter.getColRefs().get(1);
+    }
+
+    public boolean isTopNCompatibleSum(FunctionDesc sum) {
+        if (isTopN() == false)
+            throw new IllegalStateException();
+
+        if (sum == null) {
             return false;
         }
 
-        if (this.isTopN() && another.isSum()) {
-            if (this.getParameter().getColRefs().get(0).equals(another.getParameter().getColRefs().get(0)))
+        if (this.isTopN() && sum.isSum()) {
+            if (this.getParameter().getColRefs().get(0).equals(sum.getParameter().getColRefs().get(0)))
                 return true;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
index 1561b1f..618d25a 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/MeasureDesc.java
@@ -18,6 +18,9 @@
 
 package org.apache.kylin.metadata.model;
 
+import java.util.Collections;
+import java.util.List;
+
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -37,6 +40,15 @@ public class MeasureDesc {
     @JsonProperty("dependent_measure_ref")
     private String dependentMeasureRef;
 
+    public List<TblColRef> getColumnsNeedDictionary() {
+        // measure could store literal values using dictionary encoding to save space, like TopN
+        if (function.isTopN()) {
+            return Collections.singletonList(function.getTopNLiteralColumn());
+        } else {
+            return Collections.emptyList();
+        }
+    }
+
     public int getId() {
         return id;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
----------------------------------------------------------------------
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
index 9773b84..2cf4374 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/ParameterDesc.java
@@ -19,11 +19,8 @@
 package org.apache.kylin.metadata.model;
 
 import java.io.UnsupportedEncodingException;
-import java.util.Arrays;
 import java.util.List;
 
-import org.apache.commons.lang.StringUtils;
-
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
 import com.fasterxml.jackson.annotation.JsonProperty;
@@ -33,15 +30,13 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class ParameterDesc {
 
-    public static final String COLUMN_TYPE = "column";
-
     @JsonProperty("type")
     private String type;
     @JsonProperty("value")
     private String value;
     
-    @JsonProperty("displaycolumn")
-    private String displayColumn;
+    @JsonProperty("next_parameter")
+    private ParameterDesc nextParameter;
 
     private List<TblColRef> colRefs;
 
@@ -65,14 +60,6 @@ public class ParameterDesc {
         this.value = value;
     }
 
-    public String getDisplayColumn() {
-        return displayColumn;
-    }
-
-    public void setDisplayColumn(String displayColumn) {
-        this.displayColumn = displayColumn;
-    }
-
     public List<TblColRef> getColRefs() {
         return colRefs;
     }
@@ -80,19 +67,17 @@ public class ParameterDesc {
     public void setColRefs(List<TblColRef> colRefs) {
         this.colRefs = colRefs;
     }
+    
+    public ParameterDesc getNextParameter() {
+        return nextParameter;
+    }
 
-    public boolean isColumnType() {
-        return COLUMN_TYPE.equals(type);
+    public void setNextParameter(ParameterDesc nextParameter) {
+        this.nextParameter = nextParameter;
     }
 
-    public void normalizeColumnValue() {
-        if (isColumnType()) {
-            String values[] = value.split("\\s*,\\s*");
-            for (int i = 0; i < values.length; i++)
-                values[i] = values[i].toUpperCase();
-            Arrays.sort(values);
-            value = StringUtils.join(values, ",");
-        }
+    public boolean isColumnType() {
+        return FunctionDesc.PARAMETER_TYPE_COLUMN.equals(type);
     }
 
     @Override
@@ -102,7 +87,7 @@ public class ParameterDesc {
 
         ParameterDesc that = (ParameterDesc) o;
 
-        if (displayColumn != null ? !displayColumn.equals(that.displayColumn) : that.displayColumn != null) return false;
+        if (nextParameter != null ? !nextParameter.equals(that.nextParameter) : that.nextParameter != null) return false;
         if (type != null ? !type.equals(that.type) : that.type != null) return false;
         if (value != null ? !value.equals(that.value) : that.value != null) return false;
 
@@ -113,13 +98,13 @@ public class ParameterDesc {
     public int hashCode() {
         int result = type != null ? type.hashCode() : 0;
         result = 31 * result + (value != null ? value.hashCode() : 0);
-        result = 31 * result + (displayColumn != null ? displayColumn.hashCode() : 0);
+        result = 31 * result + (nextParameter != null ? nextParameter.hashCode() : 0);
         return result;
     }
 
     @Override
     public String toString() {
-        return "ParameterDesc [type=" + type + ", value=" + value + ", displayColumn=" + displayColumn + "]";
+        return "ParameterDesc [type=" + type + ", value=" + value + ", nextParam=" + nextParameter + "]";
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
----------------------------------------------------------------------
diff --git a/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java b/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
index 2b5ceee..2898f93 100644
--- a/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
+++ b/core-storage/src/test/java/org/apache/kylin/storage/cache/StorageMockUtils.java
@@ -117,7 +117,6 @@ public class StorageMockUtils {
         return compareFilter;
     }
 
-    @SuppressWarnings("unused")
     public static TupleFilter buildAndFilter(List<TblColRef> columns) {
         CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
         CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));
@@ -127,7 +126,6 @@ public class StorageMockUtils {
         return andFilter;
     }
 
-    @SuppressWarnings("unused")
     public static TupleFilter buildOrFilter(List<TblColRef> columns) {
         CompareTupleFilter compareFilter1 = buildFilter1(columns.get(0));
         CompareTupleFilter compareFilter2 = buildFilter2(columns.get(1));

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index ed1fd4a..44311c2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -57,7 +57,7 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
     private Text outputKey = new Text();
     private Text outputValue = new Text();
     private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-    private Map<Integer, Dictionary<String>> topNDisplayColDictMap;
+    private Map<Integer, Dictionary<String>> topNLiteralColDictMap;
 
     @Override
     protected void setup(Context context) throws IOException {
@@ -92,21 +92,21 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
         int colCount = cubeDesc.getRowkey().getRowKeyColumns().length;
         keyBytesBuf = new byte[colCount][];
 
-        initTopNDisplayColDictionaryMap();
+        initTopNLiteralColDictionaryMap();
         initNullBytes();
     }
     
-    private void initTopNDisplayColDictionaryMap() {
-        topNDisplayColDictMap = Maps.newHashMap();
+    private void initTopNLiteralColDictionaryMap() {
+        topNLiteralColDictMap = Maps.newHashMap();
         for (int measureIdx = 0; measureIdx < measures.length; measureIdx++) {
             MeasureDesc measureDesc = cubeDesc.getMeasures().get(measureIdx);
             FunctionDesc func = measureDesc.getFunction();
             if (func.isTopN()) {
                 int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[measureIdx];
-                int displayColIdx = flatTableIdx[flatTableIdx.length - 1];
-                TblColRef displayCol = func.getParameter().getColRefs().get(flatTableIdx.length - 1);
-                Dictionary<String> dictionary = (Dictionary<String>)cubeSegment.getDictionary(displayCol);
-                topNDisplayColDictMap.put(displayColIdx, dictionary);
+                int literalColIdx = flatTableIdx[flatTableIdx.length - 1];
+                TblColRef literalCol = func.getTopNLiteralColumn();
+                Dictionary<String> dictionary = (Dictionary<String>)cubeSegment.getDictionary(literalCol);
+                topNLiteralColDictMap.put(literalColIdx, dictionary);
             }
         }
     }
@@ -174,13 +174,13 @@ public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<KEYIN, VAL
         else if(func.isTopN()) {
             // encode the key column with dict, and get the counter column;
             int keyColIndex = flatTableIdx[flatTableIdx.length - 1];
-            Dictionary<String> displayColDict = topNDisplayColDictMap.get(keyColIndex);
-            int keyColEncoded = displayColDict.getIdFromValue(Bytes.toString(splitBuffers[keyColIndex].value));
+            Dictionary<String> literalColDict = topNLiteralColDictMap.get(keyColIndex);
+            int keyColEncoded = literalColDict.getIdFromValue(Bytes.toString(splitBuffers[keyColIndex].value));
             valueBuf.clear();
-            valueBuf.putInt(displayColDict.getSizeOfId());
+            valueBuf.putInt(literalColDict.getSizeOfId());
             valueBuf.putInt(keyColEncoded);
             if (flatTableIdx.length == 1) {
-                // only displayCol, use 1.0 as counter
+                // only literalCol, use 1.0 as counter
                 valueBuf.putDouble(1.0);
             } else {
                 // get the counter column value

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
index 286ff02..fc616fa 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidFromStorageMapper.java
@@ -23,9 +23,6 @@ import java.nio.ByteBuffer;
 import java.util.HashMap;
 import java.util.List;
 
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.topn.Counter;
 import org.apache.kylin.common.topn.TopNCounter;
@@ -51,7 +48,6 @@ import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -59,6 +55,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
 
 /**
  * @author shaoshi
@@ -83,29 +80,14 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
     private RowKeySplitter rowKeySplitter;
     private RowKeyEncoderProvider rowKeyEncoderProvider;
 
-    private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
+    private HashMap<TblColRef, Boolean> dimensionsNeedDict = new HashMap<TblColRef, Boolean>();
 
+    private List<MeasureDesc> measureDescs;
     private ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
     private MeasureCodec codec;
     private ByteArrayWritable outputValue = new ByteArrayWritable();
 
-    private List<MeasureDesc> measuresDescs;
-    private Integer[] measureIdxUsingDict;
-    
-    private Boolean checkNeedMerging(TblColRef col) throws IOException {
-        Boolean ret = dictsNeedMerging.get(col);
-        if (ret != null)
-            return ret;
-        else {
-            ret = cubeDesc.getRowkey().isUseDictionary(col);
-            if (ret) {
-                String dictTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable();
-                ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
-            }
-            dictsNeedMerging.put(col, ret);
-            return ret;
-        }
-    }
+    private List<Integer> topNMeasureIdx;
 
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
@@ -130,17 +112,14 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
         rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
         rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
 
-        measuresDescs = cubeDesc.getMeasures();
-        codec = new MeasureCodec(cubeDesc.getMeasures());
-        if (cubeDesc.hasMeasureUsingDictionary()) {
-            List<Integer> measuresUsingDict = Lists.newArrayList();
-            for (int i = 0; i < measuresDescs.size(); i++) {
-                if (measuresDescs.get(i).getFunction().isTopN()) {
-                    // so far only TopN uses dic
-                    measuresUsingDict.add(i);
-                }
+        measureDescs = cubeDesc.getMeasures();
+        codec = new MeasureCodec(measureDescs);
+
+        topNMeasureIdx = Lists.newArrayList();
+        for (int i = 0; i < measureDescs.size(); i++) {
+            if (measureDescs.get(i).getFunction().isTopN()) {
+                topNMeasureIdx.add(i);
             }
-            measureIdxUsingDict = measuresUsingDict.toArray(new Integer[measuresUsingDict.size()]);
         }
     }
 
@@ -213,11 +192,9 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
         rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf);
         outputKey.set(newKeyBuf.array(), 0, fullKeySize);
 
-        // encode measure if it uses dictionary 
-        if (cubeDesc.hasMeasureUsingDictionary()) {
-            reEncodeMeasure(value);
-        } 
-        
+        // re-encode measures if dictionary is used
+        reEncodeMeasure(value);
+
         valueBuf.clear();
         codec.encode(value, valueBuf);
         outputValue.set(valueBuf.array(), 0, valueBuf.position());
@@ -225,46 +202,60 @@ public class MergeCuboidFromStorageMapper extends KylinMapper<Object, Object, By
         context.write(outputKey, outputValue);
     }
 
+    private Boolean checkNeedMerging(TblColRef col) throws IOException {
+        Boolean ret = dimensionsNeedDict.get(col);
+        if (ret != null)
+            return ret;
+        else {
+            ret = cubeDesc.getRowkey().isUseDictionary(col);
+            if (ret) {
+                String dictTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable();
+                ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
+            }
+            dimensionsNeedDict.put(col, ret);
+            return ret;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
     private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException {
+        // currently only topN uses dictionary in measure obj
+        if (topNMeasureIdx.isEmpty())
+            return;
+
         int bufOffset = 0;
-        for (int idx : measureIdxUsingDict) {
-            // only TopN measure uses dic
+        for (int idx : topNMeasureIdx) {
             TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx];
 
-            MeasureDesc measureDesc = measuresDescs.get(idx);
-            String displayCol = measureDesc.getFunction().getParameter().getDisplayColumn().toUpperCase();
-            if (StringUtils.isNotEmpty(displayCol)) {
-                ColumnDesc sourceColumn = cubeDesc.getFactTableDesc().findColumnByName(displayCol);
-                TblColRef colRef = new TblColRef(sourceColumn);
-                DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-                Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
-                Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
+            MeasureDesc measureDesc = measureDescs.get(idx);
+            TblColRef colRef = measureDesc.getFunction().getTopNLiteralColumn();
+            DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+            Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
+            Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
+
+            int topNSize = topNCounters.size();
+            while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+                    mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+                    mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
+                byte[] oldBuf = newKeyBodyBuf;
+                newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
+                System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
+            }
 
-                int topNSize = topNCounters.size();
-                while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
-                        mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
-                        mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBodyBuf;
-                    newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
+            for (Counter<ByteArray> c : topNCounters) {
+                int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
+                int idInMergedDict;
+                int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
+                if (size < 0) {
+                    idInMergedDict = mergedDict.nullId();
+                } else {
+                    idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
                 }
 
-                for (Counter<ByteArray> c : topNCounters) {
-                    int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
-                    int idInMergedDict;
-                    int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
-                    if (size < 0) {
-                        idInMergedDict = mergedDict.nullId();
-                    } else {
-                        idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
-                    }
-
-                    BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-                    c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-                    bufOffset += mergedDict.getSizeOfId();
-                }
+                BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
+                c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
+                bufOffset += mergedDict.getSizeOfId();
             }
         }
-
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
index 68d1481..6c2679e 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeCuboidMapper.java
@@ -18,8 +18,13 @@
 
 package org.apache.kylin.engine.mr.steps;
 
-import com.google.common.collect.Lists;
-import org.apache.commons.lang.StringUtils;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.kylin.common.KylinConfig;
@@ -43,17 +48,11 @@ import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
 
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
+import com.google.common.collect.Lists;
 
 /**
  * @author ysong1, honma
@@ -76,29 +75,16 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
     private RowKeySplitter rowKeySplitter;
     private RowKeyEncoderProvider rowKeyEncoderProvider;
 
-    private HashMap<TblColRef, Boolean> dictsNeedMerging = new HashMap<TblColRef, Boolean>();
-    private List<MeasureDesc> measuresDescs;
+    private HashMap<TblColRef, Boolean> dimensionsNeedDict = new HashMap<TblColRef, Boolean>();
+
+    // for re-encoding measures that use a dictionary
+    private List<Integer> topNMeasureIdx;
+    private List<MeasureDesc> measureDescs;
     private MeasureCodec codec;
     private Object[] measureObjs;
-    private Integer[] measureIdxUsingDict;
     private ByteBuffer valueBuf;
     private Text outputValue;
 
-    private Boolean checkNeedMerging(TblColRef col) throws IOException {
-        Boolean ret = dictsNeedMerging.get(col);
-        if (ret != null)
-            return ret;
-        else {
-            ret = cubeDesc.getRowkey().isUseDictionary(col);
-            if (ret) {
-                String dictTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable();
-                ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
-            }
-            dictsNeedMerging.put(col, ret);
-            return ret;
-        }
-    }
-
     @Override
     protected void setup(Context context) throws IOException, InterruptedException {
         super.bindCurrentConfiguration(context.getConfiguration());
@@ -124,20 +110,16 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         rowKeySplitter = new RowKeySplitter(sourceCubeSegment, 65, 255);
         rowKeyEncoderProvider = new RowKeyEncoderProvider(mergedCubeSegment);
 
-        if (cubeDesc.hasMeasureUsingDictionary()) {
-            measuresDescs = cubeDesc.getMeasures();
-            codec = new MeasureCodec(measuresDescs);
-            measureObjs = new Object[measuresDescs.size()];
-            List<Integer> measuresUsingDict = Lists.newArrayList();
-            for (int i = 0; i < measuresDescs.size(); i++) {
-                if (measuresDescs.get(i).getFunction().isTopN()) {
-                    // so far only TopN uses dic
-                    measuresUsingDict.add(i);
-                }
+        measureDescs = cubeDesc.getMeasures();
+        codec = new MeasureCodec(measureDescs);
+        measureObjs = new Object[measureDescs.size()];
+        valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+        outputValue = new Text();
+        topNMeasureIdx = Lists.newArrayList();
+        for (int i = 0; i < measureDescs.size(); i++) {
+            if (measureDescs.get(i).getFunction().isTopN()) {
+                topNMeasureIdx.add(i);
             }
-            measureIdxUsingDict = measuresUsingDict.toArray(new Integer[measuresUsingDict.size()]);
-            valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-            outputValue = new Text();
         }
     }
 
@@ -231,60 +213,70 @@ public class MergeCuboidMapper extends KylinMapper<Text, Text, Text, Text> {
         rowkeyEncoder.encode(new ByteArray(newKeyBodyBuf, 0, bufOffset), newKeyBuf);
         outputKey.set(newKeyBuf.array(), 0, fullKeySize);
 
-        // encode measure if it uses dictionary 
-        if (cubeDesc.hasMeasureUsingDictionary()) {
+        // re-encode measures if dictionary is used
+        if (topNMeasureIdx.size() > 0) {
             codec.decode(ByteBuffer.wrap(value.getBytes(), 0, value.getLength()), measureObjs);
             reEncodeMeasure(measureObjs);
             valueBuf.clear();
             codec.encode(measureObjs, valueBuf);
             outputValue.set(valueBuf.array(), 0, valueBuf.position());
             value = outputValue;
-        } 
-            
+        }
+
         context.write(outputKey, value);
     }
 
+    private Boolean checkNeedMerging(TblColRef col) throws IOException {
+        Boolean ret = dimensionsNeedDict.get(col);
+        if (ret != null)
+            return ret;
+        else {
+            ret = cubeDesc.getRowkey().isUseDictionary(col);
+            if (ret) {
+                String dictTable = DictionaryManager.getInstance(config).decideSourceData(cubeDesc.getModel(), cubeDesc.getRowkey().getDictionary(col), col).getTable();
+                ret = cubeDesc.getFactTable().equalsIgnoreCase(dictTable);
+            }
+            dimensionsNeedDict.put(col, ret);
+            return ret;
+        }
+    }
+
+    @SuppressWarnings("unchecked")
     private void reEncodeMeasure(Object[] measureObjs) throws IOException, InterruptedException {
         int bufOffset = 0;
-        for (int idx : measureIdxUsingDict) {
+        for (int idx : topNMeasureIdx) {
             // only TopN measure uses dic
             TopNCounter<ByteArray> topNCounters = (TopNCounter<ByteArray>) measureObjs[idx];
 
-            MeasureDesc measureDesc = measuresDescs.get(idx);
-            String displayCol = measureDesc.getFunction().getParameter().getDisplayColumn().toUpperCase();
-            if (StringUtils.isNotEmpty(displayCol)) {
-                ColumnDesc sourceColumn = cubeDesc.getFactTableDesc().findColumnByName(displayCol);
-                TblColRef colRef = new TblColRef(sourceColumn);
-                DictionaryManager dictMgr = DictionaryManager.getInstance(config);
-                Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
-                Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
+            MeasureDesc measureDesc = measureDescs.get(idx);
+            TblColRef colRef = measureDesc.getFunction().getTopNLiteralColumn();
+            DictionaryManager dictMgr = DictionaryManager.getInstance(config);
+            Dictionary<?> sourceDict = dictMgr.getDictionary(sourceCubeSegment.getDictResPath(colRef));
+            Dictionary<?> mergedDict = dictMgr.getDictionary(mergedCubeSegment.getDictResPath(colRef));
+
+            int topNSize = topNCounters.size();
+            while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+                    mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
+                    mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
+                byte[] oldBuf = newKeyBodyBuf;
+                newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
+                System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
+            }
 
-                int topNSize = topNCounters.size();
-                while (sourceDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
-                        mergedDict.getSizeOfValue() * topNSize > newKeyBodyBuf.length - bufOffset || //
-                        mergedDict.getSizeOfId() * topNSize > newKeyBodyBuf.length - bufOffset) {
-                    byte[] oldBuf = newKeyBodyBuf;
-                    newKeyBodyBuf = new byte[2 * newKeyBodyBuf.length];
-                    System.arraycopy(oldBuf, 0, newKeyBodyBuf, 0, oldBuf.length);
+            for (Counter<ByteArray> c : topNCounters) {
+                int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
+                int idInMergedDict;
+                int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
+                if (size < 0) {
+                    idInMergedDict = mergedDict.nullId();
+                } else {
+                    idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
                 }
 
-                for (Counter<ByteArray> c : topNCounters) {
-                    int idInSourceDict = BytesUtil.readUnsigned(c.getItem().array(), c.getItem().offset(), c.getItem().length());
-                    int idInMergedDict;
-                    int size = sourceDict.getValueBytesFromId(idInSourceDict, newKeyBodyBuf, bufOffset);
-                    if (size < 0) {
-                        idInMergedDict = mergedDict.nullId();
-                    } else {
-                        idInMergedDict = mergedDict.getIdFromValueBytes(newKeyBodyBuf, bufOffset, size);
-                    }
-
-                    BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-                    c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
-                    bufOffset += mergedDict.getSizeOfId();
-                }
+                BytesUtil.writeUnsigned(idInMergedDict, newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
+                c.getItem().set(newKeyBodyBuf, bufOffset, mergedDict.getSizeOfId());
+                bufOffset += mergedDict.getSizeOfId();
             }
         }
-
     }
-    
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json b/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json
deleted file mode 100644
index 903fc15..0000000
--- a/examples/test_case_data/localmeta/cube/test_kylin_cube_topn.json
+++ /dev/null
@@ -1,10 +0,0 @@
-{
-  "uuid" : "33354455-a33e-4b69-83dd-0bb8b1f8c53b",
-  "last_modified" : 0,
-  "name" : "test_kylin_cube_topn",
-  "owner" : null,
-  "version" : null,
-  "descriptor" : "test_kylin_cube_topn_desc",
-  "segments" : [ ],
-  "create_time" : null
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/examples/test_case_data/localmeta/cube/test_kylin_cube_topn_left_join.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube/test_kylin_cube_topn_left_join.json b/examples/test_case_data/localmeta/cube/test_kylin_cube_topn_left_join.json
deleted file mode 100644
index 6f57561..0000000
--- a/examples/test_case_data/localmeta/cube/test_kylin_cube_topn_left_join.json
+++ /dev/null
@@ -1,10 +0,0 @@
-{
-  "uuid" : "44454455-a33e-4b69-83dd-0bb8b1f8c53b",
-  "last_modified" : 0,
-  "name" : "test_kylin_cube_topn_left_join",
-  "owner" : null,
-  "version" : null,
-  "descriptor" : "test_kylin_cube_topn_left_join_desc",
-  "segments" : [ ],
-  "create_time" : null
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
deleted file mode 100644
index fddbb10..0000000
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_desc.json
+++ /dev/null
@@ -1,148 +0,0 @@
-{
-  "uuid": "4334a905-1fc6-4f67-985c-38fa5aeafd92",
-  "name": "test_kylin_cube_topn_desc",
-  "description": null,
-  "dimensions": [
-    {
-      "id": 0,
-      "name": "CAL_DT",
-      "table": "EDW.TEST_CAL_DT",
-      "column": null,
-      "derived": [
-        "WEEK_BEG_DT"
-      ],
-      "hierarchy": false
-    }
-  ],
-  "measures": [
-    {
-      "id": 1,
-      "name": "GMV_SUM",
-      "function": {
-        "expression": "SUM",
-        "parameter": {
-          "type": "column",
-          "value": "PRICE"
-        },
-        "returntype": "decimal(19,4)"
-      },
-      "dependent_measure_ref": null
-    },
-    {
-      "id": 2,
-      "name": "GMV_MIN",
-      "function": {
-        "expression": "MIN",
-        "parameter": {
-          "type": "column",
-          "value": "PRICE"
-        },
-        "returntype": "decimal(19,4)"
-      },
-      "dependent_measure_ref": null
-    },
-    {
-      "id": 3,
-      "name": "GMV_MAX",
-      "function": {
-        "expression": "MAX",
-        "parameter": {
-          "type": "column",
-          "value": "PRICE"
-        },
-        "returntype": "decimal(19,4)"
-      },
-      "dependent_measure_ref": null
-    },
-    {
-      "id": 4,
-      "name": "TRANS_CNT",
-      "function": {
-        "expression": "COUNT",
-        "parameter": {
-          "type": "constant",
-          "value": "1"
-        },
-        "returntype": "bigint"
-      },
-      "dependent_measure_ref": null
-    },
-    {
-      "id": 5,
-      "name": "ITEM_COUNT_SUM",
-      "function": {
-        "expression": "SUM",
-        "parameter": {
-          "type": "column",
-          "value": "ITEM_COUNT"
-        },
-        "returntype": "bigint"
-      },
-      "dependent_measure_ref": null
-    },
-    {
-      "id": 6,
-      "name": "TOP_SELLER",
-      "function": {
-        "expression": "TOP_N",
-        "parameter": {
-          "type": "column",
-          "value": "PRICE",
-          "displaycolumn": "SELLER_ID"
-        },
-        "returntype": "topn(100)"
-      },
-      "dependent_measure_ref": null
-    }
-  ],
-  "rowkey": {
-    "rowkey_columns": [
-      {
-        "column": "cal_dt",
-        "length": 0,
-        "dictionary": "true",
-        "mandatory": false
-      }
-    ],
-    "aggregation_groups": [
-      [
-        "cal_dt"
-      ]
-    ]
-  },
-  "last_modified": 1422435345330,
-  "model_name": "test_kylin_inner_join_model_desc",
-  "null_string": null,
-  "hbase_mapping": {
-    "column_family": [
-      {
-        "name": "f1",
-        "columns": [
-          {
-            "qualifier": "m",
-            "measure_refs": [
-              "gmv_sum",
-              "gmv_min",
-              "gmv_max",
-              "trans_cnt",
-              "item_count_sum"
-            ]
-          }
-        ]
-      },  {
-        "name": "f2",
-        "columns": [
-          {
-            "qualifier": "m",
-            "measure_refs": [
-              "top_seller"
-            ]
-          }
-        ]
-      }
-    ]
-  },
-  "notify_list": null,
-  "engine_type": 2,
-  "storage_type": 2
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_left_join_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_left_join_desc.json
deleted file mode 100644
index 6aecaae..0000000
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_topn_left_join_desc.json
+++ /dev/null
@@ -1,149 +0,0 @@
-{
-  "uuid": "5445a905-1fc6-4f67-985c-38fa5aeafd92",
-  "name": "test_kylin_cube_topn_left_join_desc",
-  "description": null,
-  "dimensions": [
-    {
-      "id": 0,
-      "name": "CAL_DT",
-      "table": "EDW.TEST_CAL_DT",
-      "column": null,
-      "derived": [
-        "WEEK_BEG_DT"
-      ],
-      "hierarchy": false
-    }
-  ],
-  "measures": [
-    {
-      "id": 1,
-      "name": "GMV_SUM",
-      "function": {
-        "expression": "SUM",
-        "parameter": {
-          "type": "column",
-          "value": "PRICE"
-        },
-        "returntype": "decimal(19,4)"
-      },
-      "dependent_measure_ref": null
-    },
-    {
-      "id": 2,
-      "name": "GMV_MIN",
-      "function": {
-        "expression": "MIN",
-        "parameter": {
-          "type": "column",
-          "value": "PRICE"
-        },
-        "returntype": "decimal(19,4)"
-      },
-      "dependent_measure_ref": null
-    },
-    {
-      "id": 3,
-      "name": "GMV_MAX",
-      "function": {
-        "expression": "MAX",
-        "parameter": {
-          "type": "column",
-          "value": "PRICE"
-        },
-        "returntype": "decimal(19,4)"
-      },
-      "dependent_measure_ref": null
-    },
-    {
-      "id": 4,
-      "name": "TRANS_CNT",
-      "function": {
-        "expression": "COUNT",
-        "parameter": {
-          "type": "constant",
-          "value": "1"
-        },
-        "returntype": "bigint"
-      },
-      "dependent_measure_ref": null
-    },
-    {
-      "id": 5,
-      "name": "ITEM_COUNT_SUM",
-      "function": {
-        "expression": "SUM",
-        "parameter": {
-          "type": "column",
-          "value": "ITEM_COUNT"
-        },
-        "returntype": "bigint"
-      },
-      "dependent_measure_ref": null
-    },
-    {
-      "id": 6,
-      "name": "TOP_SELLER",
-      "function": {
-        "expression": "TOP_N",
-        "parameter": {
-          "type": "column",
-          "value": "PRICE",
-          "displaycolumn": "SELLER_ID"
-        },
-        "returntype": "topn(100)"
-      },
-      "dependent_measure_ref": null
-    }
-  ],
-  "rowkey": {
-    "rowkey_columns": [
-      {
-        "column": "cal_dt",
-        "length": 0,
-        "dictionary": "true",
-        "mandatory": false
-      }
-    ],
-    "aggregation_groups": [
-      [
-        "cal_dt"
-      ]
-    ]
-  },
-  "last_modified": 1422435345330,
-  "model_name": "test_kylin_left_join_model_desc",
-  "null_string": null,
-  "hbase_mapping": {
-    "column_family": [
-      {
-        "name": "f1",
-        "columns": [
-          {
-            "qualifier": "m",
-            "measure_refs": [
-              "gmv_sum",
-              "gmv_min",
-              "gmv_max",
-              "trans_cnt",
-              "item_count_sum"
-            ]
-          }
-        ]
-      },
-      {
-        "name": "f2",
-        "columns": [
-          {
-            "qualifier": "m",
-            "measure_refs": [
-              "top_seller"
-            ]
-          }
-        ]
-      }
-    ]
-  },
-  "notify_list": null,
-  "engine_type": 2,
-  "storage_type": 2
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
index 3f9957b..1e007c3 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_desc.json
@@ -156,7 +156,11 @@
         "expression": "COUNT_DISTINCT",
         "parameter": {
           "type": "column",
-          "value": "LSTG_FORMAT_NAME,SELLER_ID"
+          "value": "LSTG_FORMAT_NAME",
+          "next_parameter": {
+            "type": "column",
+            "value": "SELLER_ID"
+          }
         },
         "returntype": "hllc(10)"
       },
@@ -170,7 +174,10 @@
         "parameter": {
           "type": "column",
           "value": "PRICE",
-          "displaycolumn": "SELLER_ID"
+          "next_parameter": {
+            "type": "column",
+            "value": "SELLER_ID"
+          }
         },
         "returntype": "topn(100)"
       },

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
----------------------------------------------------------------------
diff --git a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
index 907e338..73a58f0 100644
--- a/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
+++ b/examples/test_case_data/localmeta/cube_desc/test_kylin_cube_without_slr_left_join_desc.json
@@ -156,7 +156,11 @@
         "expression": "COUNT_DISTINCT",
         "parameter": {
           "type": "column",
-          "value": "LSTG_FORMAT_NAME,SELLER_ID"
+          "value": "LSTG_FORMAT_NAME",
+          "next_parameter": {
+            "type": "column",
+            "value": "SELLER_ID"
+          }
         },
         "returntype": "hllc(10)"
       },
@@ -170,7 +174,10 @@
         "parameter": {
           "type": "column",
           "value": "PRICE",
-          "displaycolumn": "SELLER_ID"
+          "next_parameter": {
+            "type": "column",
+            "value": "SELLER_ID"
+          }
         },
         "returntype": "topn(100)"
       },

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
index 71737dc..452e3a3 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIDesc.java
@@ -35,7 +35,16 @@ import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.model.*;
+import org.apache.kylin.metadata.model.ColumnDesc;
+import org.apache.kylin.metadata.model.DataModelDesc;
+import org.apache.kylin.metadata.model.DimensionDesc;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.IEngineAware;
+import org.apache.kylin.metadata.model.IStorageAware;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.TableDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
@@ -49,6 +58,7 @@ import com.google.common.collect.Sets;
 /**
  * @author yangli9
  */
+@SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class IIDesc extends RootPersistentEntity {
 
@@ -231,7 +241,7 @@ public class IIDesc extends RootPersistentEntity {
 
     /**
      * 
-     * @param hllType represents the presision
+     * @param hllType represents the precision
      */
     private MeasureDesc makeHLLMeasure(ColumnDesc columnDesc, String hllType) {
         String columnName = columnDesc.getName();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
----------------------------------------------------------------------
diff --git a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
index 7e54a98..e17133f 100644
--- a/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
+++ b/invertedindex/src/main/java/org/apache/kylin/invertedindex/model/IIKeyValueCodec.java
@@ -19,7 +19,6 @@
 package org.apache.kylin.invertedindex.model;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
----------------------------------------------------------------------
diff --git a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
index e950911..cbc0c56 100644
--- a/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
+++ b/query/src/main/java/org/apache/kylin/query/relnode/OLAPAggregateRel.java
@@ -210,7 +210,7 @@ public class OLAPAggregateRel extends Aggregate implements OLAPRel {
                 if (!column.isInnerColumn()) {
                     parameter = new ParameterDesc();
                     parameter.setValue(column.getName());
-                    parameter.setType("column");
+                    parameter.setType(FunctionDesc.PARAMETER_TYPE_COLUMN);
                     parameter.setColRefs(Arrays.asList(column));
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
index f84e4e6..4d34943 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/CubeStorageQuery.java
@@ -87,32 +87,20 @@ public class CubeStorageQuery implements ICachableStorageQuery {
     private final CubeInstance cubeInstance;
     private final CubeDesc cubeDesc;
     private final String uuid;
-    private Collection<TblColRef> topNColumns;
 
     public CubeStorageQuery(CubeInstance cube) {
         this.cubeInstance = cube;
         this.cubeDesc = cube.getDescriptor();
         this.uuid = cube.getUuid();
-        this.topNColumns = Lists.newArrayList();
-        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
-            if (measureDesc.getFunction().isTopN()) {
-                List<TblColRef> colRefs = measureDesc.getFunction().getParameter().getColRefs();
-                topNColumns.add(colRefs.get(colRefs.size() - 1));
-            }
-        }
     }
 
     @Override
     public ITupleIterator search(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
 
-        // check whether this is a TopN query;
-        checkAndRewriteTopN(context, sqlDigest, returnTupleInfo);
+        // check whether this is a TopN query
+        checkAndRewriteTopN(sqlDigest);
 
         Collection<TblColRef> groups = sqlDigest.groupbyColumns;
-        TblColRef topNCol = extractTopNCol(groups);
-        if (topNCol != null)
-            groups.remove(topNCol);
-
         TupleFilter filter = sqlDigest.filter;
 
         // build dimension & metrics
@@ -156,15 +144,15 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         // check involved measures, build value decoder for each each family:column
         List<RowValueDecoder> valueDecoders = translateAggregation(cubeDesc.getHBaseMapping(), metrics, context);
 
-        //memory hungry distinct count are pushed down to coprocessor, no need to set threshold any more
-        //setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
+        // memory hungry distinct count are pushed down to coprocessor, no need to set threshold any more
+        // setThreshold(dimensionsD, valueDecoders, context); // set cautious threshold to prevent out of memory
         setCoprocessor(groupsCopD, valueDecoders, context); // enable coprocessor if beneficial
         setLimit(filter, context);
 
         HConnection conn = HBaseConnection.get(context.getConnUrl());
 
-        return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, topNCol, valueDecoders, context, returnTupleInfo);
-        //Notice we're passing filterD down to storage instead of flatFilter
+        // notice we're passing filterD down to storage instead of flatFilter
+        return new SerializedHBaseTupleIterator(conn, scans, cubeInstance, dimensionsD, filterD, groupsCopD, valueDecoders, context, returnTupleInfo);
     }
 
     @Override
@@ -196,11 +184,6 @@ public class CubeStorageQuery implements ICachableStorageQuery {
                 continue;
             }
 
-            // skip topN display col
-            if (topNColumns.contains(column)) {
-                continue;
-            }
-
             dimensions.add(column);
         }
     }
@@ -767,48 +750,33 @@ public class CubeStorageQuery implements ICachableStorageQuery {
         ObserverEnabler.enableCoprocessorIfBeneficial(cubeInstance, groupsCopD, valueDecoders, context);
     }
 
-    private void checkAndRewriteTopN(StorageContext context, SQLDigest sqlDigest, TupleInfo returnTupleInfo) {
-        Collection<TblColRef> groups = sqlDigest.groupbyColumns;
-        TblColRef topNDisplayCol = extractTopNCol(groups);
-        boolean hasTopN = topNDisplayCol != null;
-
-        if (hasTopN == false)
+    private void checkAndRewriteTopN(SQLDigest sqlDigest) {
+        FunctionDesc topnFunc = null;
+        TblColRef topnLiteralCol = null;
+        for (MeasureDesc measure : cubeDesc.getMeasures()) {
+            FunctionDesc func = measure.getFunction();
+            if (func.isTopN() && sqlDigest.groupbyColumns.contains(func.getTopNLiteralColumn())) {
+                topnFunc = func;
+                topnLiteralCol = func.getTopNLiteralColumn();
+            }
+        }
+        
+        // if TopN is not involved
+        if (topnFunc == null)
             return;
 
         if (sqlDigest.aggregations.size() != 1) {
             throw new IllegalStateException("When query with topN, only one metrics is allowed.");
         }
 
-        FunctionDesc functionDesc = sqlDigest.aggregations.iterator().next();
-        if (functionDesc.isSum() == false) {
+        FunctionDesc origFunc = sqlDigest.aggregations.iterator().next();
+        if (origFunc.isSum() == false) {
             throw new IllegalStateException("When query with topN, only SUM function is allowed.");
         }
 
-        FunctionDesc rewriteFunction = null;
-        // replace the SUM to the TopN function
-        for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
-            if (measureDesc.getFunction().isCompatible(functionDesc) && topNDisplayCol.getName().equalsIgnoreCase(measureDesc.getFunction().getParameter().getDisplayColumn())) {
-                rewriteFunction = measureDesc.getFunction();
-                break;
-            }
-        }
-
-        if (rewriteFunction == null) {
-            throw new IllegalStateException("Didn't find topN measure for " + functionDesc);
-        }
-
-        sqlDigest.aggregations = Lists.newArrayList(rewriteFunction);
-        logger.info("Rewrite function " + functionDesc + " to " + rewriteFunction);
+        sqlDigest.aggregations = Lists.newArrayList(topnFunc);
+        sqlDigest.groupbyColumns.remove(topnLiteralCol);
+        sqlDigest.metricColumns.add(topnLiteralCol);
+        logger.info("Rewrite function " + origFunc + " to " + topnFunc);
     }
-
-    private TblColRef extractTopNCol(Collection<TblColRef> colRefs) {
-        for (TblColRef colRef : colRefs) {
-            if (topNColumns.contains(colRef)) {
-                return colRef;
-            }
-        }
-
-        return null;
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/e62e0b3e/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
index 831cadb..0983689 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/cube/v1/SerializedHBaseTupleIterator.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.metadata.filter.TupleFilter;
+import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.ITuple;
 import org.apache.kylin.metadata.tuple.ITupleIterator;
@@ -58,7 +59,7 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
     private ITuple next;
 
     public SerializedHBaseTupleIterator(HConnection conn, List<HBaseKeyRange> segmentKeyRanges, CubeInstance cube, //
-            Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, TblColRef topNCol, List<RowValueDecoder> rowValueDecoders, //
+            Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, //
             StorageContext context, TupleInfo returnTupleInfo) {
 
         this.context = context;
@@ -67,14 +68,9 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
 
         this.segmentIteratorList = new ArrayList<CubeSegmentTupleIterator>(segmentKeyRanges.size());
         Map<CubeSegment, List<HBaseKeyRange>> rangesMap = makeRangesMap(segmentKeyRanges);
-        boolean useTopN = topNCol != null;
+
         for (Map.Entry<CubeSegment, List<HBaseKeyRange>> entry : rangesMap.entrySet()) {
-            CubeSegmentTupleIterator segIter;
-            if (useTopN)
-                segIter = new CubeSegmentTopNTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, topNCol, rowValueDecoders, context, returnTupleInfo);
-            else
-                segIter = new CubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
-            this.segmentIteratorList.add(segIter);
+            this.segmentIteratorList.add(newCubeSegmentTupleIterator(entry.getKey(), entry.getValue(), conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo));
         }
 
         this.segmentIteratorIterator = this.segmentIteratorList.iterator();
@@ -85,6 +81,16 @@ public class SerializedHBaseTupleIterator implements ITupleIterator {
         }
     }
 
+    private CubeSegmentTupleIterator newCubeSegmentTupleIterator(CubeSegment seg, List<HBaseKeyRange> keyRange, HConnection conn, Set<TblColRef> dimensions, TupleFilter filter, Set<TblColRef> groupBy, List<RowValueDecoder> rowValueDecoders, StorageContext context2, TupleInfo returnTupleInfo) {
+        MeasureDesc topN = RowValueDecoder.findTopN(rowValueDecoders);
+        if (topN != null) {
+            TblColRef topNCol = topN.getFunction().getTopNLiteralColumn();
+            return new CubeSegmentTopNTupleIterator(seg, keyRange, conn, dimensions, filter, groupBy, topNCol, rowValueDecoders, context, returnTupleInfo);
+        } else {
+            return new CubeSegmentTupleIterator(seg, keyRange, conn, dimensions, filter, groupBy, rowValueDecoders, context, returnTupleInfo);
+        }
+    }
+
     private Map<CubeSegment, List<HBaseKeyRange>> makeRangesMap(List<HBaseKeyRange> segmentKeyRanges) {
         Map<CubeSegment, List<HBaseKeyRange>> map = Maps.newHashMap();
         for (HBaseKeyRange range : segmentKeyRanges) {