You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/06/21 13:57:03 UTC

incubator-kylin git commit: KYLIN-803 Refactor InMemCubeBuilder, reuse dependent measure in MeasureAggregator

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 ed87c6f37 -> bb8d6d603


KYLIN-803 Refactor InMemCubeBuilder, reuse dependent measure in MeasureAggregator


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/bb8d6d60
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/bb8d6d60
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/bb8d6d60

Branch: refs/heads/0.8
Commit: bb8d6d603bac86e0cf6dfba1ee68268d4fcc743b
Parents: ed87c6f
Author: Yang Li <li...@apache.org>
Authored: Sun Jun 21 19:56:33 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Sun Jun 21 19:56:33 2015 +0800

----------------------------------------------------------------------
 .../kylin/common/util/ImmutableBitSet.java      | 11 ++-
 .../kylin/job/inmemcubing/InMemCubeBuilder.java | 84 +++++++-------------
 .../kylin/storage/cube/CubeCodeSystem.java      | 38 +++++++--
 .../kylin/storage/cube/CubeGridTable.java       |  2 +-
 .../storage/cube/CuboidToGridTableMapping.java  | 47 ++++++++---
 .../storage/gridtable/GTAggregateScanner.java   |  8 +-
 .../apache/kylin/storage/gridtable/GTInfo.java  | 22 ++---
 .../storage/gridtable/GTSampleCodeSystem.java   | 12 ++-
 .../kylin/storage/gridtable/IGTCodeSystem.java  |  5 +-
 9 files changed, 133 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bb8d6d60/common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java b/common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
index f13cd1d..c401425 100644
--- a/common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
+++ b/common/src/main/java/org/apache/kylin/common/util/ImmutableBitSet.java
@@ -44,10 +44,19 @@ public class ImmutableBitSet {
         return arr.length;
     }
 
-    /** return the i-th true bit index */
+    /** return the i-th true bit */
     public int trueBitAt(int i) {
         return arr[i];
     }
+    
+    /** return the bit's index among true bits */
+    public int trueBitIndexOf(int bitIndex) {
+        for (int i = 0; i < arr.length; i++) {
+            if (arr[i] == bitIndex)
+                return i;
+        }
+        return -1;
+    }
 
     public BitSet mutable() {
         return (BitSet) set.clone();

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bb8d6d60/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index d9a0b8f..53d7486 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -16,13 +16,28 @@
  */
 package org.apache.kylin.job.inmemcubing;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.io.DoubleWritable;
 import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-import org.apache.kylin.common.util.*;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -35,18 +50,19 @@ import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.cube.CubeGridTable;
-import org.apache.kylin.storage.gridtable.*;
+import org.apache.kylin.storage.gridtable.GTAggregateScanner;
+import org.apache.kylin.storage.gridtable.GTBuilder;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.GridTable;
+import org.apache.kylin.storage.gridtable.IGTScanner;
+import org.apache.kylin.storage.gridtable.IGTStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.*;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 /**
  * Build a cube (many cuboids) in memory. Calculating multiple cuboids at the same time as long as memory permits.
@@ -62,7 +78,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     private final CubeJoinedFlatTableDesc intermediateTableDesc;
     private final MeasureCodec measureCodec;
     private final String[] metricsAggrFuncs;
-    private final Map<Integer, Integer> dependentMeasures; // key: index of Measure which depends on another measure; value: index of Measure which is depended on;
     private final int[] hbaseMeasureRefIndex;
     private final MeasureDesc[] measureDescs;
     private final int measureCount;
@@ -113,16 +128,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
             measureIndexMap.put(measureDesc.getName(), i);
         }
         this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
-
-        this.dependentMeasures = Maps.newHashMap();
-        for (int i = 0; i < measureCount; i++) {
-            String depMsrRef = measureDescsList.get(i).getDependentMeasureRef();
-            if (depMsrRef != null) {
-                int index = measureIndexMap.get(depMsrRef);
-                dependentMeasures.put(i, index);
-            }
-        }
-
         this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
     }
 
@@ -447,44 +452,13 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
 
         GTRecord newRecord = new GTRecord(newGridTable.getInfo());
         int count = 0;
-        ByteArray byteArray = new ByteArray(8);
-        ByteBuffer byteBuffer = ByteBuffer.allocate(8);
         try {
-            BitSet dependentMetricsBS = new BitSet(allNeededColumns.cardinality());
-            for (Integer i : dependentMeasures.keySet()) {
-                dependentMetricsBS.set((allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)));
-            }
-            ImmutableBitSet dependentMetrics = new ImmutableBitSet(dependentMetricsBS);
-
-            Object[] hllObjects = new Object[dependentMeasures.keySet().size()];
-
             for (GTRecord record : scanner) {
                 count++;
                 for (int i = 0; i < allNeededColumns.trueBitCount(); i++) {
                     int c = allNeededColumns.trueBitAt(i);
                     newRecord.set(i, record.get(c));
                 }
-
-                if (dependentMeasures.size() > 0) {
-                    // update measures which have 'dependent_measure_ref'
-                    newRecord.getValues(dependentMetrics, hllObjects);
-
-                    for (Integer i : dependentMeasures.keySet()) {
-                        for (int index = 0; index < dependentMetrics.trueBitCount(); index++) {
-                            int c = dependentMetrics.trueBitAt(index);
-                            if (c == allNeededColumns.cardinality() - measureCount + dependentMeasures.get(i)) {
-                                assert hllObjects[index] instanceof HyperLogLogPlusCounter; // currently only HLL is allowed
-
-                                byteBuffer.clear();
-                                BytesUtil.writeVLong(((HyperLogLogPlusCounter) hllObjects[index]).getCountEstimate(), byteBuffer);
-                                byteArray.set(byteBuffer.array(), 0, byteBuffer.position());
-                                newRecord.set(allNeededColumns.cardinality() - measureCount + i, byteArray);
-                            }
-                        }
-
-                    }
-                }
-
                 builder.write(newRecord);
             }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bb8d6d60/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
index 519b8e2..dbfef87 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeCodeSystem.java
@@ -9,6 +9,7 @@ import java.util.Map;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.metadata.filter.IFilterCodeSystem;
@@ -31,18 +32,20 @@ public class CubeCodeSystem implements IGTCodeSystem {
     // ============================================================================
 
     private GTInfo info;
-    private Map<Integer, Dictionary> dictionaryMap = null; // column index ==> dictionary of column
-    private Map<Integer, Integer> fixLenMap = null; // column index ==> fixed length of column
+    private Map<Integer, Dictionary> dictionaryMap; // column index ==> dictionary of column
+    private Map<Integer, Integer> fixLenMap; // column index ==> fixed length of column
+    private Map<Integer, Integer> dependentMetricsMap;
     private IFilterCodeSystem<ByteArray> filterCS;
     private DataTypeSerializer[] serializers;
 
     public CubeCodeSystem(Map<Integer, Dictionary> dictionaryMap) {
-        this(dictionaryMap, Collections.<Integer, Integer>emptyMap());
+        this(dictionaryMap, Collections.<Integer, Integer>emptyMap(), Collections.<Integer, Integer>emptyMap());
     }
             
-    public CubeCodeSystem(Map<Integer, Dictionary> dictionaryMap, Map<Integer, Integer> fixLenMap) {
+    public CubeCodeSystem(Map<Integer, Dictionary> dictionaryMap, Map<Integer, Integer> fixLenMap, Map<Integer, Integer> dependentMetricsMap) {
         this.dictionaryMap = dictionaryMap;
         this.fixLenMap = fixLenMap;
+        this.dependentMetricsMap = dependentMetricsMap;
     }
 
     @Override
@@ -150,8 +153,31 @@ public class CubeCodeSystem implements IGTCodeSystem {
     }
 
     @Override
-    public MeasureAggregator<?> newMetricsAggregator(String aggrFunction, int col) {
-        return MeasureAggregator.create(aggrFunction, info.getColumnType(col).toString());
+    public MeasureAggregator<?>[] newMetricsAggregators(ImmutableBitSet columns, String[] aggrFunctions) {
+        assert columns.trueBitCount() == aggrFunctions.length;
+        
+        MeasureAggregator<?>[] result = new MeasureAggregator[aggrFunctions.length];
+        for (int i = 0; i < result.length; i++) {
+            int col = columns.trueBitAt(i);
+            result[i] = MeasureAggregator.create(aggrFunctions[i], info.getColumnType(col).toString());
+        }
+        
+        // deal with holistic distinct count
+        if (dependentMetricsMap != null) {
+            for (Integer child : dependentMetricsMap.keySet()) {
+                if (columns.get(child)) {
+                    Integer parent = dependentMetricsMap.get(child);
+                    if (columns.get(parent) == false)
+                        throw new IllegalStateException();
+                    
+                    int childIdx = columns.trueBitIndexOf(child);
+                    int parentIdx = columns.trueBitIndexOf(parent);
+                    result[childIdx].setDependentAggregator(result[parentIdx]);
+                }
+            }
+        }
+        
+        return result;
     }
 
     static class DictionarySerializer extends DataTypeSerializer {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bb8d6d60/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java b/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
index 89bde96..38ac965 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CubeGridTable.java
@@ -63,7 +63,7 @@ public class CubeGridTable {
 
         GTInfo.Builder builder = GTInfo.builder();
         builder.setTableName("Cuboid " + cuboidId);
-        builder.setCodeSystem(new CubeCodeSystem(dictionaryByColIdx, fixLenByColIdx));
+        builder.setCodeSystem(new CubeCodeSystem(dictionaryByColIdx, fixLenByColIdx, mapping.getDependentMetricsMap()));
         builder.setColumns(mapping.getDataTypes());
         builder.setPrimaryKey(mapping.getPrimaryKey());
         builder.enableColumnBlock(mapping.getColumnBlocks());

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bb8d6d60/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java b/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
index 4388eac..82abf57 100644
--- a/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
+++ b/storage/src/main/java/org/apache/kylin/storage/cube/CuboidToGridTableMapping.java
@@ -1,6 +1,7 @@
 package org.apache.kylin.storage.cube;
 
 import java.util.BitSet;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
@@ -21,7 +22,7 @@ import com.google.common.collect.Maps;
 public class CuboidToGridTableMapping {
 
     final private Cuboid cuboid;
-    
+
     private List<DataType> gtDataTypes;
     private List<ImmutableBitSet> gtColBlocks;
 
@@ -63,15 +64,17 @@ public class CuboidToGridTableMapping {
             for (HBaseColumnDesc hbaseColDesc : familyDesc.getColumns()) {
                 BitSet colBlock = new BitSet();
                 for (MeasureDesc measure : hbaseColDesc.getMeasures()) {
-                    // count distinct & holistic count distinct are equals() but different
-                    // assert the holistic version if exists always comes earlier
+                    // Count distinct & holistic count distinct are equals() but different.
+                    // Ensure the holistic version if exists is always the first.
                     FunctionDesc func = measure.getFunction();
                     if (func.isHolisticCountDistinct()) {
-                        if (metrics2gt.get(func).size() > 0 )
-                            throw new IllegalStateException();
+                        List<Integer> existing = metrics2gt.removeAll(func);
+                        metrics2gt.put(func, gtColIdx);
+                        metrics2gt.putAll(func, existing);
+                    } else {
+                        metrics2gt.put(func, gtColIdx);
                     }
                     gtDataTypes.add(func.getReturnDataType());
-                    metrics2gt.put(func, gtColIdx);
                     colBlock.set(gtColIdx);
                     gtColIdx++;
                 }
@@ -81,19 +84,19 @@ public class CuboidToGridTableMapping {
         nMetrics = gtColIdx - nDimensions;
         assert nMetrics == cuboid.getCube().getMeasures().size();
     }
-    
+
     public int getColumnCount() {
         return nDimensions + nMetrics;
     }
-    
+
     public int getDimensionCount() {
         return nDimensions;
     }
-    
+
     public int getMetricsCount() {
         return nMetrics;
     }
-    
+
     public DataType[] getDataTypes() {
         return (DataType[]) gtDataTypes.toArray(new DataType[gtDataTypes.size()]);
     }
@@ -126,8 +129,30 @@ public class CuboidToGridTableMapping {
         else
             return -1;
     }
-
+    
     public List<TblColRef> getCuboidDimensionsInGTOrder() {
         return cuboid.getColumns();
     }
+    
+    public Map<Integer, Integer> getDependentMetricsMap() {
+        Map<Integer, Integer> result = Maps.newHashMap();
+        List<MeasureDesc> measures = cuboid.getCube().getMeasures();
+        for (MeasureDesc child : measures) {
+            if (child.getDependentMeasureRef() != null) {
+                boolean ok = false;
+                for (MeasureDesc parent : measures) {
+                    if (parent.getName().equals(child.getDependentMeasureRef())) {
+                        int childIndex = getIndexOf(child.getFunction());
+                        int parentIndex = getIndexOf(parent.getFunction());
+                        result.put(childIndex, parentIndex);
+                        ok = true;
+                        break;
+                    }
+                }
+                if (!ok)
+                    throw new IllegalStateException("Cannot find dependent measure: " + child.getDependentMeasureRef());
+            }
+        }
+        return result.isEmpty() ? Collections.<Integer, Integer>emptyMap() : result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bb8d6d60/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
index 6005d2d..25b217c 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTAggregateScanner.java
@@ -162,13 +162,7 @@ public class GTAggregateScanner implements IGTScanner {
         }
 
         private MeasureAggregator[] newAggregators() {
-            MeasureAggregator[] aggrs;
-            aggrs = new MeasureAggregator[metricsAggrFuncs.length];
-            for (int i = 0; i < aggrs.length; i++) {
-                int col = metrics.trueBitAt(i);
-                aggrs[i] = info.codeSystem.newMetricsAggregator(metricsAggrFuncs[i], col);
-            }
-            return aggrs;
+            return info.codeSystem.newMetricsAggregators(metrics, metricsAggrFuncs);
         }
 
         public Object[] calculateTotalSumSanityCheck() {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bb8d6d60/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
index f552e19..1c69f8e 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTInfo.java
@@ -31,17 +31,17 @@ public class GTInfo {
     ImmutableBitSet colBlocksAll;
     int rowBlockSize; // 0: disable row block
 
-    // sharding & rowkey
+    // sharding
     int nShards; // 0: no sharding
 
     // must create from builder
     private GTInfo() {
     }
-    
+
     public String getTableName() {
         return tableName;
     }
-    
+
     public IGTCodeSystem getCodeSystem() {
         return codeSystem;
     }
@@ -49,7 +49,7 @@ public class GTInfo {
     public int getColumnCount() {
         return nColumns;
     }
-    
+
     public DataType getColumnType(int i) {
         return colTypes[i];
     }
@@ -57,7 +57,7 @@ public class GTInfo {
     public ImmutableBitSet getPrimaryKey() {
         return primaryKey;
     }
-    
+
     public boolean isShardingEnabled() {
         return nShards > 0;
     }
@@ -69,11 +69,11 @@ public class GTInfo {
     public int getRowBlockSize() {
         return rowBlockSize;
     }
-    
+
     public int getMaxRecordLength() {
         return getMaxColumnLength(colAll);
     }
-    
+
     public int getMaxColumnLength(ImmutableBitSet selectedCols) {
         int result = 0;
         for (int i = 0; i < selectedCols.trueBitCount(); i++) {
@@ -82,7 +82,7 @@ public class GTInfo {
         }
         return result;
     }
-    
+
     public int getMaxColumnLength() {
         int max = 0;
         for (int i = 0; i < nColumns; i++)
@@ -135,13 +135,13 @@ public class GTInfo {
 
     private void validateColumnBlocks() {
         colAll = new ImmutableBitSet(0, nColumns);
-        
+
         if (colBlocks == null) {
             colBlocks = new ImmutableBitSet[2];
             colBlocks[0] = primaryKey;
             colBlocks[1] = colAll.andNot(primaryKey);
         }
-        
+
         colBlocksAll = new ImmutableBitSet(0, colBlocks.length);
 
         if (colPreferIndex == null)
@@ -190,7 +190,7 @@ public class GTInfo {
             info.tableName = name;
             return this;
         }
-        
+
         /** required */
         public Builder setCodeSystem(IGTCodeSystem cs) {
             info.codeSystem = cs;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bb8d6d60/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
index ceeeb84..7d6a1cc 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/GTSampleCodeSystem.java
@@ -4,6 +4,7 @@ import java.nio.ByteBuffer;
 
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.metadata.filter.IFilterCodeSystem;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
 import org.apache.kylin.metadata.serializer.DataTypeSerializer;
@@ -85,8 +86,15 @@ public class GTSampleCodeSystem implements IGTCodeSystem {
     // ============================================================================
 
     @Override
-    public MeasureAggregator<?> newMetricsAggregator(String aggrFunction, int col) {
-        return MeasureAggregator.create(aggrFunction, info.colTypes[col].toString());
+    public MeasureAggregator<?>[] newMetricsAggregators(ImmutableBitSet columns, String[] aggrFunctions) {
+        assert columns.trueBitCount() == aggrFunctions.length;
+        
+        MeasureAggregator<?>[] result = new MeasureAggregator[aggrFunctions.length];
+        for (int i = 0; i < result.length; i++) {
+            int col = columns.trueBitAt(i);
+            result[i] = MeasureAggregator.create(aggrFunctions[i], info.getColumnType(col).toString());
+        }
+        return result;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/bb8d6d60/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTCodeSystem.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTCodeSystem.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTCodeSystem.java
index 5be43fb..368cc32 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTCodeSystem.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/IGTCodeSystem.java
@@ -3,6 +3,7 @@ package org.apache.kylin.storage.gridtable;
 import java.nio.ByteBuffer;
 
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.metadata.filter.IFilterCodeSystem;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
 
@@ -38,7 +39,7 @@ public interface IGTCodeSystem {
     /** Decode a code into value */
     Object decodeColumnValue(int col, ByteBuffer buf);
     
-    /** Return an aggregator for metrics */
-    MeasureAggregator<?> newMetricsAggregator(String aggrFunction, int col);
+    /** Return aggregators for metrics */
+    MeasureAggregator<?>[] newMetricsAggregators(ImmutableBitSet columns, String[] aggrFunctions);
     
 }