You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/11/27 10:01:06 UTC

[11/13] incubator-kylin git commit: KYLIN-976 Add ingester; Build part done, in-mem cube test pass

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
deleted file mode 100644
index 7430c4e..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleMinAggregator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.DoubleMutable;
-
-/**
- */
-@SuppressWarnings("serial")
-public class DoubleMinAggregator extends MeasureAggregator<DoubleMutable> {
-
-    DoubleMutable min = null;
-
-    @Override
-    public void reset() {
-        min = null;
-    }
-
-    @Override
-    public void aggregate(DoubleMutable value) {
-        if (min == null)
-            min = new DoubleMutable(value.get());
-        else if (min.get() > value.get())
-            min.set(value.get());
-    }
-
-    @Override
-    public DoubleMutable getState() {
-        return min;
-    }
-
-    @Override
-    public int getMemBytesEstimate() {
-        return guessDoubleMemBytes();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
deleted file mode 100644
index 6e66c1b..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/DoubleSumAggregator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.DoubleMutable;
-
-/**
- */
-@SuppressWarnings("serial")
-public class DoubleSumAggregator extends MeasureAggregator<DoubleMutable> {
-
-    DoubleMutable sum = new DoubleMutable();
-
-    @Override
-    public void reset() {
-        sum.set(0.0);
-    }
-
-    @Override
-    public void aggregate(DoubleMutable value) {
-        sum.set(sum.get() + value.get());
-    }
-
-    @Override
-    public DoubleMutable getState() {
-        return sum;
-    }
-
-    @Override
-    public int getMemBytesEstimate() {
-        return guessDoubleMemBytes();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
deleted file mode 100644
index 7fdf3d8..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMaxAggregator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.LongMutable;
-
-/**
- */
-@SuppressWarnings("serial")
-public class LongMaxAggregator extends MeasureAggregator<LongMutable> {
-
-    LongMutable max = null;
-
-    @Override
-    public void reset() {
-        max = null;
-    }
-
-    @Override
-    public void aggregate(LongMutable value) {
-        if (max == null)
-            max = new LongMutable(value.get());
-        else if (max.get() < value.get())
-            max.set(value.get());
-    }
-
-    @Override
-    public LongMutable getState() {
-        return max;
-    }
-
-    @Override
-    public int getMemBytesEstimate() {
-        return guessLongMemBytes();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
deleted file mode 100644
index 22ae865..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongMinAggregator.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.LongMutable;
-
-/**
- */
-@SuppressWarnings("serial")
-public class LongMinAggregator extends MeasureAggregator<LongMutable> {
-
-    LongMutable min = null;
-
-    @Override
-    public void reset() {
-        min = null;
-    }
-
-    @Override
-    public void aggregate(LongMutable value) {
-        if (min == null)
-            min = new LongMutable(value.get());
-        else if (min.get() > value.get())
-            min.set(value.get());
-    }
-
-    @Override
-    public LongMutable getState() {
-        return min;
-    }
-
-    @Override
-    public int getMemBytesEstimate() {
-        return guessLongMemBytes();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java
deleted file mode 100644
index 38d728a..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/basic/LongSumAggregator.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.basic;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.LongMutable;
-
-/**
- */
-@SuppressWarnings("serial")
-public class LongSumAggregator extends MeasureAggregator<LongMutable> {
-
-    LongMutable sum = new LongMutable();
-
-    @Override
-    public void reset() {
-        sum.set(0);
-    }
-
-    @Override
-    public void aggregate(LongMutable value) {
-        sum.set(sum.get() + value.get());
-    }
-
-    @Override
-    public LongMutable getState() {
-        return sum;
-    }
-
-    @Override
-    public int getMemBytesEstimate() {
-        return guessLongMemBytes();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregation.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregation.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregation.java
deleted file mode 100644
index d5ceba5..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregation.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.hllc;
-
-import java.util.List;
-
-import org.apache.kylin.aggregation.AggregationType;
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.DataType;
-import org.apache.kylin.common.datatype.DataTypeSerializer;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-public class HLLCAggregation extends AggregationType {
-    
-    private final DataType dataType;
-    
-    public HLLCAggregation(String dataType) {
-        this.dataType = DataType.getType(dataType);
-        
-        if (this.dataType.getPrecision() < 10 || this.dataType.getPrecision() > 16)
-            throw new IllegalArgumentException("HLLC precision must be between 10 and 16");
-    }
-
-    @Override
-    public DataType getAggregationDataType() {
-        return dataType;
-    }
-
-    @Override
-    public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() {
-        return HLLCSerializer.class;
-    }
-    
-    @Override
-    public void validate(MeasureDesc measureDesc) throws IllegalArgumentException {
-        // TODO Auto-generated method stub
-        
-    }
-
-    @Override
-    public MeasureAggregator<?> newAggregator() {
-      if (dataType.isHLLC())
-          return new HLLCAggregator(dataType.getPrecision());
-      else
-          return new LDCAggregator();
-    }
-
-    @Override
-    public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregationFactory.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregationFactory.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregationFactory.java
deleted file mode 100644
index 18c021d..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregationFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.hllc;
-
-import org.apache.kylin.aggregation.AggregationType;
-import org.apache.kylin.aggregation.IAggregationFactory;
-import org.apache.kylin.metadata.model.FunctionDesc;
-
-public class HLLCAggregationFactory implements IAggregationFactory {
-
-    @Override
-    public AggregationType createAggregationType(String funcName, String dataType) {
-        if (FunctionDesc.FUNC_COUNT_DISTINCT.equalsIgnoreCase(funcName) == false)
-            throw new IllegalArgumentException();
-        
-        return new HLLCAggregation(dataType);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java
deleted file mode 100644
index 8f85fe8..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCAggregator.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.hllc;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-
-/**
- */
-@SuppressWarnings("serial")
-public class HLLCAggregator extends MeasureAggregator<HyperLogLogPlusCounter> {
-
-    final int precision;
-    HyperLogLogPlusCounter sum = null;
-
-    public HLLCAggregator(int precision) {
-        this.precision = precision;
-    }
-
-    @Override
-    public void reset() {
-        sum = null;
-    }
-
-    @Override
-    public void aggregate(HyperLogLogPlusCounter value) {
-        if (sum == null)
-            sum = new HyperLogLogPlusCounter(value);
-        else
-            sum.merge(value);
-    }
-
-    @Override
-    public HyperLogLogPlusCounter getState() {
-        return sum;
-    }
-
-    @Override
-    public int getMemBytesEstimate() {
-        // 1024 + 60 returned by AggregationCacheMemSizeTest
-        return 8 // aggregator obj shell
-                + 4 // precision
-                + 8 // ref to HLLC
-                + 8 // HLLC obj shell
-                + 32 + (1 << precision); // HLLC internal
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java
deleted file mode 100644
index 5612892..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/HLLCSerializer.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.hllc;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.datatype.DataType;
-import org.apache.kylin.common.datatype.DataTypeSerializer;
-import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
-
-/**
- * @author yangli9
- * 
- */
-public class HLLCSerializer extends DataTypeSerializer<HyperLogLogPlusCounter> {
-
-    // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<HyperLogLogPlusCounter> current = new ThreadLocal<HyperLogLogPlusCounter>();
-
-    private int precision;
-
-    public HLLCSerializer(DataType type) {
-        this.precision = type.getPrecision();
-    }
-
-    @Override
-    public void serialize(HyperLogLogPlusCounter value, ByteBuffer out) {
-        try {
-            value.writeRegisters(out);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private HyperLogLogPlusCounter current() {
-        HyperLogLogPlusCounter hllc = current.get();
-        if (hllc == null) {
-            hllc = new HyperLogLogPlusCounter(precision);
-            current.set(hllc);
-        }
-        return hllc;
-    }
-
-    @Override
-    public HyperLogLogPlusCounter deserialize(ByteBuffer in) {
-        HyperLogLogPlusCounter hllc = current();
-        try {
-            hllc.readRegisters(in);
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-        return hllc;
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        return current().peekLength(in);
-    }
-
-    @Override
-    public int maxLength() {
-        return current().maxLength();
-    }
-
-    @Override
-    public int getStorageBytesEstimate() {
-        return current().maxLength();
-    }
-
-    @Override
-    public HyperLogLogPlusCounter valueOf(byte[] value) {
-        HyperLogLogPlusCounter hllc = current();
-        hllc.clear();
-        if (value == null)
-            hllc.add("__nUlL__");
-        else
-            hllc.add(value);
-        return hllc;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java
deleted file mode 100644
index 151c1ee..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/hllc/LDCAggregator.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.hllc;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.LongMutable;
-
-/**
- * Long Distinct Count
- */
-@SuppressWarnings("serial")
-public class LDCAggregator extends MeasureAggregator<LongMutable> {
-
-    private static LongMutable ZERO = new LongMutable(0);
-
-    private HLLCAggregator hllAgg = null;
-    private LongMutable state = new LongMutable(0);
-
-    @SuppressWarnings("rawtypes")
-    public void setDependentAggregator(MeasureAggregator agg) {
-        this.hllAgg = (HLLCAggregator) agg;
-    }
-
-    @Override
-    public void reset() {
-    }
-
-    @Override
-    public void aggregate(LongMutable value) {
-    }
-
-    @Override
-    public LongMutable getState() {
-        if (hllAgg == null) {
-            return ZERO;
-        } else {
-            state.set(hllAgg.getState().getCountEstimate());
-            return state;
-        }
-    }
-
-    @Override
-    public int getMemBytesEstimate() {
-        return guessLongMemBytes();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregation.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregation.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregation.java
deleted file mode 100644
index 251abd9..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregation.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.topn;
-
-import java.util.List;
-
-import org.apache.kylin.aggregation.AggregationType;
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.aggregation.hllc.HLLCSerializer;
-import org.apache.kylin.common.datatype.DataType;
-import org.apache.kylin.common.datatype.DataTypeSerializer;
-import org.apache.kylin.dict.Dictionary;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.apache.kylin.metadata.model.TblColRef;
-
-public class TopNAggregation extends AggregationType {
-
-    private final DataType dataType;
-
-    public TopNAggregation(String dataType) {
-        this.dataType = DataType.getType(dataType);
-        
-        if (this.dataType.getPrecision() < 1 || this.dataType.getPrecision() > 1000)
-            throw new IllegalArgumentException("TopN precision must be between 1 and 1000");
-    }
-
-    @Override
-    public DataType getAggregationDataType() {
-        return dataType;
-    }
-
-    @Override
-    public Class<? extends DataTypeSerializer<?>> getAggregationDataSeralizer() {
-        return HLLCSerializer.class;
-    }
-    
-    @Override
-    public void validate(MeasureDesc measureDesc) throws IllegalArgumentException {
-        // TODO Auto-generated method stub
-        
-    }
-
-    @Override
-    public MeasureAggregator<?> newAggregator() {
-        return new TopNAggregator();
-    }
-
-    @Override
-    public List<TblColRef> getColumnsNeedDictionary(MeasureDesc measureDesc) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-    @Override
-    public Object reEncodeDictionary(Object value, List<Dictionary<?>> oldDicts, List<Dictionary<?>> newDicts) {
-        // TODO Auto-generated method stub
-        return null;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregationFactory.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregationFactory.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregationFactory.java
deleted file mode 100644
index 1ea22c8..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregationFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.topn;
-
-import org.apache.kylin.aggregation.AggregationType;
-import org.apache.kylin.aggregation.IAggregationFactory;
-import org.apache.kylin.metadata.model.FunctionDesc;
-
-public class TopNAggregationFactory implements IAggregationFactory {
-
-    @Override
-    public AggregationType createAggregationType(String funcName, String dataType) {
-        if (FunctionDesc.FUNC_TOP_N.equalsIgnoreCase(funcName) == false)
-            throw new IllegalArgumentException();
-        
-        return new TopNAggregation(dataType);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java
deleted file mode 100644
index 4f6c7ee..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNAggregator.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.topn;
-
-import java.util.Map;
-
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.topn.TopNCounter;
-import org.apache.kylin.common.util.ByteArray;
-
-import com.google.common.collect.Maps;
-
-/**
- * 
- */
-@SuppressWarnings("serial")
-public class TopNAggregator extends MeasureAggregator<TopNCounter<ByteArray>> {
-
-    int capacity = 0;
-    TopNCounter<ByteArray> sum = null;
-    Map<ByteArray, Double> sanityCheckMap;
-
-    @Override
-    public void reset() {
-        sum = null;
-    }
-
-    @Override
-    public void aggregate(TopNCounter<ByteArray> value) {
-        if (sum == null) {
-            capacity = value.getCapacity();
-            sum = new TopNCounter<>(capacity);
-            sanityCheckMap = Maps.newHashMap();
-        }
-        sum.merge(value);
-    }
-
-    @Override
-    public TopNCounter<ByteArray> getState() {
-        
-        //sum.retain(capacity);
-        return sum;
-    }
-
-    @Override
-    public int getMemBytesEstimate() {
-        return 8 * capacity / 4;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java b/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java
deleted file mode 100644
index 8088842..0000000
--- a/core-cube/src/main/java/org/apache/kylin/aggregation/topn/TopNCounterSerializer.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.aggregation.topn;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.kylin.common.datatype.DataType;
-import org.apache.kylin.common.datatype.DataTypeSerializer;
-import org.apache.kylin.common.topn.Counter;
-import org.apache.kylin.common.topn.DoubleDeltaSerializer;
-import org.apache.kylin.common.topn.TopNCounter;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.BytesUtil;
-
-/**
- * 
- */
-public class TopNCounterSerializer extends DataTypeSerializer<TopNCounter<ByteArray>> {
-
-    private DoubleDeltaSerializer dds = new DoubleDeltaSerializer(3);
-
-    private int precision;
-
-    public TopNCounterSerializer(DataType dataType) {
-        this.precision = dataType.getPrecision();
-    }
-
-    @Override
-    public int peekLength(ByteBuffer in) {
-        int mark = in.position();
-        @SuppressWarnings("unused")
-        int capacity = in.getInt();
-        int size = in.getInt();
-        int keyLength = in.getInt();
-        dds.deserialize(in);
-        int len = in.position() - mark + keyLength * size;
-        in.position(mark);
-        return len;
-    }
-
-    @Override
-    public int maxLength() {
-        return precision * TopNCounter.EXTRA_SPACE_RATE * (4 + 8);
-    }
-
-    @Override
-    public int getStorageBytesEstimate() {
-        return precision * TopNCounter.EXTRA_SPACE_RATE * 8;
-    }
-
-    @Override
-    public TopNCounter<ByteArray> valueOf(byte[] value) {
-        ByteBuffer buffer = ByteBuffer.wrap(value);
-        int sizeOfId = buffer.getInt();
-        int keyEncodedValue = buffer.getInt();
-        double counter = buffer.getDouble();
-
-        ByteArray key = new ByteArray(sizeOfId);
-        BytesUtil.writeUnsigned(keyEncodedValue, key.array(), 0, sizeOfId);
-
-        TopNCounter<ByteArray> topNCounter = new TopNCounter<ByteArray>(precision * TopNCounter.EXTRA_SPACE_RATE);
-        topNCounter.offer(key, counter);
-        return topNCounter;
-    }
-
-    @Override
-    public void serialize(TopNCounter<ByteArray> value, ByteBuffer out) {
-        double[] counters = value.getCounters();
-        List<ByteArray> peek = value.peek(1);
-        int keyLength = peek.size() > 0 ? peek.get(0).length() : 0;
-        out.putInt(value.getCapacity());
-        out.putInt(value.size());
-        out.putInt(keyLength);
-        dds.serialize(counters, out);
-        Iterator<Counter<ByteArray>> iterator = value.iterator();
-        while (iterator.hasNext()) {
-            out.put(iterator.next().getItem().array());
-        }
-    }
-
-    @Override
-    public TopNCounter<ByteArray> deserialize(ByteBuffer in) {
-        int capacity = in.getInt();
-        int size = in.getInt();
-        int keyLength = in.getInt();
-        double[] counters = dds.deserialize(in);
-
-        TopNCounter<ByteArray> counter = new TopNCounter<ByteArray>(capacity);
-        ByteArray byteArray;
-        for (int i = 0; i < size; i++) {
-            byteArray = new ByteArray(keyLength);
-            in.get(byteArray.array());
-            counter.offerToHead(byteArray, counters[i]);
-        }
-
-        return counter;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
index 4592b15..d909a4c 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java
@@ -38,10 +38,10 @@ import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.Serializer;
 import org.apache.kylin.common.restclient.Broadcaster;
 import org.apache.kylin.common.restclient.CaseInsensitiveStringCache;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.DimensionDesc;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.dict.DistinctColumnValuesProvider;
@@ -182,7 +182,8 @@ public class CubeManager implements IRealizationProvider {
     /**
      * return null if no dictionary for given column
      */
-    public Dictionary<?> getDictionary(CubeSegment cubeSeg, TblColRef col) {
+    @SuppressWarnings("unchecked")
+    public Dictionary<String> getDictionary(CubeSegment cubeSeg, TblColRef col) {
         DictionaryInfo info = null;
         try {
             DictionaryManager dictMgr = getDictionaryManager();
@@ -199,7 +200,7 @@ public class CubeManager implements IRealizationProvider {
             throw new IllegalStateException("Failed to get dictionary for cube segment" + cubeSeg + ", col" + col, e);
         }
 
-        return info.getDictionaryObject();
+        return (Dictionary<String>) info.getDictionaryObject();
     }
 
     public SnapshotTable buildSnapshotTable(CubeSegment cubeSeg, String lookupTable) throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
index 62df1e9..b29f83a 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java
@@ -25,17 +25,17 @@ import java.util.TimeZone;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.ShardingHash;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.IDictionaryAware;
-import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.IStorageAware;
 import org.apache.kylin.metadata.model.SegmentStatusEnum;
 import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.realization.IRealization;
+import org.apache.kylin.metadata.realization.IRealizationSegment;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonAutoDetect.Visibility;
@@ -43,8 +43,6 @@ import com.fasterxml.jackson.annotation.JsonBackReference;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Objects;
 import com.google.common.collect.Maps;
-import org.apache.kylin.metadata.realization.IRealization;
-import org.apache.kylin.metadata.realization.IRealizationSegment;
 
 @JsonAutoDetect(fieldVisibility = Visibility.NONE, getterVisibility = Visibility.NONE, isGetterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE)
 public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, IRealizationSegment {
@@ -267,6 +265,14 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
     public void setStorageLocationIdentifier(String storageLocationIdentifier) {
         this.storageLocationIdentifier = storageLocationIdentifier;
     }
+    
+    public Map<TblColRef, Dictionary<String>> buildDictionaryMap() {
+        Map<TblColRef, Dictionary<String>> result = Maps.newHashMap();
+        for (TblColRef col : getCubeDesc().getAllColumnsNeedDictionary()) {
+            result.put(col, (Dictionary<String>) getDictionary(col));
+        }
+        return result;
+    }
 
     @Override
     public int getColumnLength(TblColRef col) {
@@ -279,7 +285,7 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
     }
 
     @Override
-    public Dictionary<?> getDictionary(TblColRef col) {
+    public Dictionary<String> getDictionary(TblColRef col) {
         return CubeManager.getInstance(this.getCubeInstance().getConfig()).getDictionary(this, col);
     }
 
@@ -427,4 +433,5 @@ public class CubeSegment implements Comparable<CubeSegment>, IDictionaryAware, I
     public IJoinedFlatTableDesc getJoinedFlatTableDesc() {
         return new CubeJoinedFlatTableDesc(this.getCubeDesc(), this);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
index 3619d69..7f38c26 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeCodeSystem.java
@@ -1,23 +1,21 @@
 package org.apache.kylin.cube.gridtable;
 
-import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.DataTypeSerializer;
-import org.apache.kylin.common.datatype.StringSerializer;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.gridtable.DefaultGTComparator;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.IGTCodeSystem;
 import org.apache.kylin.gridtable.IGTComparator;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 
 /**
  * defines how column values will be encoded to/ decoded from GTRecord 
@@ -111,9 +109,6 @@ public class CubeCodeSystem implements IGTCodeSystem {
         if (serializer instanceof DictionarySerializer) {
             ((DictionarySerializer) serializer).serializeWithRounding(value, roundingFlag, buf);
         } else {
-            if ((value instanceof String) && (!(serializer instanceof StringSerializer || serializer instanceof FixLenSerializer))) {
-                value = serializer.valueOf((String) value);
-            }
             serializer.serialize(value, buf);
         }
     }
@@ -176,11 +171,6 @@ public class CubeCodeSystem implements IGTCodeSystem {
         }
 
         @Override
-        public Object valueOf(byte[] value) {
-            throw new UnsupportedOperationException();
-        }
-
-        @Override
         public void serialize(Object value, ByteBuffer out) {
             throw new UnsupportedOperationException();
         }
@@ -230,10 +220,6 @@ public class CubeCodeSystem implements IGTCodeSystem {
             return dictionary.getSizeOfId();
         }
 
-        @Override
-        public Object valueOf(byte[] value) {
-            throw new UnsupportedOperationException();
-        }
     }
 
     static class FixLenSerializer extends DataTypeSerializer {
@@ -306,16 +292,6 @@ public class CubeCodeSystem implements IGTCodeSystem {
             return fixLen;
         }
 
-        @Override
-        public Object valueOf(byte[] value) {
-            try {
-                return new String(value, "UTF-8");
-            } catch (UnsupportedEncodingException e) {
-                // does not happen
-                throw new RuntimeException(e);
-            }
-        }
-
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
index aa0a530..05fc8a5 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CubeGridTable.java
@@ -3,11 +3,11 @@ package org.apache.kylin.cube.gridtable;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.metadata.model.TblColRef;
 
@@ -16,24 +16,25 @@ import com.google.common.collect.Maps;
 @SuppressWarnings("rawtypes")
 public class CubeGridTable {
 
-    public static Map<TblColRef, Dictionary<?>> getDimensionToDictionaryMap(CubeSegment cubeSeg, long cuboidId) {
+    public static Map<TblColRef, Dictionary<String>> getDimensionToDictionaryMap(CubeSegment cubeSeg, long cuboidId) {
         CubeDesc cubeDesc = cubeSeg.getCubeDesc();
         CubeManager cubeMgr = CubeManager.getInstance(cubeSeg.getCubeInstance().getConfig());
 
         // build a dictionary map
-        Map<TblColRef, Dictionary<?>> dictionaryMap = Maps.newHashMap();
+        Map<TblColRef, Dictionary<String>> dictionaryMap = Maps.newHashMap();
         List<TblColRef> dimCols = Cuboid.findById(cubeDesc, cuboidId).getColumns();
         for (TblColRef col : dimCols) {
-            Dictionary<?> dictionary = cubeMgr.getDictionary(cubeSeg, col);
+            Dictionary<String> dictionary = cubeMgr.getDictionary(cubeSeg, col);
             if (dictionary != null) {
                 dictionaryMap.put(col, dictionary);
             }
         }
+        
         return dictionaryMap;
     }
 
     public static GTInfo newGTInfo(CubeSegment cubeSeg, long cuboidId) throws NotEnoughGTInfoException {
-        Map<TblColRef, Dictionary<?>> dictionaryMap = getDimensionToDictionaryMap(cubeSeg, cuboidId);
+        Map<TblColRef, Dictionary<String>> dictionaryMap = getDimensionToDictionaryMap(cubeSeg, cuboidId);
         Cuboid cuboid = Cuboid.findById(cubeSeg.getCubeDesc(), cuboidId);
         for (TblColRef dim : cuboid.getColumns()) {
             if (cubeSeg.getCubeDesc().getRowkey().isUseDictionary(dim)) {
@@ -47,7 +48,7 @@ public class CubeGridTable {
         return newGTInfo(cubeSeg.getCubeDesc(), cuboidId, dictionaryMap);
     }
 
-    public static GTInfo newGTInfo(CubeDesc cubeDesc, long cuboidId, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+    public static GTInfo newGTInfo(CubeDesc cubeDesc, long cuboidId, Map<TblColRef, Dictionary<String>> dictionaryMap) {
         Cuboid cuboid = Cuboid.findById(cubeDesc, cuboidId);
         CuboidToGridTableMapping mapping = new CuboidToGridTableMapping(cuboid);
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
index 36db773..2152301 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/CuboidToGridTableMapping.java
@@ -6,11 +6,11 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
 import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/NotEnoughGTInfoException.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/NotEnoughGTInfoException.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/NotEnoughGTInfoException.java
index 045b11e..9bbcf75 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/NotEnoughGTInfoException.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/NotEnoughGTInfoException.java
@@ -18,5 +18,6 @@
 
 package org.apache.kylin.cube.gridtable;
 
+@SuppressWarnings("serial")
 public class NotEnoughGTInfoException extends Exception {
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
index d30186e..c4d0a7e 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/gridtable/TrimmedCubeCodeSystem.java
@@ -23,12 +23,12 @@ package org.apache.kylin.cube.gridtable;
 import java.nio.ByteBuffer;
 import java.util.Map;
 
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.DataTypeSerializer;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.IGTCodeSystem;
 import org.apache.kylin.gridtable.IGTComparator;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class TrimmedCubeCodeSystem implements IGTCodeSystem {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
index 0a35559..58f94c1 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/AbstractInMemCubeBuilder.java
@@ -21,8 +21,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.GridTable;
@@ -39,12 +39,12 @@ abstract public class AbstractInMemCubeBuilder {
     private static Logger logger = LoggerFactory.getLogger(AbstractInMemCubeBuilder.class);
 
     final protected CubeDesc cubeDesc;
-    final protected Map<TblColRef, Dictionary<?>> dictionaryMap;
+    final protected Map<TblColRef, Dictionary<String>> dictionaryMap;
 
     protected int taskThreadCount = 4;
     protected int reserveMemoryMB = 200;
 
-    public AbstractInMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+    public AbstractInMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
         if (cubeDesc == null)
             throw new NullPointerException();
         if (dictionaryMap == null)

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
index ce912a3..5b6131f 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilder.java
@@ -29,15 +29,15 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.kylin.aggregation.MeasureAggregators;
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -54,7 +54,7 @@ public class DoggedCubeBuilder extends AbstractInMemCubeBuilder {
     private int splitRowThreshold = Integer.MAX_VALUE;
     private int unitRows = 1000;
 
-    public DoggedCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+    public DoggedCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
         super(cubeDesc, dictionaryMap);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
index a393179..5c59de7 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilder.java
@@ -28,9 +28,9 @@ import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.kylin.common.datatype.DoubleMutable;
 import org.apache.kylin.common.topn.Counter;
 import org.apache.kylin.common.topn.TopNCounter;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
 import org.apache.kylin.common.util.Pair;
@@ -38,8 +38,6 @@ import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.cuboid.CuboidScheduler;
 import org.apache.kylin.cube.gridtable.CubeGridTable;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.gridtable.GTAggregateScanner;
 import org.apache.kylin.gridtable.GTBuilder;
 import org.apache.kylin.gridtable.GTInfo;
@@ -47,6 +45,7 @@ import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.gridtable.GTScanRequest;
 import org.apache.kylin.gridtable.GridTable;
 import org.apache.kylin.gridtable.IGTScanner;
+import org.apache.kylin.metadata.datatype.DoubleMutable;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
@@ -66,7 +65,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     private final CuboidScheduler cuboidScheduler;
     private final long baseCuboidId;
     private final int totalCuboidCount;
-    private final CubeJoinedFlatTableDesc intermediateTableDesc;
     private final String[] metricsAggrFuncs;
     private final MeasureDesc[] measureDescs;
     private final int measureCount;
@@ -81,12 +79,11 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     private Object[] totalSumForSanityCheck;
     private ICuboidCollector resultCollector;
 
-    public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<?>> dictionaryMap) {
+    public InMemCubeBuilder(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
         super(cubeDesc, dictionaryMap);
         this.cuboidScheduler = new CuboidScheduler(cubeDesc);
         this.baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
         this.totalCuboidCount = cuboidScheduler.getCuboidCount();
-        this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
 
         this.measureCount = cubeDesc.getMeasures().size();
         this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
@@ -100,8 +97,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         this.metricsAggrFuncs = metricsAggrFuncsList.toArray(new String[metricsAggrFuncsList.size()]);
     }
 
-    
-
     private GridTable newGridTableByCuboidID(long cuboidID) throws IOException {
         GTInfo info = CubeGridTable.newGTInfo(cubeDesc, cuboidID, dictionaryMap);
 
@@ -114,7 +109,6 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         return gridTable;
     }
 
-
     @Override
     public void build(BlockingQueue<List<String>> input, ICuboidWriter output) throws IOException {
         ConcurrentNavigableMap<Long, CuboidResult> result = build(input);
@@ -447,7 +441,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
         return updateCuboidResult(cuboidId, newGridTable, count, timeSpent, 0);
     }
 
-    //@SuppressWarnings("unused")
+    @SuppressWarnings({ "unused", "rawtypes", "unchecked" })
     private void sanityCheck(long parentId, long cuboidId, Object[] totalSum) {
         // double sum introduces error and causes result not exactly equal
         for (int i = 0; i < totalSum.length; i++) {
@@ -508,9 +502,7 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
             this.info = info;
             this.input = input;
             this.record = new GTRecord(info);
-            this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc, 
-                    InMemCubeBuilderUtils.createTopNLiteralColDictionaryMap(cubeDesc, intermediateTableDesc, dictionaryMap), 
-                    info);
+            this.inMemCubeBuilderInputConverter = new InMemCubeBuilderInputConverter(cubeDesc, dictionaryMap, info);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
index bf4278a..fed9479 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderInputConverter.java
@@ -17,49 +17,39 @@
 */
 package org.apache.kylin.cube.inmemcubing;
 
-import com.google.common.base.Preconditions;
+import java.util.List;
+import java.util.Map;
 
-import org.apache.kylin.aggregation.MeasureCodec;
-import org.apache.kylin.common.datatype.LongMutable;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.gridtable.GTInfo;
 import org.apache.kylin.gridtable.GTRecord;
+import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
+import org.apache.kylin.metadata.model.ParameterDesc;
+import org.apache.kylin.metadata.model.TblColRef;
 
 /**
  */
 public class InMemCubeBuilderInputConverter {
 
-    private static final LongMutable ONE = new LongMutable(1l);
-    
-    private final CubeDesc cubeDesc;
     private final CubeJoinedFlatTableDesc intermediateTableDesc;
     private final MeasureDesc[] measureDescs;
-    private final MeasureCodec measureCodec;
+    private final MeasureIngester<?>[] measureIngesters;
     private final int measureCount;
-    private final ByteBuffer valueBuf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-    private final Map<Integer, Dictionary<String>> topNLiteralColDictMap;
+    private final Map<TblColRef, Dictionary<String>> dictionaryMap;
     private final GTInfo gtInfo;
     
 
-    public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, Map<Integer, Dictionary<String>> topNLiteralColDictMap, GTInfo gtInfo) {
-        this.cubeDesc = cubeDesc;
+    public InMemCubeBuilderInputConverter(CubeDesc cubeDesc, Map<TblColRef, Dictionary<String>> dictionaryMap, GTInfo gtInfo) {
         this.gtInfo = gtInfo;
         this.intermediateTableDesc = new CubeJoinedFlatTableDesc(cubeDesc, null);
         this.measureCount = cubeDesc.getMeasures().size();
         this.measureDescs = cubeDesc.getMeasures().toArray(new MeasureDesc[measureCount]);
-        this.measureCodec = new MeasureCodec(cubeDesc.getMeasures());
-        this.topNLiteralColDictMap = Preconditions.checkNotNull(topNLiteralColDictMap, "topNLiteralColDictMap cannot be null");
+        this.measureIngesters = MeasureIngester.create(cubeDesc.getMeasures());
+        this.dictionaryMap = dictionaryMap;
     }
     
     public final GTRecord convert(List<String> row) {
@@ -89,59 +79,38 @@ public class InMemCubeBuilderInputConverter {
     }
 
     private Object[] buildValue(List<String> row) {
-
         Object[] values = new Object[measureCount];
         for (int i = 0; i < measureCount; i++) {
-            MeasureDesc measureDesc = measureDescs[i];
-            int[] flatTableIdx = intermediateTableDesc.getMeasureColumnIndexes()[i];
-            FunctionDesc function = cubeDesc.getMeasures().get(i).getFunction();
-            if (flatTableIdx == null) {
-                values[i] = measureCodec.getSerializer(i).valueOf(measureDesc.getFunction().getParameter().getValue());
-            } else if (function.isCount() || function.isHolisticCountDistinct()) {
+            values[i] = buildValueOf(i, row);
+        }
+        return values;
+    }
+    
+    private Object buildValueOf(int idxOfMeasure, List<String> row) {
+        MeasureDesc measure = measureDescs[idxOfMeasure];
+        FunctionDesc function = measure.getFunction();
+        int[] colIdxOnFlatTable = intermediateTableDesc.getMeasureColumnIndexes()[idxOfMeasure];
+        
+        int paramCount = function.getParameterCount();
+        String[] inputToMeasure = new String[paramCount];
+
+        // pick up parameter values
+        ParameterDesc param = function.getParameter();
+        int paramColIdx = 0; // index among parameters of column type
+        for (int i = 0; i < paramCount; i++, param = param.getNextParameter()) {
+            String value;
+            if (function.isCount() || function.isHolisticCountDistinct()) {
                 // note for holistic count distinct, this value will be ignored
-                values[i] = ONE;
-            } else if (function.isTopN()) {
-                // encode the key column with dict, and get the counter column;
-                int keyColIndex = flatTableIdx[flatTableIdx.length - 1];
-                Dictionary<String> literalColDict = topNLiteralColDictMap.get(keyColIndex);
-                int keyColEncoded = literalColDict.getIdFromValue(row.get(keyColIndex));
-                valueBuf.clear();
-                valueBuf.putInt(literalColDict.getSizeOfId());
-                valueBuf.putInt(keyColEncoded);
-                if (flatTableIdx.length == 1) {
-                    // only literalCol, use 1.0 as counter
-                    valueBuf.putDouble(1.0);
-                } else {
-                    // get the counter column value
-                    valueBuf.putDouble(Double.valueOf(row.get(flatTableIdx[0])));
-                }
-
-                values[i] = measureCodec.getSerializer(i).valueOf(valueBuf.array());
-
-            } else if (flatTableIdx.length == 1) {
-                values[i] = measureCodec.getSerializer(i).valueOf(toBytes(row.get(flatTableIdx[0])));
+                value = "1";
+            } else if (param.isColumnType()) {
+                value = row.get(colIdxOnFlatTable[paramColIdx++]);
             } else {
-
-                byte[] result = null;
-                for (int x = 0; x < flatTableIdx.length; x++) {
-                    byte[] split = toBytes(row.get(flatTableIdx[x]));
-                    if (result == null) {
-                        result = Arrays.copyOf(split, split.length);
-                    } else {
-                        byte[] newResult = new byte[result.length + split.length];
-                        System.arraycopy(result, 0, newResult, 0, result.length);
-                        System.arraycopy(split, 0, newResult, result.length, split.length);
-                        result = newResult;
-                    }
-                }
-                values[i] = measureCodec.getSerializer(i).valueOf(result);
+                value = param.getValue();
             }
+            inputToMeasure[i] = value;
         }
-        return values;
-    }
-
-    private byte[] toBytes(String v) {
-        return v == null ? null : Bytes.toBytes(v);
+        
+        return measureIngesters[idxOfMeasure].valueOf(inputToMeasure, measure, dictionaryMap);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
index 9d819a4..e8fa6d0 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/inmemcubing/InMemCubeBuilderUtils.java
@@ -18,11 +18,12 @@
 package org.apache.kylin.cube.inmemcubing;
 
 import com.google.common.collect.Maps;
+
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
index 4316376..62432f7 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/AbstractRowKeyEncoder.java
@@ -21,10 +21,10 @@ package org.apache.kylin.cube.kv;
 import java.util.Map;
 
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
index 7fedd90..ba15b48 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnIO.java
@@ -22,7 +22,7 @@ import java.util.Arrays;
 
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.BytesUtil;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.dict.IDictionaryAware;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
index a21fe9f..fea3736 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowKeyColumnOrder.java
@@ -21,7 +21,7 @@ package org.apache.kylin.cube.kv;
 import java.util.Collection;
 import java.util.Comparator;
 
-import org.apache.kylin.common.datatype.DataType;
+import org.apache.kylin.metadata.datatype.DataType;
 
 /**
  * @author yangli9

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index ef563ed..3e8ee13 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -41,6 +41,7 @@ import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.util.Array;
 import org.apache.kylin.common.util.CaseInsensitiveStringMap;
 import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.measure.MeasureType;
 import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.MetadataManager;
 import org.apache.kylin.metadata.model.ColumnDesc;
@@ -818,7 +819,8 @@ public class CubeDesc extends RootPersistentEntity {
         }
 
         for (MeasureDesc measure : measures) {
-            result.addAll(measure.getColumnsNeedDictionary());
+            MeasureType aggrType = MeasureType.create(measure.getFunction());
+            result.addAll(aggrType.getColumnsNeedDictionary(measure));
         }
         return result;
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
index 8e36009..1121621 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/validation/rule/FunctionRule.java
@@ -26,12 +26,12 @@ import java.util.Set;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.validation.IValidatorRule;
 import org.apache.kylin.cube.model.validation.ResultLevel;
 import org.apache.kylin.cube.model.validation.ValidateContext;
 import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
index 0cfd020..3e79226 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/util/CubingUtils.java
@@ -50,9 +50,11 @@ import com.google.common.collect.Maps;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
@@ -146,7 +148,7 @@ public class CubingUtils {
         return result;
     }
 
-    public static Map<TblColRef, Dictionary<?>> buildDictionary(final CubeInstance cubeInstance, Iterable<List<String>> recordList) throws IOException {
+    public static Map<TblColRef, Dictionary<String>> buildDictionary(final CubeInstance cubeInstance, Iterable<List<String>> recordList) throws IOException {
         final List<TblColRef> columnsNeedToBuildDictionary = cubeInstance.getDescriptor().listDimensionColumnsExcludingDerived();
         final HashMap<Integer, TblColRef> tblColRefMap = Maps.newHashMap();
         int index = 0;
@@ -154,7 +156,7 @@ public class CubingUtils {
             tblColRefMap.put(index++, column);
         }
 
-        HashMap<TblColRef, Dictionary<?>> result = Maps.newHashMap();
+        HashMap<TblColRef, Dictionary<String>> result = Maps.newHashMap();
 
         HashMultimap<TblColRef, String> valueMap = HashMultimap.create();
         for (List<String> row : recordList) {
@@ -173,18 +175,19 @@ public class CubingUtils {
                     return input == null ? null : input.getBytes();
                 }
             });
-            final Dictionary<?> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(tblColRef.getType(), new IterableDictionaryValueEnumerator(bytes));
+            final Dictionary<String> dict = DictionaryGenerator.buildDictionaryFromValueEnumerator(tblColRef.getType(), new IterableDictionaryValueEnumerator(bytes));
             result.put(tblColRef, dict);
         }
         return result;
     }
 
-    public static Map<TblColRef, Dictionary<?>> writeDictionary(CubeSegment cubeSegment, Map<TblColRef, Dictionary<?>> dictionaryMap, long startOffset, long endOffset) {
-        Map<TblColRef, Dictionary<?>> realDictMap = Maps.newHashMap();
+    @SuppressWarnings("unchecked")
+    public static Map<TblColRef, Dictionary<String>> writeDictionary(CubeSegment cubeSegment, Map<TblColRef, Dictionary<String>> dictionaryMap, long startOffset, long endOffset) {
+        Map<TblColRef, Dictionary<String>> realDictMap = Maps.newHashMap();
 
-        for (Map.Entry<TblColRef, Dictionary<?>> entry : dictionaryMap.entrySet()) {
+        for (Map.Entry<TblColRef, Dictionary<String>> entry : dictionaryMap.entrySet()) {
             final TblColRef tblColRef = entry.getKey();
-            final Dictionary<?> dictionary = entry.getValue();
+            final Dictionary<String> dictionary = entry.getValue();
             ReadableTable.TableSignature signature = new ReadableTable.TableSignature();
             signature.setLastModifiedTime(System.currentTimeMillis());
             signature.setPath(String.format("streaming_%s_%s", startOffset, endOffset));
@@ -195,7 +198,7 @@ public class CubingUtils {
             try {
                 DictionaryInfo realDict = dictionaryManager.trySaveNewDict(dictionary, dictInfo);
                 cubeSegment.putDictResPath(tblColRef, realDict.getResourcePath());
-                realDictMap.put(tblColRef, realDict.getDictionaryObject());
+                realDictMap.put(tblColRef, (Dictionary<String>) realDict.getDictionaryObject());
             } catch (IOException e) {
                 logger.error("error save dictionary for column:" + tblColRef, e);
                 throw new RuntimeException("error save dictionary for column:" + tblColRef, e);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/gridtable/DefaultGTComparator.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/DefaultGTComparator.java b/core-cube/src/main/java/org/apache/kylin/gridtable/DefaultGTComparator.java
index f7623e3..714571f 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/DefaultGTComparator.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/DefaultGTComparator.java
@@ -1,7 +1,7 @@
 package org.apache.kylin.gridtable;
 
 import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.common.util.Dictionary;
 
 public class DefaultGTComparator implements IGTComparator {
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
index 01696e8..eb8d212 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTAggregateScanner.java
@@ -7,10 +7,10 @@ import java.util.Iterator;
 import java.util.Map.Entry;
 import java.util.SortedMap;
 
-import org.apache.kylin.aggregation.MeasureAggregator;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
+import org.apache.kylin.measure.MeasureAggregator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
index 229c679..d3a03d1 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTInfo.java
@@ -5,10 +5,10 @@ import java.util.BitSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 
-import org.apache.kylin.common.datatype.DataType;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.cube.gridtable.CubeCodeSystem;
 import org.apache.kylin.cube.util.KryoUtils;
+import org.apache.kylin.metadata.datatype.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
 
 public class GTInfo {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
index 2783f55..b3133be 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/GTSampleCodeSystem.java
@@ -2,9 +2,9 @@ package org.apache.kylin.gridtable;
 
 import java.nio.ByteBuffer;
 
-import org.apache.kylin.aggregation.MeasureAggregator;
-import org.apache.kylin.common.datatype.DataTypeSerializer;
 import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.measure.MeasureAggregator;
+import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 
 @SuppressWarnings({ "rawtypes", "unchecked" })
 /**

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
index 0e61cf2..3a22091 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/IGTCodeSystem.java
@@ -2,8 +2,8 @@ package org.apache.kylin.gridtable;
 
 import java.nio.ByteBuffer;
 
-import org.apache.kylin.aggregation.MeasureAggregator;
 import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.measure.MeasureAggregator;
 
 public interface IGTCodeSystem {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
index f8d7f30..ff71b4f 100644
--- a/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
+++ b/core-cube/src/main/java/org/apache/kylin/gridtable/UnitTestSupport.java
@@ -22,11 +22,11 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 
-import org.apache.kylin.common.datatype.DataType;
-import org.apache.kylin.common.datatype.LongMutable;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.gridtable.GTInfo.Builder;
+import org.apache.kylin.metadata.datatype.DataType;
+import org.apache.kylin.metadata.datatype.LongMutable;
 
 public class UnitTestSupport {
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java b/core-cube/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
deleted file mode 100644
index 8ae44b6..0000000
--- a/core-cube/src/test/java/org/apache/kylin/aggregation/topn/TopNCounterSerializerTest.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package org.apache.kylin.aggregation.topn;
-
-import java.nio.ByteBuffer;
-
-import org.apache.kylin.common.datatype.DataType;
-import org.apache.kylin.common.topn.TopNCounter;
-import org.apache.kylin.common.util.ByteArray;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.BytesUtil;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * 
- */
-public class TopNCounterSerializerTest {
-
-    private static TopNCounterSerializer serializer = new TopNCounterSerializer(DataType.getType("topn(10)"));
-
-    @Test
-    public void testSerialization() {
-        TopNCounter<ByteArray> vs = new TopNCounter<ByteArray>(50);
-        Integer[] stream = { 1, 1, 2, 9, 1, 2, 3, 7, 7, 1, 3, 1, 1 };
-        for (Integer i : stream) {
-            vs.offer(new ByteArray(Bytes.toBytes(i)));
-        }
-
-        ByteBuffer out = ByteBuffer.allocate(1024);
-        serializer.serialize(vs, out);
-        
-        byte[] copyBytes = new byte[out.position()];
-        System.arraycopy(out.array(), 0, copyBytes, 0, out.position());
-
-        ByteBuffer in = ByteBuffer.wrap(copyBytes);
-        TopNCounter<ByteArray> vsNew = serializer.deserialize(in);
-
-        Assert.assertEquals(vs.toString(), vsNew.toString());
-
-    }
-    
-    @Test
-    public void testValueOf() {
-
-        TopNCounter<ByteArray> origin = new TopNCounter<ByteArray>(10);
-        ByteArray key = new ByteArray(1);
-        ByteBuffer byteBuffer = key.asBuffer();
-        BytesUtil.writeVLong(20l, byteBuffer);
-        origin.offer(key, 1.0);
-
-        byteBuffer = ByteBuffer.allocate(1024);
-        byteBuffer.putInt(1);
-        byteBuffer.putInt(20);
-        byteBuffer.putDouble(1.0);
-        TopNCounter<ByteArray> counter = serializer.valueOf(byteBuffer.array());
-
-
-        Assert.assertEquals(origin.toString(), counter.toString());
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/cube/DictionaryManagerTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/DictionaryManagerTest.java b/core-cube/src/test/java/org/apache/kylin/cube/DictionaryManagerTest.java
index d7feb56..bce228d 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/DictionaryManagerTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/DictionaryManagerTest.java
@@ -23,10 +23,10 @@ import static org.junit.Assert.assertTrue;
 
 import java.util.HashSet;
 
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.dict.DictionaryInfo;
 import org.apache.kylin.dict.DictionaryManager;
 import org.apache.kylin.metadata.model.TblColRef;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/ce61309a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java
index 935e840..c25bad7 100644
--- a/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/cube/inmemcubing/DoggedCubeBuilderStressTest.java
@@ -26,10 +26,10 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.dict.Dictionary;
 import org.apache.kylin.gridtable.GTRecord;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.junit.AfterClass;
@@ -51,7 +51,7 @@ public class DoggedCubeBuilderStressTest extends LocalFileMetadataTestCase {
 
     private static CubeInstance cube;
     private static String flatTable;
-    private static Map<TblColRef, Dictionary<?>> dictionaryMap;
+    private static Map<TblColRef, Dictionary<String>> dictionaryMap;
 
     @BeforeClass
     public static void before() throws IOException {