You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by li...@apache.org on 2015/07/22 16:05:46 UTC

incubator-kylin git commit: KYLIN-875 Refactor core-cube, drop hadoop/hbase dependency

Repository: incubator-kylin
Updated Branches:
  refs/heads/0.8 5eda06529 -> 3823545a0


KYLIN-875 Refactor core-cube, drop hadoop/hbase dependency


Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/3823545a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/3823545a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/3823545a

Branch: refs/heads/0.8
Commit: 3823545a0236113485c89a21fdceae8451b283ce
Parents: 5eda065
Author: Yang Li <li...@apache.org>
Authored: Wed Jul 22 22:04:32 2015 +0800
Committer: Yang Li <li...@apache.org>
Committed: Wed Jul 22 22:04:32 2015 +0800

----------------------------------------------------------------------
 core-cube/pom.xml                               |  58 --------
 .../apache/kylin/cube/kv/RowValueDecoder.java   | 138 -------------------
 .../org/apache/kylin/cube/model/CubeDesc.java   |   2 +-
 .../apache/kylin/cube/model/v1/CubeDesc.java    |   2 +-
 .../kylin/cube/kv/RowValueDecoderTest.java      | 101 --------------
 .../metadata/measure/MeasureCodecTest.java      |   6 +-
 .../kylin/job/inmemcubing/InMemCubeBuilder.java |  40 ++++--
 .../kylin/storage/hbase/HBaseMROutput2.java     |   1 -
 .../kylin/job/hadoop/cube/CubeReducerTest.java  |  17 +--
 .../kylin/storage/hbase/RowValueDecoder.java    | 131 ++++++++++++++++++
 .../storage/hbase/RowValueDecoderTest.java      | 103 ++++++++++++++
 .../storage/gridtable/UnitTestSupport.java      |  24 ++--
 .../storage/hbase/CubeSegmentTupleIterator.java |   2 +-
 .../kylin/storage/hbase/CubeStorageQuery.java   |   1 -
 .../kylin/storage/hbase/CubeTupleConverter.java |   1 -
 .../hbase/SerializedHBaseTupleIterator.java     |   1 -
 .../endpoint/EndpointAggregators.java           |   6 +-
 .../observer/ObserverAggregators.java           |   4 +-
 .../coprocessor/observer/ObserverEnabler.java   |   3 +-
 .../org/apache/kylin/storage/tuple/Tuple.java   |  18 +--
 .../storage/gridtable/DictGridTableTest.java    |  22 +--
 .../storage/gridtable/SimpleGridTableTest.java  |  12 +-
 .../gridtable/SimpleInvertedIndexTest.java      |   4 +-
 .../endpoint/EndpointAggregationTest.java       |  16 ++-
 .../observer/AggregateRegionObserverTest.java   |  26 ++--
 25 files changed, 340 insertions(+), 399 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/core-cube/pom.xml
----------------------------------------------------------------------
diff --git a/core-cube/pom.xml b/core-cube/pom.xml
index 8751a78..2b7177a 100644
--- a/core-cube/pom.xml
+++ b/core-cube/pom.xml
@@ -53,64 +53,6 @@
             <version>${project.parent.version}</version>
         </dependency>
         <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-common</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-annotations</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-core</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-minicluster</artifactId>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.mrunit</groupId>
-            <artifactId>mrunit</artifactId>
-            <classifier>hadoop2</classifier>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-hadoop2-compat</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-common</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-client</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-server</artifactId>
-            <scope>provided</scope>
-            <!-- version conflict with hadoop2.2 -->
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.hadoop</groupId>
-                    <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hadoop</groupId>
-            <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java b/core-cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
deleted file mode 100644
index 2ab12b9..0000000
--- a/core-cube/src/main/java/org/apache/kylin/cube/kv/RowValueDecoder.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube.kv;
-
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-import java.util.Collection;
-
-/**
- * @author xjiang
- */
-public class RowValueDecoder implements Cloneable {
-
-    private final HBaseColumnDesc hbaseColumn;
-    private final byte[] hbaseColumnFamily;
-    private final byte[] hbaseColumnQualifier;
-
-    private final MeasureCodec codec;
-    private final BitSet projectionIndex;
-    private final MeasureDesc[] measures;
-    private Object[] values;
-
-    public RowValueDecoder(HBaseColumnDesc hbaseColumn) {
-        this.hbaseColumn = hbaseColumn;
-        this.hbaseColumnFamily = Bytes.toBytes(hbaseColumn.getColumnFamilyName());
-        this.hbaseColumnQualifier = Bytes.toBytes(hbaseColumn.getQualifier());
-        this.projectionIndex = new BitSet();
-        this.measures = hbaseColumn.getMeasures();
-        this.codec = new MeasureCodec(measures);
-        this.values = new Object[measures.length];
-    }
-
-    public void decode(Result hbaseRow) {
-        decode(hbaseRow, true);
-    }
-
-    public void decode(Result hbaseRow, boolean convertToJavaObject) {
-        decode(hbaseRow.getValueAsByteBuffer(hbaseColumnFamily, hbaseColumnQualifier), convertToJavaObject);
-    }
-
-    public void decode(byte[] bytes) {
-        decode(bytes, true);
-    }
-
-    public void decode(byte[] bytes, boolean convertToJavaObject) {
-        decode(ByteBuffer.wrap(bytes), convertToJavaObject);
-    }
-
-    private void decode(ByteBuffer buffer, boolean convertToJavaObject) {
-        codec.decode(buffer, values);
-        if (convertToJavaObject) {
-            convertToJavaObjects(values, values, convertToJavaObject);
-        }
-    }
-
-    private void convertToJavaObjects(Object[] mapredObjs, Object[] results, boolean convertToJavaObject) {
-        for (int i = 0; i < mapredObjs.length; i++) {
-            Object o = mapredObjs[i];
-
-            if (o instanceof LongWritable)
-                o = ((LongWritable) o).get();
-            else if (o instanceof IntWritable)
-                o = ((IntWritable) o).get();
-            else if (o instanceof DoubleWritable)
-                o = ((DoubleWritable) o).get();
-            else if (o instanceof FloatWritable)
-                o = ((FloatWritable) o).get();
-
-            results[i] = o;
-        }
-    }
-
-    public void setIndex(int bitIndex) {
-        projectionIndex.set(bitIndex);
-    }
-
-    public HBaseColumnDesc getHBaseColumn() {
-        return hbaseColumn;
-    }
-
-    public BitSet getProjectionIndex() {
-        return projectionIndex;
-    }
-
-    public Object[] getValues() {
-        return values;
-    }
-
-    public MeasureDesc[] getMeasures() {
-        return measures;
-    }
-
-    public boolean hasMemHungryCountDistinct() {
-        for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
-            FunctionDesc func = measures[i].getFunction();
-            if (func.isCountDistinct() && !func.isHolisticCountDistinct()) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    public static boolean hasMemHungryCountDistinct(Collection<RowValueDecoder> rowValueDecoders) {
-        for (RowValueDecoder decoder : rowValueDecoders) {
-            if (decoder.hasMemHungryCountDistinct())
-                return true;
-        }
-        return false;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 56bb78f..654c7c3 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -33,9 +33,9 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.net.util.Base64;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/core-cube/src/main/java/org/apache/kylin/cube/model/v1/CubeDesc.java
----------------------------------------------------------------------
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/v1/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/v1/CubeDesc.java
index 1c2d63e..45d6f43 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/v1/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/v1/CubeDesc.java
@@ -33,8 +33,8 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.commons.codec.binary.Base64;
 import org.apache.commons.lang.ArrayUtils;
-import org.apache.commons.net.util.Base64;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.RootPersistentEntity;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowValueDecoderTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowValueDecoderTest.java b/core-cube/src/test/java/org/apache/kylin/cube/kv/RowValueDecoderTest.java
deleted file mode 100644
index 369e6f6..0000000
--- a/core-cube/src/test/java/org/apache/kylin/cube/kv/RowValueDecoderTest.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *     http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.cube.kv;
-
-import org.apache.hadoop.io.LongWritable;
-import org.apache.kylin.common.util.LocalFileMetadataTestCase;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.cube.model.HBaseColumnDesc;
-import org.apache.kylin.metadata.MetadataManager;
-import org.apache.kylin.metadata.measure.MeasureCodec;
-import org.apache.kylin.metadata.model.FunctionDesc;
-import org.apache.kylin.metadata.model.MeasureDesc;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-
-import static org.junit.Assert.assertEquals;
-
-/**
- * @author George Song (ysong1)
- * 
- */
-public class RowValueDecoderTest extends LocalFileMetadataTestCase {
-
-    @Before
-    public void setUp() throws Exception {
-        this.createTestMetadata();
-        MetadataManager.clearCache();
-    }
-
-    @After
-    public void after() throws Exception {
-        this.cleanupTestMetadata();
-    }
-
-    @Test
-    public void testDecode() throws Exception {
-        CubeDesc cubeDesc = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready").getDescriptor();
-        HBaseColumnDesc hbaseCol = cubeDesc.getHBaseMapping().getColumnFamily()[0].getColumns()[0];
-
-        MeasureCodec codec = new MeasureCodec(hbaseCol.getMeasures());
-        BigDecimal sum = new BigDecimal("333.1234567");
-        BigDecimal min = new BigDecimal("333.1111111");
-        BigDecimal max = new BigDecimal("333.1999999");
-        LongWritable count = new LongWritable(2);
-        ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-        codec.encode(new Object[] { sum, min, max, count }, buf);
-
-        buf.flip();
-        byte[] valueBytes = new byte[buf.limit()];
-        System.arraycopy(buf.array(), 0, valueBytes, 0, buf.limit());
-
-        RowValueDecoder rowValueDecoder = new RowValueDecoder(hbaseCol);
-        for (MeasureDesc measure : cubeDesc.getMeasures()) {
-            FunctionDesc aggrFunc = measure.getFunction();
-            int index = hbaseCol.findMeasureIndex(aggrFunc);
-            rowValueDecoder.setIndex(index);
-        }
-
-        rowValueDecoder.decode(valueBytes);
-        Object[] measureValues = rowValueDecoder.getValues();
-        //BigDecimal.ROUND_HALF_EVEN in BigDecimalSerializer
-        assertEquals("[333.1235, 333.1111, 333.2000, 2]", Arrays.toString(measureValues));
-    }
-
-    @Test(expected = IllegalArgumentException.class)
-    public void testError() throws Exception {
-        CubeDesc cubeDesc = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready").getDescriptor();
-        HBaseColumnDesc hbaseCol = cubeDesc.getHBaseMapping().getColumnFamily()[0].getColumns()[0];
-
-        MeasureCodec codec = new MeasureCodec(hbaseCol.getMeasures());
-        BigDecimal sum = new BigDecimal("11111111111111111111333.1234567");
-        BigDecimal min = new BigDecimal("333.1111111");
-        BigDecimal max = new BigDecimal("333.1999999");
-        LongWritable count = new LongWritable(2);
-        ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
-        codec.encode(new Object[] { sum, min, max, count }, buf);
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
----------------------------------------------------------------------
diff --git a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
index 80a1543..4fc708d 100644
--- a/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
+++ b/core-cube/src/test/java/org/apache/kylin/metadata/measure/MeasureCodecTest.java
@@ -23,8 +23,6 @@ import static org.junit.Assert.*;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.cube.kv.RowConstants;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -41,8 +39,8 @@ public class MeasureCodecTest {
         MeasureDesc descs[] = new MeasureDesc[] { measure("double"), measure("long"), measure("decimal"), measure("HLLC16") };
         MeasureCodec codec = new MeasureCodec(descs);
 
-        DoubleWritable d = new DoubleWritable(1.0);
-        LongWritable l = new LongWritable(2);
+        DoubleMutable d = new DoubleMutable(1.0);
+        LongMutable l = new LongMutable(2);
         BigDecimal b = new BigDecimal("333.1234");
         HyperLogLogPlusCounter hllc = new HyperLogLogPlusCounter(16);
         hllc.add("1234567");

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
index b0e4402..2cde011 100644
--- a/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
+++ b/job/src/main/java/org/apache/kylin/job/inmemcubing/InMemCubeBuilder.java
@@ -16,10 +16,20 @@
  */
 package org.apache.kylin.job.inmemcubing;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.io.DoubleWritable;
-import org.apache.hadoop.io.LongWritable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentNavigableMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.common.util.MemoryBudgetController;
@@ -31,19 +41,25 @@ import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
 import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
 import org.apache.kylin.dict.Dictionary;
+import org.apache.kylin.metadata.measure.DoubleMutable;
+import org.apache.kylin.metadata.measure.LongMutable;
 import org.apache.kylin.metadata.measure.MeasureCodec;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.cube.CubeGridTable;
-import org.apache.kylin.storage.gridtable.*;
+import org.apache.kylin.storage.gridtable.GTAggregateScanner;
+import org.apache.kylin.storage.gridtable.GTBuilder;
+import org.apache.kylin.storage.gridtable.GTInfo;
+import org.apache.kylin.storage.gridtable.GTRecord;
+import org.apache.kylin.storage.gridtable.GTScanRequest;
+import org.apache.kylin.storage.gridtable.GridTable;
+import org.apache.kylin.storage.gridtable.IGTScanner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.*;
-import java.util.concurrent.atomic.AtomicInteger;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 
 /**
  * Build a cube (many cuboids) in memory. Calculating multiple cuboids at the same time as long as memory permits.
@@ -52,7 +68,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
 
     private static Logger logger = LoggerFactory.getLogger(InMemCubeBuilder.class);
-    private static final LongWritable ONE = new LongWritable(1l);
+    private static final LongMutable ONE = new LongMutable(1l);
 
     private final CuboidScheduler cuboidScheduler;
     private final long baseCuboidId;
@@ -481,8 +497,8 @@ public class InMemCubeBuilder extends AbstractInMemCubeBuilder {
     private void sanityCheck(Object[] totalSum) {
         // double sum introduces error and causes result not exactly equal
         for (int i = 0; i < totalSum.length; i++) {
-            if (totalSum[i] instanceof DoubleWritable) {
-                totalSum[i] = Math.round(((DoubleWritable) totalSum[i]).get());
+            if (totalSum[i] instanceof DoubleMutable) {
+                totalSum[i] = Math.round(((DoubleMutable) totalSum[i]).get());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
----------------------------------------------------------------------
diff --git a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
index b5f6d65..f4be234 100644
--- a/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
+++ b/job/src/main/java/org/apache/kylin/storage/hbase/HBaseMROutput2.java
@@ -57,7 +57,6 @@ import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.kv.RowValueDecoder;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
 import org.apache.kylin.cube.model.HBaseColumnFamilyDesc;
 import org.apache.kylin.engine.mr.ByteArrayWritable;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java
----------------------------------------------------------------------
diff --git a/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java b/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java
index 75e49da..5369284 100644
--- a/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java
+++ b/job/src/test/java/org/apache/kylin/job/hadoop/cube/CubeReducerTest.java
@@ -27,24 +27,21 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
 import org.apache.hadoop.mrunit.types.Pair;
-import org.apache.kylin.job.constant.BatchConstants;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Test;
-
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.metadata.measure.MeasureCodec;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.job.constant.BatchConstants;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
 
 /**
- * @author George Song (ysong1)
- * 
  */
 public class CubeReducerTest extends LocalFileMetadataTestCase {
 
@@ -112,7 +109,7 @@ public class CubeReducerTest extends LocalFileMetadataTestCase {
     }
 
     private Text newValueText(MeasureCodec codec, String sum, String min, String max, int count) {
-        Object[] values = new Object[] { new BigDecimal(sum), new BigDecimal(min), new BigDecimal(max), new LongWritable(count) };
+        Object[] values = new Object[] { new BigDecimal(sum), new BigDecimal(min), new BigDecimal(max), new LongMutable(count) };
 
         buf.clear();
         codec.encode(values, buf);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/RowValueDecoder.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/RowValueDecoder.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/RowValueDecoder.java
new file mode 100644
index 0000000..30d5613
--- /dev/null
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/RowValueDecoder.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.metadata.measure.DoubleMutable;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+import java.util.Collection;
+
+/**
+ */
+public class RowValueDecoder implements Cloneable {
+
+    private final HBaseColumnDesc hbaseColumn;
+    private final byte[] hbaseColumnFamily;
+    private final byte[] hbaseColumnQualifier;
+
+    private final MeasureCodec codec;
+    private final BitSet projectionIndex;
+    private final MeasureDesc[] measures;
+    private Object[] values;
+
+    public RowValueDecoder(HBaseColumnDesc hbaseColumn) {
+        this.hbaseColumn = hbaseColumn;
+        this.hbaseColumnFamily = Bytes.toBytes(hbaseColumn.getColumnFamilyName());
+        this.hbaseColumnQualifier = Bytes.toBytes(hbaseColumn.getQualifier());
+        this.projectionIndex = new BitSet();
+        this.measures = hbaseColumn.getMeasures();
+        this.codec = new MeasureCodec(measures);
+        this.values = new Object[measures.length];
+    }
+
+    public void decode(Result hbaseRow) {
+        decode(hbaseRow, true);
+    }
+
+    public void decode(Result hbaseRow, boolean convertToJavaObject) {
+        decode(hbaseRow.getValueAsByteBuffer(hbaseColumnFamily, hbaseColumnQualifier), convertToJavaObject);
+    }
+
+    public void decode(byte[] bytes) {
+        decode(bytes, true);
+    }
+
+    public void decode(byte[] bytes, boolean convertToJavaObject) {
+        decode(ByteBuffer.wrap(bytes), convertToJavaObject);
+    }
+
+    private void decode(ByteBuffer buffer, boolean convertToJavaObject) {
+        codec.decode(buffer, values);
+        if (convertToJavaObject) {
+            convertToJavaObjects(values, values, convertToJavaObject);
+        }
+    }
+
+    private void convertToJavaObjects(Object[] mapredObjs, Object[] results, boolean convertToJavaObject) {
+        for (int i = 0; i < mapredObjs.length; i++) {
+            Object o = mapredObjs[i];
+
+            if (o instanceof LongMutable)
+                o = ((LongMutable) o).get();
+            else if (o instanceof DoubleMutable)
+                o = ((DoubleMutable) o).get();
+
+            results[i] = o;
+        }
+    }
+
+    public void setIndex(int bitIndex) {
+        projectionIndex.set(bitIndex);
+    }
+
+    public HBaseColumnDesc getHBaseColumn() {
+        return hbaseColumn;
+    }
+
+    public BitSet getProjectionIndex() {
+        return projectionIndex;
+    }
+
+    public Object[] getValues() {
+        return values;
+    }
+
+    public MeasureDesc[] getMeasures() {
+        return measures;
+    }
+
+    public boolean hasMemHungryCountDistinct() {
+        for (int i = projectionIndex.nextSetBit(0); i >= 0; i = projectionIndex.nextSetBit(i + 1)) {
+            FunctionDesc func = measures[i].getFunction();
+            if (func.isCountDistinct() && !func.isHolisticCountDistinct()) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    public static boolean hasMemHungryCountDistinct(Collection<RowValueDecoder> rowValueDecoders) {
+        for (RowValueDecoder decoder : rowValueDecoders) {
+            if (decoder.hasMemHungryCountDistinct())
+                return true;
+        }
+        return false;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/RowValueDecoderTest.java
----------------------------------------------------------------------
diff --git a/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/RowValueDecoderTest.java b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/RowValueDecoderTest.java
new file mode 100644
index 0000000..dcc5b06
--- /dev/null
+++ b/storage-hbase/src/test/java/org/apache/kylin/storage/hbase/RowValueDecoderTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.storage.hbase;
+
+import static org.junit.Assert.*;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.LongWritable;
+import org.apache.kylin.common.util.LocalFileMetadataTestCase;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.HBaseColumnDesc;
+import org.apache.kylin.metadata.MetadataManager;
+import org.apache.kylin.metadata.measure.LongMutable;
+import org.apache.kylin.metadata.measure.MeasureCodec;
+import org.apache.kylin.metadata.model.FunctionDesc;
+import org.apache.kylin.metadata.model.MeasureDesc;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Exercises MeasureCodec encoding together with RowValueDecoder decoding on the first HBase column of the test cube.
+ * @author George Song (ysong1)
+ */
+public class RowValueDecoderTest extends LocalFileMetadataTestCase {
+
+    @Before
+    public void setUp() throws Exception {
+        this.createTestMetadata();
+        MetadataManager.clearCache();
+    }
+
+    @After
+    public void after() throws Exception {
+        this.cleanupTestMetadata();
+    }
+
+    @Test
+    public void testDecode() throws Exception {
+        CubeDesc cubeDesc = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready").getDescriptor();
+        HBaseColumnDesc hbaseCol = cubeDesc.getHBaseMapping().getColumnFamily()[0].getColumns()[0];
+        // Encode sum, min, max and count with a codec built from the column's measures.
+        MeasureCodec codec = new MeasureCodec(hbaseCol.getMeasures());
+        BigDecimal sum = new BigDecimal("333.1234567");
+        BigDecimal min = new BigDecimal("333.1111111");
+        BigDecimal max = new BigDecimal("333.1999999");
+        LongMutable count = new LongMutable(2);
+        ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+        codec.encode(new Object[] { sum, min, max, count }, buf);
+        // After flip() the limit is the number of bytes written; copy exactly that many.
+        buf.flip();
+        byte[] valueBytes = new byte[buf.limit()];
+        System.arraycopy(buf.array(), 0, valueBytes, 0, buf.limit());
+        // Mark the column index of every cube measure in the decoder's projection.
+        RowValueDecoder rowValueDecoder = new RowValueDecoder(hbaseCol);
+        for (MeasureDesc measure : cubeDesc.getMeasures()) {
+            FunctionDesc aggrFunc = measure.getFunction();
+            int index = hbaseCol.findMeasureIndex(aggrFunc);
+            rowValueDecoder.setIndex(index);
+        }
+
+        rowValueDecoder.decode(valueBytes);
+        Object[] measureValues = rowValueDecoder.getValues();
+        //BigDecimal.ROUND_HALF_EVEN in BigDecimalSerializer
+        assertEquals("[333.1235, 333.1111, 333.2000, 2]", Arrays.toString(measureValues));
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testError() throws Exception {
+        CubeDesc cubeDesc = CubeManager.getInstance(getTestConfig()).getCube("test_kylin_cube_with_slr_ready").getDescriptor();
+        HBaseColumnDesc hbaseCol = cubeDesc.getHBaseMapping().getColumnFamily()[0].getColumns()[0];
+        // Same inputs as testDecode apart from a far wider sum; encode() is expected to throw (presumably on that sum).
+        MeasureCodec codec = new MeasureCodec(hbaseCol.getMeasures());
+        BigDecimal sum = new BigDecimal("11111111111111111111333.1234567");
+        BigDecimal min = new BigDecimal("333.1111111");
+        BigDecimal max = new BigDecimal("333.1999999");
+        LongMutable count = new LongMutable(2); // same holder type as testDecode rather than Hadoop's LongWritable
+        ByteBuffer buf = ByteBuffer.allocate(RowConstants.ROWVALUE_BUFFER_SIZE);
+        codec.encode(new Object[] { sum, min, max, count }, buf);
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/storage/src/main/java/org/apache/kylin/storage/gridtable/UnitTestSupport.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/gridtable/UnitTestSupport.java b/storage/src/main/java/org/apache/kylin/storage/gridtable/UnitTestSupport.java
index 4b872a6..b1a7180 100644
--- a/storage/src/main/java/org/apache/kylin/storage/gridtable/UnitTestSupport.java
+++ b/storage/src/main/java/org/apache/kylin/storage/gridtable/UnitTestSupport.java
@@ -22,9 +22,9 @@ import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
 
-import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.measure.LongMutable;
 import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.storage.gridtable.GTInfo.Builder;
 
@@ -67,16 +67,16 @@ public class UnitTestSupport {
             String d_01_15 = datePlus("2015-01-15", i * 4);
             String d_01_16 = datePlus("2015-01-16", i * 4);
             String d_01_17 = datePlus("2015-01-17", i * 4);
-            result.add(newRec(info, d_01_14, "Yang", "Food", new LongWritable(10), new BigDecimal("10.5")));
-            result.add(newRec(info, d_01_14, "Luke", "Food", new LongWritable(10), new BigDecimal("10.5")));
-            result.add(newRec(info, d_01_15, "Xu", "Food", new LongWritable(10), new BigDecimal("10.5")));
-            result.add(newRec(info, d_01_15, "Dong", "Food", new LongWritable(10), new BigDecimal("10.5")));
-            result.add(newRec(info, d_01_15, "Jason", "Food", new LongWritable(10), new BigDecimal("10.5")));
-            result.add(newRec(info, d_01_16, "Mahone", "Food", new LongWritable(10), new BigDecimal("10.5")));
-            result.add(newRec(info, d_01_16, "Shaofeng", "Food", new LongWritable(10), new BigDecimal("10.5")));
-            result.add(newRec(info, d_01_16, "Qianhao", "Food", new LongWritable(10), new BigDecimal("10.5")));
-            result.add(newRec(info, d_01_16, "George", "Food", new LongWritable(10), new BigDecimal("10.5")));
-            result.add(newRec(info, d_01_17, "Kejia", "Food", new LongWritable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_14, "Yang", "Food", new LongMutable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_14, "Luke", "Food", new LongMutable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_15, "Xu", "Food", new LongMutable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_15, "Dong", "Food", new LongMutable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_15, "Jason", "Food", new LongMutable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_16, "Mahone", "Food", new LongMutable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_16, "Shaofeng", "Food", new LongMutable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_16, "Qianhao", "Food", new LongMutable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_16, "George", "Food", new LongMutable(10), new BigDecimal("10.5")));
+            result.add(newRec(info, d_01_17, "Kejia", "Food", new LongMutable(10), new BigDecimal("10.5")));
         }
         return result;
     }
@@ -87,7 +87,7 @@ public class UnitTestSupport {
         return DateFormat.formatToDateStr(millis);
     }
 
-    private static GTRecord newRec(GTInfo info, String date, String name, String category, LongWritable amount, BigDecimal price) {
+    private static GTRecord newRec(GTInfo info, String date, String name, String category, LongMutable amount, BigDecimal price) {
         GTRecord rec = new GTRecord(info);
         return rec.setValues(date, name, category, amount, price);
     }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
index a8027c0..63623ca 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeSegmentTupleIterator.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.storage.hbase;
 
 import com.google.common.collect.Lists;
+
 import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.apache.hadoop.hbase.filter.Filter;
@@ -29,7 +30,6 @@ import org.apache.kylin.common.persistence.StorageException;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowValueDecoder;
 import org.apache.kylin.cube.model.HBaseColumnDesc;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.TblColRef;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageQuery.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageQuery.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageQuery.java
index 889783a..bd5f21a 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageQuery.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeStorageQuery.java
@@ -30,7 +30,6 @@ import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowValueDecoder;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.cube.model.HBaseColumnDesc;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java
index 4a6b767..8d0af58 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/CubeTupleConverter.java
@@ -12,7 +12,6 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
 import org.apache.kylin.cube.kv.RowKeyDecoder;
-import org.apache.kylin.cube.kv.RowValueDecoder;
 import org.apache.kylin.cube.model.CubeDesc.DeriveInfo;
 import org.apache.kylin.dict.lookup.LookupStringTable;
 import org.apache.kylin.metadata.model.FunctionDesc;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
index 9cc9eb2..5e6d9a0 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/SerializedHBaseTupleIterator.java
@@ -25,7 +25,6 @@ import com.google.common.collect.Range;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.kv.RowValueDecoder;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.ITuple;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
index 0ce0012..91fe961 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregators.java
@@ -21,21 +21,21 @@ package org.apache.kylin.storage.hbase.coprocessor.endpoint;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.hll.HyperLogLogPlusCounter;
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.invertedindex.index.RawTableRecord;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
 import org.apache.kylin.invertedindex.index.TableRecordInfoDigest;
+import org.apache.kylin.metadata.measure.LongMutable;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
 import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
 import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.storage.hbase.coprocessor.CoprocessorConstants;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
@@ -114,7 +114,7 @@ public class EndpointAggregators {
     final transient FixedLenMeasureCodec[] measureSerializers;
     final transient Object[] metricValues;
 
-    final LongWritable ONE = new LongWritable(1);
+    final LongMutable ONE = new LongMutable(1);
 
     private EndpointAggregators(String[] funcNames, String[] dataTypes, MetricInfo[] metricInfos, TableRecordInfoDigest tableInfo) {
         this.funcNames = funcNames;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregators.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregators.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregators.java
index 0610ea5..8e0e8f7 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregators.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverAggregators.java
@@ -24,15 +24,13 @@ import java.util.Collection;
 import java.util.Comparator;
 import java.util.List;
 
+import org.apache.kylin.storage.hbase.RowValueDecoder;
 import org.apache.kylin.storage.hbase.coprocessor.CoprocessorConstants;
-
 import org.apache.hadoop.hbase.Cell;
 import org.apache.kylin.common.util.Bytes;
-
 import org.apache.kylin.common.util.BytesSerializer;
 import org.apache.kylin.common.util.BytesUtil;
 import org.apache.kylin.cube.kv.RowConstants;
-import org.apache.kylin.cube.kv.RowValueDecoder;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
 import org.apache.kylin.metadata.measure.MeasureCodec;
 import org.apache.kylin.cube.model.HBaseColumnDesc;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
index c1ef5f8..3bbf43a 100644
--- a/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
+++ b/storage/src/main/java/org/apache/kylin/storage/hbase/coprocessor/observer/ObserverEnabler.java
@@ -20,6 +20,7 @@ package org.apache.kylin.storage.hbase.coprocessor.observer;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+
 import org.apache.hadoop.hbase.client.HTableInterface;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
@@ -29,12 +30,12 @@ import org.apache.kylin.common.debug.BackdoorToggles;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.Cuboid;
-import org.apache.kylin.cube.kv.RowValueDecoder;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.StorageContext;
 import org.apache.kylin.storage.hbase.RegionScannerAdapter;
 import org.apache.kylin.storage.hbase.ResultScannerAdapter;
+import org.apache.kylin.storage.hbase.RowValueDecoder;
 import org.apache.kylin.storage.hbase.coprocessor.CoprocessorFilter;
 import org.apache.kylin.storage.hbase.coprocessor.CoprocessorProjector;
 import org.apache.kylin.storage.hbase.coprocessor.CoprocessorRowType;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java b/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
index 8a85c3a..c6b1a18 100644
--- a/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
+++ b/storage/src/main/java/org/apache/kylin/storage/tuple/Tuple.java
@@ -24,11 +24,9 @@ import java.util.List;
 
 import net.sf.ehcache.pool.sizeof.annotations.IgnoreSizeOf;
 
-import org.apache.hadoop.hive.serde2.io.DoubleWritable;
-import org.apache.hadoop.io.FloatWritable;
-import org.apache.hadoop.io.IntWritable;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.metadata.measure.DoubleMutable;
+import org.apache.kylin.metadata.measure.LongMutable;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.metadata.tuple.ITuple;
 
@@ -123,14 +121,10 @@ public class Tuple implements ITuple {
     }
 
     private Object convertWritableToJava(Object o) {
-        if (o instanceof LongWritable)
-            o = ((LongWritable) o).get();
-        else if (o instanceof IntWritable)
-            o = ((IntWritable) o).get();
-        else if (o instanceof DoubleWritable)
-            o = ((DoubleWritable) o).get();
-        else if (o instanceof FloatWritable)
-            o = ((FloatWritable) o).get();
+        if (o instanceof LongMutable)
+            o = ((LongMutable) o).get();
+        else if (o instanceof DoubleMutable)
+            o = ((DoubleMutable) o).get();
         return o;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
index 0dec3c3..6092956 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/DictGridTableTest.java
@@ -27,7 +27,6 @@ import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.common.util.ImmutableBitSet;
 import org.apache.kylin.dict.Dictionary;
@@ -41,6 +40,7 @@ import org.apache.kylin.metadata.filter.ExtractTupleFilter;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.measure.LongMutable;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TableDesc;
@@ -299,16 +299,16 @@ public class DictGridTableTest {
         GTRecord r = new GTRecord(table.getInfo());
         GTBuilder builder = table.rebuild();
 
-        builder.write(r.setValues("2015-01-14", "30", "Yang", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-14", "30", "Luke", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-15", "20", "Dong", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-15", "20", "Jason", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-15", "30", "Xu", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "20", "Mahone", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "20", "Qianhao", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "30", "George", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new LongWritable(10), new BigDecimal("10.5")));
-        builder.write(r.setValues("2015-01-17", "10", "Kejia", new LongWritable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-14", "30", "Yang", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-14", "30", "Luke", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-15", "20", "Dong", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-15", "20", "Jason", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-15", "30", "Xu", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "20", "Mahone", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "20", "Qianhao", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "30", "George", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-16", "30", "Shaofeng", new LongMutable(10), new BigDecimal("10.5")));
+        builder.write(r.setValues("2015-01-17", "10", "Kejia", new LongMutable(10), new BigDecimal("10.5")));
         builder.close();
 
         return table;

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
index 73f92c0..be0d13e 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleGridTableTest.java
@@ -24,8 +24,8 @@ import java.math.BigDecimal;
 import java.util.BitSet;
 import java.util.List;
 
-import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.util.ImmutableBitSet;
+import org.apache.kylin.metadata.measure.LongMutable;
 import org.apache.kylin.storage.gridtable.memstore.GTSimpleMemStore;
 import org.junit.Test;
 
@@ -86,7 +86,7 @@ public class SimpleGridTableTest {
             Object[] v = r.getValues();
             assertTrue(((String) v[0]).startsWith("2015-"));
             assertTrue(((String) v[2]).equals("Food"));
-            assertTrue(((LongWritable) v[3]).get() == 10);
+            assertTrue(((LongMutable) v[3]).get() == 10);
             assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
             System.out.println(r);
         }
@@ -104,19 +104,19 @@ public class SimpleGridTableTest {
             Object[] v = r.getValues();
             switch (i) {
             case 0:
-                assertTrue(((LongWritable) v[3]).get() == 20);
+                assertTrue(((LongMutable) v[3]).get() == 20);
                 assertTrue(((BigDecimal) v[4]).doubleValue() == 21.0);
                 break;
             case 1:
-                assertTrue(((LongWritable) v[3]).get() == 30);
+                assertTrue(((LongMutable) v[3]).get() == 30);
                 assertTrue(((BigDecimal) v[4]).doubleValue() == 31.5);
                 break;
             case 2:
-                assertTrue(((LongWritable) v[3]).get() == 40);
+                assertTrue(((LongMutable) v[3]).get() == 40);
                 assertTrue(((BigDecimal) v[4]).doubleValue() == 42.0);
                 break;
             case 3:
-                assertTrue(((LongWritable) v[3]).get() == 10);
+                assertTrue(((LongMutable) v[3]).get() == 10);
                 assertTrue(((BigDecimal) v[4]).doubleValue() == 10.5);
                 break;
             default:

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
index 3e6f672..a9ab61c 100644
--- a/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/gridtable/SimpleInvertedIndexTest.java
@@ -24,7 +24,6 @@ import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 
-import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.util.ByteArray;
 import org.apache.kylin.metadata.filter.ColumnTupleFilter;
 import org.apache.kylin.metadata.filter.CompareTupleFilter;
@@ -32,6 +31,7 @@ import org.apache.kylin.metadata.filter.ConstantTupleFilter;
 import org.apache.kylin.metadata.filter.LogicalTupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter;
 import org.apache.kylin.metadata.filter.TupleFilter.FilterOperatorEnum;
+import org.apache.kylin.metadata.measure.LongMutable;
 import org.apache.kylin.metadata.measure.serializer.StringSerializer;
 import org.apache.kylin.metadata.model.DataType;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -57,7 +57,7 @@ public class SimpleInvertedIndexTest {
         GTRowBlock.Writer writer = mockBlock.getWriter();
         GTRecord record = new GTRecord(info);
         for (int i = 0; i < 10; i++) {
-            record.setValues(i < 9 ? "" + i : null, "", "", new LongWritable(0), new BigDecimal(0));
+            record.setValues(i < 9 ? "" + i : null, "", "", new LongMutable(0), new BigDecimal(0));
             for (int j = 0; j < info.getRowBlockSize(); j++) {
                 writer.append(record);
             }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
index 3217f1c..025b66a 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/endpoint/EndpointAggregationTest.java
@@ -18,17 +18,21 @@
 
 package org.apache.kylin.storage.hbase.coprocessor.endpoint;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
 
 import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
 
-import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.index.TableRecordInfo;
+import org.apache.kylin.metadata.measure.LongMutable;
 import org.apache.kylin.metadata.measure.MeasureAggregator;
 import org.apache.kylin.metadata.measure.fixedlen.FixedLenMeasureCodec;
 import org.apache.kylin.metadata.model.FunctionDesc;
@@ -152,8 +156,8 @@ public class EndpointAggregationTest extends LocalFileMetadataTestCase {
         long sumTotal = 0;
         long minTotal = 0;
         for (Map.Entry<AggrKey, MeasureAggregator[]> entry : aggCache.getAllEntries()) {
-            sumTotal += ((LongWritable) entry.getValue()[0].getState()).get();
-            minTotal += ((LongWritable) entry.getValue()[1].getState()).get();
+            sumTotal += ((LongMutable) entry.getValue()[0].getState()).get();
+            minTotal += ((LongMutable) entry.getValue()[1].getState()).get();
 
         }
         assertEquals(3020800, sumTotal);

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/3823545a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
----------------------------------------------------------------------
diff --git a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
index ba09b51..a98ee0e 100644
--- a/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
+++ b/storage/src/test/java/org/apache/kylin/storage/hbase/coprocessor/observer/AggregateRegionObserverTest.java
@@ -18,16 +18,25 @@
 
 package org.apache.kylin.storage.hbase.coprocessor.observer;
 
-import com.google.common.collect.Lists;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
-import org.apache.hadoop.io.LongWritable;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.kv.RowConstants;
+import org.apache.kylin.metadata.measure.LongMutable;
 import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
@@ -38,16 +47,7 @@ import org.apache.kylin.storage.hbase.coprocessor.observer.ObserverAggregators.H
 import org.junit.Before;
 import org.junit.Test;
 
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.List;
-
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
+import com.google.common.collect.Lists;
 
 /**
  * @author yangli9
@@ -91,7 +91,7 @@ public class AggregateRegionObserverTest {
     private Cell newCell(byte[] key, HCol col, String decimal, int number) {
         Object[] values = number == Integer.MIN_VALUE ? //
         new Object[] { new BigDecimal(decimal) } //
-                : new Object[] { new BigDecimal(decimal), new LongWritable(number) };
+                : new Object[] { new BigDecimal(decimal), new LongMutable(number) };
         buf.clear();
         col.measureCodec.encode(values, buf);