You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/05/12 13:57:14 UTC

[5/7] carbondata git commit: [CARBONDATA-1015] Refactor write step and add ColumnPage in data load. This closes #852

[CARBONDATA-1015] Refactor write step and add ColumnPage in data load. This closes #852


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d9ecfb50
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d9ecfb50
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d9ecfb50

Branch: refs/heads/12-dev2
Commit: d9ecfb503dd955e22727f837ee5dffa7af3a90fc
Parents: 663176a
Author: jackylk <ja...@huawei.com>
Authored: Thu May 4 23:32:07 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Fri May 12 21:52:31 2017 +0800

----------------------------------------------------------------------
 .../core/compression/BigIntCompressor.java      |   9 +-
 .../core/compression/DoubleCompressor.java      |  43 +-
 .../core/compression/ValueCompressor.java       |   3 +-
 .../core/constants/CarbonCommonConstants.java   |   6 -
 .../chunk/store/MeasureChunkStoreFactory.java   |  26 +-
 .../BlockIndexerStorageForNoInvertedIndex.java  |   2 +-
 ...ndexerStorageForNoInvertedIndexForShort.java |   3 +-
 .../compression/MeasureMetaDataModel.java       |   8 +-
 .../compression/ReaderCompressModel.java        |   8 +-
 .../compression/ValueCompressionHolder.java     |  26 +-
 .../compression/WriterCompressModel.java        |  41 +-
 .../compression/decimal/CompressByteArray.java  |   4 +-
 .../decimal/CompressionMaxMinByte.java          |   6 +-
 .../decimal/CompressionMaxMinDefault.java       |   6 +-
 .../decimal/CompressionMaxMinInt.java           |   6 +-
 .../decimal/CompressionMaxMinLong.java          |   6 +-
 .../decimal/CompressionMaxMinShort.java         |   6 +-
 .../nondecimal/CompressionNonDecimalByte.java   |   6 +-
 .../CompressionNonDecimalDefault.java           |   6 +-
 .../nondecimal/CompressionNonDecimalInt.java    |   6 +-
 .../nondecimal/CompressionNonDecimalLong.java   |   6 +-
 .../CompressionNonDecimalMaxMinByte.java        |   6 +-
 .../CompressionNonDecimalMaxMinDefault.java     |   6 +-
 .../CompressionNonDecimalMaxMinInt.java         |   6 +-
 .../CompressionNonDecimalMaxMinLong.java        |   6 +-
 .../CompressionNonDecimalMaxMinShort.java       |   6 +-
 .../nondecimal/CompressionNonDecimalShort.java  |   6 +-
 .../compression/none/CompressionNoneByte.java   |   6 +-
 .../none/CompressionNoneDefault.java            |   6 +-
 .../compression/none/CompressionNoneInt.java    |   6 +-
 .../compression/none/CompressionNoneLong.java   |   6 +-
 .../compression/none/CompressionNoneShort.java  |   6 +-
 .../dataholder/CarbonWriteDataHolder.java       | 156 +--
 .../HeavyCompressedDoubleArrayDataStore.java    |  57 --
 .../core/datastore/page/ColumnPage.java         |  42 +
 .../core/datastore/page/ComplexColumnPage.java  |  77 ++
 .../datastore/page/FixLengthColumnPage.java     | 155 +++
 .../datastore/page/VarLengthColumnPage.java     |  42 +
 .../datastore/page/compression/Compression.java |  23 +
 .../datastore/page/encoding/ColumnCodec.java    |  35 +
 .../datastore/page/encoding/DummyCodec.java     |  37 +
 .../page/statistics/PageStatistics.java         | 124 +++
 .../page/statistics/StatisticsCollector.java    |  66 ++
 .../core/metadata/ValueEncoderMeta.java         |  18 +-
 .../core/metadata/datatype/DataType.java        |   6 +-
 .../apache/carbondata/core/util/ByteUtil.java   |   2 +
 .../core/util/CarbonMetadataUtil.java           |  44 +-
 .../carbondata/core/util/CarbonProperties.java  |   1 -
 .../apache/carbondata/core/util/CarbonUtil.java |   2 +-
 .../carbondata/core/util/CompressionFinder.java |  10 +-
 .../carbondata/core/util/DataTypeUtil.java      |   3 -
 .../apache/carbondata/core/util/NodeHolder.java |  31 +-
 .../core/util/ValueCompressionUtil.java         | 190 ++--
 .../core/util/CarbonMetadataUtilTest.java       |  11 +-
 .../core/util/ValueCompressionUtilTest.java     | 114 +--
 .../core/writer/CarbonFooterWriterTest.java     |   2 +-
 .../examples/CarbonSessionExample.scala         |   1 +
 format/src/main/thrift/carbondata.thrift        |   8 +-
 .../testsuite/emptyrow/TestEmptyRows.scala      |   2 +-
 .../dataload/TestLoadDataWithNoMeasure.scala    |   8 +-
 .../ColumnGroupDataTypesTestCase.scala          |   6 +
 .../spark/sql/parser/CarbonSparkSqlParser.scala |   2 +-
 .../merger/CompactionResultSortProcessor.java   |  17 +-
 .../sort/impl/ParallelReadMergeSorterImpl.java  |   2 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |   2 +-
 .../sort/unsafe/UnsafeCarbonRowPage.java        | 133 +--
 .../newflow/sort/unsafe/UnsafeSortDataRows.java |  10 +-
 .../holder/UnsafeSortTempFileChunkHolder.java   |  29 +-
 .../merger/UnsafeIntermediateFileMerger.java    |  38 +-
 .../CarbonRowDataWriterProcessorStepImpl.java   |  10 +-
 .../sortdata/IntermediateFileMerger.java        |  37 +-
 .../sortandgroupby/sortdata/SortDataRows.java   |  29 +-
 .../sortandgroupby/sortdata/SortParameters.java |  25 +-
 .../sortdata/SortTempFileChunkHolder.java       |  29 +-
 .../store/CarbonFactDataHandlerColumnar.java    | 975 ++++++++-----------
 .../store/CarbonFactDataHandlerModel.java       |  27 +-
 .../store/SingleThreadFinalSortFilesMerger.java |  11 +-
 .../store/colgroup/ColGroupBlockStorage.java    |  29 +-
 .../writer/v3/CarbonFactDataWriterImplV3.java   |   4 +-
 .../util/CarbonDataProcessorUtil.java           |  50 +-
 .../store/colgroup/ColGroupMinMaxTest.java      |   2 +
 81 files changed, 1727 insertions(+), 1306 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java b/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java
index 8e43684..8360a68 100644
--- a/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java
@@ -17,7 +17,7 @@
 package org.apache.carbondata.core.compression;
 
 import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
  * It compresses big int data
@@ -63,6 +63,7 @@ public class BigIntCompressor extends ValueCompressor {
    */
   @Override
   protected Object compressAdaptive(DataType convertedDataType, CarbonWriteDataHolder dataHolder) {
+
     long[] value = dataHolder.getWritableLongValues();
     return compressValue(convertedDataType, value, 0, false);
   }
@@ -82,7 +83,7 @@ public class BigIntCompressor extends ValueCompressor {
   protected Object compressValue(DataType convertedDataType, long[] value, long maxValue,
       boolean isMinMax) {
     switch (convertedDataType) {
-      case DATA_BYTE:
+      case BYTE:
         byte[] result = new byte[value.length];
         if (isMinMax) {
           for (int j = 0; j < value.length; j++) {
@@ -94,7 +95,7 @@ public class BigIntCompressor extends ValueCompressor {
           }
         }
         return result;
-      case DATA_SHORT:
+      case SHORT:
         short[] shortResult = new short[value.length];
         if (isMinMax) {
           for (int j = 0; j < value.length; j++) {
@@ -106,7 +107,7 @@ public class BigIntCompressor extends ValueCompressor {
           }
         }
         return shortResult;
-      case DATA_INT:
+      case INT:
         int[] intResult = new int[value.length];
         if (isMinMax) {
           for (int j = 0; j < value.length; j++) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/compression/DoubleCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/compression/DoubleCompressor.java b/core/src/main/java/org/apache/carbondata/core/compression/DoubleCompressor.java
index 840709d..bc2d6f1 100644
--- a/core/src/main/java/org/apache/carbondata/core/compression/DoubleCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/compression/DoubleCompressor.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.core.compression;
 import java.math.BigDecimal;
 
 import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
  * Double compressor
@@ -33,7 +33,7 @@ public class DoubleCompressor extends ValueCompressor {
     BigDecimal max = BigDecimal.valueOf((double)maxValue);
     double[] value = dataHolder.getWritableDoubleValues();
     switch (convertedDataType) {
-      case DATA_BYTE:
+      case BYTE:
         byte[] result = new byte[value.length];
         for (int j = 0; j < value.length; j++) {
           BigDecimal val = BigDecimal.valueOf(value[j]);
@@ -42,7 +42,7 @@ public class DoubleCompressor extends ValueCompressor {
           i++;
         }
         return result;
-      case DATA_SHORT:
+      case SHORT:
         short[] shortResult = new short[value.length];
         for (int j = 0; j < value.length; j++) {
           BigDecimal val = BigDecimal.valueOf(value[j]);
@@ -51,7 +51,7 @@ public class DoubleCompressor extends ValueCompressor {
           i++;
         }
         return shortResult;
-      case DATA_INT:
+      case INT:
         int[] intResult = new int[value.length];
         for (int j = 0; j < value.length; j++) {
           BigDecimal val = BigDecimal.valueOf(value[j]);
@@ -60,7 +60,7 @@ public class DoubleCompressor extends ValueCompressor {
           i++;
         }
         return intResult;
-      case DATA_LONG:
+      case LONG:
         long[] longResult = new long[value.length];
         for (int j = 0; j < value.length; j++) {
           BigDecimal val = BigDecimal.valueOf(value[j]);
@@ -69,7 +69,7 @@ public class DoubleCompressor extends ValueCompressor {
           i++;
         }
         return longResult;
-      case DATA_FLOAT:
+      case FLOAT:
         float[] floatResult = new float[value.length];
         for (int j = 0; j < value.length; j++) {
           BigDecimal val = BigDecimal.valueOf(value[j]);
@@ -96,35 +96,35 @@ public class DoubleCompressor extends ValueCompressor {
     int i = 0;
     double[] value = dataHolder.getWritableDoubleValues();
     switch (convertedDataType) {
-      case DATA_BYTE:
+      case BYTE:
         byte[] result = new byte[value.length];
         for (int j = 0; j < value.length; j++) {
           result[i] = (byte) (Math.round(Math.pow(10, decimal) * value[j]));
           i++;
         }
         return result;
-      case DATA_SHORT:
+      case SHORT:
         short[] shortResult = new short[value.length];
         for (int j = 0; j < value.length; j++) {
           shortResult[i] = (short) (Math.round(Math.pow(10, decimal) * value[j]));
           i++;
         }
         return shortResult;
-      case DATA_INT:
+      case INT:
         int[] intResult = new int[value.length];
         for (int j = 0; j < value.length; j++) {
           intResult[i] = (int) (Math.round(Math.pow(10, decimal) * value[j]));
           i++;
         }
         return intResult;
-      case DATA_LONG:
+      case LONG:
         long[] longResult = new long[value.length];
         for (int j = 0; j < value.length; j++) {
           longResult[i] = Math.round(Math.pow(10, decimal) * value[j]);
           i++;
         }
         return longResult;
-      case DATA_FLOAT:
+      case FLOAT:
         float[] floatResult = new float[value.length];
         for (int j = 0; j < value.length; j++) {
           floatResult[i] = (float) (Math.round(Math.pow(10, decimal) * value[j]));
@@ -148,35 +148,35 @@ public class DoubleCompressor extends ValueCompressor {
     double[] value = dataHolder.getWritableDoubleValues();
     int i = 0;
     switch (convertedDataType) {
-      case DATA_BYTE:
+      case BYTE:
         byte[] result = new byte[value.length];
         for (int j = 0; j < value.length; j++) {
           result[i] = (byte) (maxValue - value[j]);
           i++;
         }
         return result;
-      case DATA_SHORT:
+      case SHORT:
         short[] shortResult = new short[value.length];
         for (int j = 0; j < value.length; j++) {
           shortResult[i] = (short) (maxValue - value[j]);
           i++;
         }
         return shortResult;
-      case DATA_INT:
+      case INT:
         int[] intResult = new int[value.length];
         for (int j = 0; j < value.length; j++) {
           intResult[i] = (int) (maxValue - value[j]);
           i++;
         }
         return intResult;
-      case DATA_LONG:
+      case LONG:
         long[] longResult = new long[value.length];
         for (int j = 0; j < value.length; j++) {
           longResult[i] = (long) (maxValue - value[j]);
           i++;
         }
         return longResult;
-      case DATA_FLOAT:
+      case FLOAT:
         float[] floatResult = new float[value.length];
         for (int j = 0; j < value.length; j++) {
           floatResult[i] = (float) (maxValue - value[j]);
@@ -198,36 +198,35 @@ public class DoubleCompressor extends ValueCompressor {
     double[] value = dataHolder.getWritableDoubleValues();
     int i = 0;
     switch (changedDataType) {
-      case DATA_BYTE:
+      case BYTE:
         byte[] result = new byte[value.length];
         for (int j = 0; j < value.length; j++) {
           result[i] = (byte) value[j];
           i++;
         }
         return result;
-      case DATA_SHORT:
+      case SHORT:
         short[] shortResult = new short[value.length];
         for (int j = 0; j < value.length; j++) {
           shortResult[i] = (short) value[j];
           i++;
         }
         return shortResult;
-      case DATA_INT:
+      case INT:
         int[] intResult = new int[value.length];
         for (int j = 0; j < value.length; j++) {
           intResult[i] = (int) value[j];
           i++;
         }
         return intResult;
-      case DATA_LONG:
-      case DATA_BIGINT:
+      case LONG:
         long[] longResult = new long[value.length];
         for (int j = 0; j < value.length; j++) {
           longResult[i] = (long) value[j];
           i++;
         }
         return longResult;
-      case DATA_FLOAT:
+      case FLOAT:
         float[] floatResult = new float[value.length];
         for (int j = 0; j < value.length; j++) {
           floatResult[i] = (float) value[j];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java b/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java
index 230507f..16f8ac1 100644
--- a/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java
@@ -17,10 +17,9 @@
 package org.apache.carbondata.core.compression;
 
 import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CompressionFinder;
 import org.apache.carbondata.core.util.ValueCompressionUtil.COMPRESSION_TYPE;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
-
 /**
  * Measure compressor
  */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 7c59a59..627975a 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -647,12 +647,6 @@ public final class CarbonCommonConstants {
   public static final char BIG_INT_MEASURE = 'd';
 
   /**
-   * This determines the size of array to be processed in data load steps. one
-   * for dimensions , one of ignore dictionary dimensions , one for measures.
-   */
-  public static final int ARRAYSIZE = 3;
-
-  /**
    * CARBON_PREFETCH_BUFFERSIZE
    */
   public static final String CARBON_PREFETCH_BUFFERSIZE = "carbon.prefetch.buffersize";

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureChunkStoreFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureChunkStoreFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureChunkStoreFactory.java
index 2a3a2a7..12bfea9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureChunkStoreFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/MeasureChunkStoreFactory.java
@@ -30,8 +30,8 @@ import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeDouble
 import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeIntMeasureChunkStore;
 import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeLongMeasureChunkStore;
 import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeShortMeasureChunkStore;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 /**
  * Factory class for getting the measure store type
@@ -67,33 +67,33 @@ public class MeasureChunkStoreFactory {
   public MeasureDataChunkStore getMeasureDataChunkStore(DataType dataType, int numberOfRows) {
     if (!isUnsafe) {
       switch (dataType) {
-        case DATA_BYTE:
+        case BYTE:
           return new SafeByteMeasureChunkStore(numberOfRows);
-        case DATA_SHORT:
+        case SHORT:
           return new SafeShortMeasureChunkStore(numberOfRows);
-        case DATA_INT:
+        case INT:
           return new SafeIntMeasureChunkStore(numberOfRows);
-        case DATA_LONG:
+        case LONG:
           return new SafeLongMeasureChunkStore(numberOfRows);
-        case DATA_BIGDECIMAL:
+        case DECIMAL:
           return new SafeBigDecimalMeasureChunkStore(numberOfRows);
-        case DATA_DOUBLE:
+        case DOUBLE:
         default:
           return new SafeDoubleMeasureChunkStore(numberOfRows);
       }
     } else {
       switch (dataType) {
-        case DATA_BYTE:
+        case BYTE:
           return new UnsafeByteMeasureChunkStore(numberOfRows);
-        case DATA_SHORT:
+        case SHORT:
           return new UnsafeShortMeasureChunkStore(numberOfRows);
-        case DATA_INT:
+        case INT:
           return new UnsafeIntMeasureChunkStore(numberOfRows);
-        case DATA_LONG:
+        case LONG:
           return new UnsafeLongMeasureChunkStore(numberOfRows);
-        case DATA_BIGDECIMAL:
+        case DECIMAL:
           return new UnsafeBigDecimalMeasureChunkStore(numberOfRows);
-        case DATA_DOUBLE:
+        case DOUBLE:
         default:
           return new UnsafeDoubleMeasureChunkStore(numberOfRows);
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndex.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndex.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndex.java
index 0ef2518..f36ee97 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndex.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndex.java
@@ -37,7 +37,7 @@ public class BlockIndexerStorageForNoInvertedIndex implements IndexStorage<int[]
   private byte[] min;
   private byte[] max;
 
-  public BlockIndexerStorageForNoInvertedIndex(byte[][] keyBlockInput, boolean isNoDictionary) {
+  public BlockIndexerStorageForNoInvertedIndex(byte[][] keyBlockInput) {
     this.keyBlock = keyBlockInput;
     min = keyBlock[0];
     max = keyBlock[0];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
index 731df96..901084f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
@@ -36,8 +36,7 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
   private byte[] min;
   private byte[] max;
 
-  public BlockIndexerStorageForNoInvertedIndexForShort(byte[][] keyBlockInput,
-      boolean isNoDictionary) {
+  public BlockIndexerStorageForNoInvertedIndexForShort(byte[][] keyBlockInput) {
     this.keyBlock = keyBlockInput;
     min = keyBlock[0];
     max = keyBlock[0];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/MeasureMetaDataModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/MeasureMetaDataModel.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/MeasureMetaDataModel.java
index 5d35b0d..7a39f7c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/MeasureMetaDataModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/MeasureMetaDataModel.java
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.core.datastore.compression;
 
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
 public class MeasureMetaDataModel {
   /**
    * maxValue
@@ -46,7 +48,7 @@ public class MeasureMetaDataModel {
   /**
    * type
    */
-  private char[] type;
+  private DataType[] type;
 
   /**
    * dataTypeSelected
@@ -54,7 +56,7 @@ public class MeasureMetaDataModel {
   private byte[] dataTypeSelected;
 
   public MeasureMetaDataModel(Object[] minValue, Object[] maxValue, int[] mantissa,
-      int measureCount, Object[] uniqueValue, char[] type, byte[] dataTypeSelected) {
+      int measureCount, Object[] uniqueValue, DataType[] type, byte[] dataTypeSelected) {
     this.minValue = minValue;
     this.maxValue = maxValue;
     this.mantissa = mantissa;
@@ -112,7 +114,7 @@ public class MeasureMetaDataModel {
   /**
    * @return the type
    */
-  public char[] getType() {
+  public DataType[] getType() {
     return type;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/ReaderCompressModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ReaderCompressModel.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ReaderCompressModel.java
index 101a24b..60687d1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ReaderCompressModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ReaderCompressModel.java
@@ -18,13 +18,13 @@
 package org.apache.carbondata.core.datastore.compression;
 
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
-import org.apache.carbondata.core.util.ValueCompressionUtil;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 // Used in read path for decompression preparation
 public class ReaderCompressModel {
   private ValueEncoderMeta valueEncoderMeta;
 
-  private ValueCompressionUtil.DataType convertedDataType;
+  private DataType convertedDataType;
 
   private ValueCompressionHolder valueHolder;
 
@@ -32,11 +32,11 @@ public class ReaderCompressModel {
     this.valueEncoderMeta = valueEncoderMeta;
   }
 
-  public ValueCompressionUtil.DataType getConvertedDataType() {
+  public DataType getConvertedDataType() {
     return convertedDataType;
   }
 
-  public void setConvertedDataType(ValueCompressionUtil.DataType convertedDataType) {
+  public void setConvertedDataType(DataType convertedDataType) {
     this.convertedDataType = convertedDataType;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/ValueCompressionHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ValueCompressionHolder.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ValueCompressionHolder.java
index 9b3a18a..614eb32 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/ValueCompressionHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/ValueCompressionHolder.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.core.datastore.compression;
 
 import java.math.BigDecimal;
 
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
  * ValueCompressionHolder is the base class for handling
@@ -40,24 +40,23 @@ public abstract class ValueCompressionHolder<T> {
   protected void unCompress(Compressor compressor, DataType dataType, byte[] data, int offset,
       int length, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     switch (dataType) {
-      case DATA_BYTE:
+      case BYTE:
         setValue((T) compressor.unCompressByte(data, offset, length), numberOfRows, maxValueObject,
             decimalPlaces);
         break;
-      case DATA_SHORT:
+      case SHORT:
         setValue((T) compressor.unCompressShort(data, offset, length), numberOfRows, maxValueObject,
             decimalPlaces);
         break;
-      case DATA_INT:
+      case INT:
         setValue((T) compressor.unCompressInt(data, offset, length), numberOfRows, maxValueObject,
             decimalPlaces);
         break;
-      case DATA_LONG:
-      case DATA_BIGINT:
+      case LONG:
         setValue((T) compressor.unCompressLong(data, offset, length), numberOfRows, maxValueObject,
             decimalPlaces);
         break;
-      case DATA_FLOAT:
+      case FLOAT:
         setValue((T) compressor.unCompressFloat(data, offset, length), numberOfRows, maxValueObject,
             decimalPlaces);
         break;
@@ -75,18 +74,17 @@ public abstract class ValueCompressionHolder<T> {
    */
   public byte[] compress(Compressor compressor, DataType dataType, Object data) {
     switch (dataType) {
-      case DATA_BYTE:
+      case BYTE:
         return compressor.compressByte((byte[]) data);
-      case DATA_SHORT:
+      case SHORT:
         return compressor.compressShort((short[]) data);
-      case DATA_INT:
+      case INT:
         return compressor.compressInt((int[]) data);
-      case DATA_LONG:
-      case DATA_BIGINT:
+      case LONG:
         return compressor.compressLong((long[]) data);
-      case DATA_FLOAT:
+      case FLOAT:
         return compressor.compressFloat((float[]) data);
-      case DATA_DOUBLE:
+      case DOUBLE:
       default:
         return compressor.compressDouble((double[]) data);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/WriterCompressModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/WriterCompressModel.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/WriterCompressModel.java
index 0430c8c..a2bf47a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/WriterCompressModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/WriterCompressModel.java
@@ -17,19 +17,24 @@
 
 package org.apache.carbondata.core.datastore.compression;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CompressionFinder;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
 
+import static org.apache.carbondata.core.metadata.datatype.DataType.INT;
+import static org.apache.carbondata.core.metadata.datatype.DataType.SHORT;
+
 public class WriterCompressModel {
 
   /**
    * DataType[]  variable.
    */
-  private ValueCompressionUtil.DataType[] convertedDataType;
+  private DataType[] convertedDataType;
   /**
    * DataType[]  variable.
    */
-  private ValueCompressionUtil.DataType[] actualDataType;
+  private DataType[] actualDataType;
 
   /**
    * maxValue
@@ -52,7 +57,7 @@ public class WriterCompressModel {
   /**
    * aggType
    */
-  private char[] type;
+  private DataType[] type;
 
   /**
    * dataTypeSelected
@@ -68,28 +73,28 @@ public class WriterCompressModel {
   /**
    * @return the convertedDataType
    */
-  public ValueCompressionUtil.DataType[] getConvertedDataType() {
+  public DataType[] getConvertedDataType() {
     return convertedDataType;
   }
 
   /**
    * @param convertedDataType the convertedDataType to set
    */
-  public void setConvertedDataType(ValueCompressionUtil.DataType[] convertedDataType) {
+  public void setConvertedDataType(DataType[] convertedDataType) {
     this.convertedDataType = convertedDataType;
   }
 
   /**
    * @return the actualDataType
    */
-  public ValueCompressionUtil.DataType[] getActualDataType() {
+  public DataType[] getActualDataType() {
     return actualDataType;
   }
 
   /**
    * @param actualDataType
    */
-  public void setActualDataType(ValueCompressionUtil.DataType[] actualDataType) {
+  public void setActualDataType(DataType[] actualDataType) {
     this.actualDataType = actualDataType;
   }
 
@@ -159,13 +164,40 @@ public class WriterCompressModel {
    * @return the aggType
    */
   public char[] getType() {
+    // Translate each DataType into the measure type char defined in CarbonCommonConstants.
+    char[] ret = new char[type.length];
+    for (int i = 0; i < ret.length; i++) {
+      switch (type[i]) {
+        case SHORT:
+        case INT:
+        case LONG:
+          ret[i] = CarbonCommonConstants.BIG_INT_MEASURE;
+          break;
+        case DOUBLE:
+          ret[i] = CarbonCommonConstants.DOUBLE_MEASURE;
+          break;
+        case DECIMAL:
+          ret[i] = CarbonCommonConstants.BIG_DECIMAL_MEASURE;
+          break;
+        default:
+          // fail fast instead of leaving a '\0' entry in the returned array
+          throw new IllegalArgumentException("unsupported data type: " + type[i]);
+      }
+    }
+    return ret;
+  }
+
+  /**
+   * @return the type array as DataType values (the same array passed to setType)
+   */
+  public DataType[] getDataType() {
     return type;
   }
 
   /**
    * @param type the type to set
    */
-  public void setType(char[] type) {
+  public void setType(DataType[] type) {
     this.type = type;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressByteArray.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressByteArray.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressByteArray.java
index e517e41..7a36e66 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressByteArray.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressByteArray.java
@@ -29,7 +29,7 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 public class CompressByteArray extends ValueCompressionHolder<byte[]> {
 
@@ -101,7 +101,7 @@ public class CompressByteArray extends ValueCompressionHolder<byte[]> {
   @Override
   public void setValue(byte[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_BIGDECIMAL, numberOfRows);
+        .getMeasureDataChunkStore(DataType.DECIMAL, numberOfRows);
     this.measureChunkStore.putData(data);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinByte.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinByte.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinByte.java
index ed50cd3..45e3b47 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinByte.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinByte.java
@@ -26,7 +26,7 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 public class CompressionMaxMinByte extends ValueCompressionHolder<byte[]> {
 
@@ -68,7 +68,7 @@ public class CompressionMaxMinByte extends ValueCompressionHolder<byte[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_BYTE, value);
+    compressedValue = super.compress(compressor, DataType.BYTE, value);
   }
 
   @Override public void uncompress(DataType dataType, byte[] compressedData, int offset, int length,
@@ -103,7 +103,7 @@ public class CompressionMaxMinByte extends ValueCompressionHolder<byte[]> {
   @Override
   public void setValue(byte[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_BYTE, numberOfRows);
+        .getMeasureDataChunkStore(DataType.BYTE, numberOfRows);
     this.measureChunkStore.putData(data);
     if (maxValueObject instanceof Long) {
       this.maxValue = (long) maxValueObject;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinDefault.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinDefault.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinDefault.java
index 6550c46..6bd1947 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinDefault.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinDefault.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionMaxMinDefault extends ValueCompressionHolder<double[]> {
 
@@ -70,7 +70,7 @@ public class CompressionMaxMinDefault extends ValueCompressionHolder<double[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_DOUBLE, value);
+    compressedValue = super.compress(compressor, DataType.DOUBLE, value);
   }
 
   @Override public void uncompress(DataType dataType, byte[] compressedData, int offset, int length,
@@ -106,7 +106,7 @@ public class CompressionMaxMinDefault extends ValueCompressionHolder<double[]> {
   @Override
   public void setValue(double[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, numberOfRows);
+        .getMeasureDataChunkStore(DataType.DOUBLE, numberOfRows);
     this.measureChunkStore.putData(data);
     if (maxValueObject instanceof Long) {
       this.maxValue = (long) maxValueObject;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinInt.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinInt.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinInt.java
index 512cf2c..60ddfea 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinInt.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinInt.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionMaxMinInt extends ValueCompressionHolder<int[]> {
   /**
@@ -72,7 +72,7 @@ public class CompressionMaxMinInt extends ValueCompressionHolder<int[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_INT, value);
+    compressedValue = super.compress(compressor, DataType.INT, value);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -102,7 +102,7 @@ public class CompressionMaxMinInt extends ValueCompressionHolder<int[]> {
   @Override
   public void setValue(int[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore =
-        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, numberOfRows);
+        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.INT, numberOfRows);
     this.measureChunkStore.putData(data);
     if (maxValueObject instanceof Long) {
       this.maxValue = (long) maxValueObject;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinLong.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinLong.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinLong.java
index ca91d44..159e741 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinLong.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinLong.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionMaxMinLong extends ValueCompressionHolder<long[]> {
   /**
@@ -57,7 +57,7 @@ public class CompressionMaxMinLong extends ValueCompressionHolder<long[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_LONG, value);
+    compressedValue = super.compress(compressor, DataType.LONG, value);
   }
 
   @Override public void setValue(long[] value) {
@@ -102,7 +102,7 @@ public class CompressionMaxMinLong extends ValueCompressionHolder<long[]> {
   @Override
   public void setValue(long[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_LONG, numberOfRows);
+        .getMeasureDataChunkStore(DataType.LONG, numberOfRows);
     this.measureChunkStore.putData(data);
     if (maxValueObject instanceof Long) {
       this.maxValue = (long) maxValueObject;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinShort.java
index d44875a..1d36375 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/decimal/CompressionMaxMinShort.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionMaxMinShort extends ValueCompressionHolder<short[]> {
 
@@ -74,7 +74,7 @@ public class CompressionMaxMinShort extends ValueCompressionHolder<short[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_SHORT, value);
+    compressedValue = super.compress(compressor, DataType.SHORT, value);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -104,7 +104,7 @@ public class CompressionMaxMinShort extends ValueCompressionHolder<short[]> {
   @Override
   public void setValue(short[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_SHORT, numberOfRows);
+        .getMeasureDataChunkStore(DataType.SHORT, numberOfRows);
     this.measureChunkStore.putData(data);
     if (maxValueObject instanceof Long) {
       this.maxValue = (long) maxValueObject;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalByte.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalByte.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalByte.java
index 13f962f..1c46dea 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalByte.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalByte.java
@@ -26,7 +26,7 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 public class CompressionNonDecimalByte extends ValueCompressionHolder<byte[]> {
   /**
@@ -58,7 +58,7 @@ public class CompressionNonDecimalByte extends ValueCompressionHolder<byte[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_BYTE, value);
+    compressedValue = super.compress(compressor, DataType.BYTE, value);
   }
 
   @Override public void uncompress(DataType dataType, byte[] compressedData, int offset, int length,
@@ -92,7 +92,7 @@ public class CompressionNonDecimalByte extends ValueCompressionHolder<byte[]> {
   @Override
   public void setValue(byte[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_BYTE, numberOfRows);
+        .getMeasureDataChunkStore(DataType.BYTE, numberOfRows);
     this.measureChunkStore.putData(data);
     this.divisionFactory = Math.pow(10, decimalPlaces);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalDefault.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalDefault.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalDefault.java
index 15c7bb3..40312db 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalDefault.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalDefault.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNonDecimalDefault extends ValueCompressionHolder<double[]> {
   /**
@@ -56,7 +56,7 @@ public class CompressionNonDecimalDefault extends ValueCompressionHolder<double[
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_DOUBLE, value);
+    compressedValue = super.compress(compressor, DataType.DOUBLE, value);
   }
 
   @Override public void setValue(double[] value) {
@@ -94,7 +94,7 @@ public class CompressionNonDecimalDefault extends ValueCompressionHolder<double[
   @Override
   public void setValue(double[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, numberOfRows);
+        .getMeasureDataChunkStore(DataType.DOUBLE, numberOfRows);
     this.measureChunkStore.putData(data);
     this.divisionFactory = Math.pow(10, decimalPlaces);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalInt.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalInt.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalInt.java
index 7e3606e..ada751e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalInt.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalInt.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNonDecimalInt extends ValueCompressionHolder<int[]> {
   /**
@@ -58,7 +58,7 @@ public class CompressionNonDecimalInt extends ValueCompressionHolder<int[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_INT, value);
+    compressedValue = super.compress(compressor, DataType.INT, value);
   }
 
   @Override public void setValueInBytes(byte[] bytesArr) {
@@ -93,7 +93,7 @@ public class CompressionNonDecimalInt extends ValueCompressionHolder<int[]> {
   @Override
   public void setValue(int[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore =
-        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, numberOfRows);
+        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.INT, numberOfRows);
     this.measureChunkStore.putData(data);
     this.divisionFactory = Math.pow(10, decimalPlaces);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalLong.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalLong.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalLong.java
index d810972..f0b2323 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalLong.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalLong.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNonDecimalLong extends ValueCompressionHolder<long[]> {
   /**
@@ -60,7 +60,7 @@ public class CompressionNonDecimalLong extends ValueCompressionHolder<long[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_LONG, value);
+    compressedValue = super.compress(compressor, DataType.LONG, value);
   }
 
   @Override public void uncompress(DataType dataType, byte[] compressedData, int offset, int length,
@@ -95,7 +95,7 @@ public class CompressionNonDecimalLong extends ValueCompressionHolder<long[]> {
   @Override
   public void setValue(long[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_LONG, numberOfRows);
+        .getMeasureDataChunkStore(DataType.LONG, numberOfRows);
     this.measureChunkStore.putData(data);
     this.divisionFactory = Math.pow(10, decimalPlaces);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinByte.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
index 83e8a1a..c425706 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinByte.java
@@ -26,7 +26,7 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 public class CompressionNonDecimalMaxMinByte extends ValueCompressionHolder<byte[]> {
   /**
@@ -58,7 +58,7 @@ public class CompressionNonDecimalMaxMinByte extends ValueCompressionHolder<byte
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_BYTE, value);
+    compressedValue = super.compress(compressor, DataType.BYTE, value);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -97,7 +97,7 @@ public class CompressionNonDecimalMaxMinByte extends ValueCompressionHolder<byte
   @Override
   public void setValue(byte[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_BYTE, numberOfRows);
+        .getMeasureDataChunkStore(DataType.BYTE, numberOfRows);
     this.measureChunkStore.putData(data);
     this.maxValue = BigDecimal.valueOf((double) maxValueObject);
     this.divisionFactor = Math.pow(10, decimalPlaces);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
index c408d9a..521328e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinDefault.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNonDecimalMaxMinDefault extends ValueCompressionHolder<double[]> {
   /**
@@ -62,7 +62,7 @@ public class CompressionNonDecimalMaxMinDefault extends ValueCompressionHolder<d
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_DOUBLE, value);
+    compressedValue = super.compress(compressor, DataType.DOUBLE, value);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -99,7 +99,7 @@ public class CompressionNonDecimalMaxMinDefault extends ValueCompressionHolder<d
   @Override
   public void setValue(double[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, numberOfRows);
+        .getMeasureDataChunkStore(DataType.DOUBLE, numberOfRows);
     this.measureChunkStore.putData(data);
     this.maxValue = BigDecimal.valueOf((double) maxValueObject);
     this.divisionFactor = Math.pow(10, decimalPlaces);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinInt.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
index 40b1099..efd21d5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinInt.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNonDecimalMaxMinInt extends ValueCompressionHolder<int[]> {
   /**
@@ -60,7 +60,7 @@ public class CompressionNonDecimalMaxMinInt extends ValueCompressionHolder<int[]
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_INT, value);
+    compressedValue = super.compress(compressor, DataType.INT, value);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -97,7 +97,7 @@ public class CompressionNonDecimalMaxMinInt extends ValueCompressionHolder<int[]
   @Override
   public void setValue(int[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore =
-        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, numberOfRows);
+        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.INT, numberOfRows);
     this.measureChunkStore.putData(data);
     this.maxValue = BigDecimal.valueOf((double) maxValueObject);
     this.divisionFactor = Math.pow(10, decimalPlaces);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinLong.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
index f8d8ed5..d2e2771 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinLong.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNonDecimalMaxMinLong extends ValueCompressionHolder<long[]> {
   /**
@@ -68,7 +68,7 @@ public class CompressionNonDecimalMaxMinLong extends ValueCompressionHolder<long
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_LONG, value);
+    compressedValue = super.compress(compressor, DataType.LONG, value);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -98,7 +98,7 @@ public class CompressionNonDecimalMaxMinLong extends ValueCompressionHolder<long
   @Override
   public void setValue(long[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_LONG, numberOfRows);
+        .getMeasureDataChunkStore(DataType.LONG, numberOfRows);
     this.measureChunkStore.putData(data);
     this.maxValue = BigDecimal.valueOf((double) maxValueObject);
     this.divisionFactor = Math.pow(10, decimalPlaces);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
index 14648ba..241344a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalMaxMinShort.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNonDecimalMaxMinShort extends ValueCompressionHolder<short[]> {
   /**
@@ -66,7 +66,7 @@ public class CompressionNonDecimalMaxMinShort extends ValueCompressionHolder<sho
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_SHORT, value);
+    compressedValue = super.compress(compressor, DataType.SHORT, value);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -96,7 +96,7 @@ public class CompressionNonDecimalMaxMinShort extends ValueCompressionHolder<sho
   @Override
   public void setValue(short[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_SHORT, numberOfRows);
+        .getMeasureDataChunkStore(DataType.SHORT, numberOfRows);
     this.measureChunkStore.putData(data);
     this.maxValue = BigDecimal.valueOf((double) maxValueObject);
     this.divisionFactor = Math.pow(10, decimalPlaces);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalShort.java
index 8536630..4737e10 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/nondecimal/CompressionNonDecimalShort.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNonDecimalShort extends ValueCompressionHolder<short[]> {
   /**
@@ -58,7 +58,7 @@ public class CompressionNonDecimalShort extends ValueCompressionHolder<short[]>
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_SHORT, value);
+    compressedValue = super.compress(compressor, DataType.SHORT, value);
   }
 
   @Override public void uncompress(DataType dataType, byte[] compressedData, int offset, int length,
@@ -93,7 +93,7 @@ public class CompressionNonDecimalShort extends ValueCompressionHolder<short[]>
   @Override
   public void setValue(short[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_SHORT, numberOfRows);
+        .getMeasureDataChunkStore(DataType.SHORT, numberOfRows);
     this.measureChunkStore.putData(data);
     this.divisionFactory = Math.pow(10, decimalPlaces);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneByte.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneByte.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneByte.java
index acd73d9..13af929 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneByte.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneByte.java
@@ -26,7 +26,7 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 
 public class CompressionNoneByte extends ValueCompressionHolder<byte[]> {
   /**
@@ -72,7 +72,7 @@ public class CompressionNoneByte extends ValueCompressionHolder<byte[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_BYTE, value);
+    compressedValue = super.compress(compressor, DataType.BYTE, value);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -98,7 +98,7 @@ public class CompressionNoneByte extends ValueCompressionHolder<byte[]> {
   @Override
   public void setValue(byte[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_BYTE, numberOfRows);
+        .getMeasureDataChunkStore(DataType.BYTE, numberOfRows);
     this.measureChunkStore.putData(data);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneDefault.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneDefault.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneDefault.java
index 8e02fd8..cca7927 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneDefault.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneDefault.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNoneDefault extends ValueCompressionHolder<double[]> {
   /**
@@ -69,7 +69,7 @@ public class CompressionNoneDefault extends ValueCompressionHolder<double[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_DOUBLE, value);
+    compressedValue = super.compress(compressor, DataType.DOUBLE, value);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -97,7 +97,7 @@ public class CompressionNoneDefault extends ValueCompressionHolder<double[]> {
   @Override
   public void setValue(double[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_DOUBLE, numberOfRows);
+        .getMeasureDataChunkStore(DataType.DOUBLE, numberOfRows);
     this.measureChunkStore.putData(data);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneInt.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneInt.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneInt.java
index f0c0311..f5bb813 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneInt.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneInt.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNoneInt extends ValueCompressionHolder<int[]> {
   /**
@@ -62,7 +62,7 @@ public class CompressionNoneInt extends ValueCompressionHolder<int[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_INT, value);
+    compressedValue = super.compress(compressor, DataType.INT, value);
   }
 
   @Override
@@ -96,7 +96,7 @@ public class CompressionNoneInt extends ValueCompressionHolder<int[]> {
   @Override
   public void setValue(int[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore =
-        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.DATA_INT, numberOfRows);
+        MeasureChunkStoreFactory.INSTANCE.getMeasureDataChunkStore(DataType.INT, numberOfRows);
     this.measureChunkStore.putData(data);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneLong.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneLong.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneLong.java
index ee72c8a..99f24ac 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneLong.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneLong.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNoneLong extends ValueCompressionHolder<long[]> {
   /**
@@ -62,7 +62,7 @@ public class CompressionNoneLong extends ValueCompressionHolder<long[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_LONG, value);
+    compressedValue = super.compress(compressor, DataType.LONG, value);
   }
 
   @Override
@@ -96,7 +96,7 @@ public class CompressionNoneLong extends ValueCompressionHolder<long[]> {
   @Override
   public void setValue(long[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_LONG, numberOfRows);
+        .getMeasureDataChunkStore(DataType.LONG, numberOfRows);
     this.measureChunkStore.putData(data);
 
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneShort.java
index d4289ab..664f9e4 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/none/CompressionNoneShort.java
@@ -27,8 +27,8 @@ import org.apache.carbondata.core.datastore.chunk.store.MeasureDataChunkStore;
 import org.apache.carbondata.core.datastore.compression.Compressor;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.core.util.ValueCompressionUtil.DataType;
 
 public class CompressionNoneShort extends ValueCompressionHolder<short[]> {
   /**
@@ -71,7 +71,7 @@ public class CompressionNoneShort extends ValueCompressionHolder<short[]> {
   }
 
   @Override public void compress() {
-    compressedValue = super.compress(compressor, DataType.DATA_SHORT, shortValue);
+    compressedValue = super.compress(compressor, DataType.SHORT, shortValue);
   }
 
   @Override public void setValueInBytes(byte[] value) {
@@ -98,7 +98,7 @@ public class CompressionNoneShort extends ValueCompressionHolder<short[]> {
   @Override
   public void setValue(short[] data, int numberOfRows, Object maxValueObject, int decimalPlaces) {
     this.measureChunkStore = MeasureChunkStoreFactory.INSTANCE
-        .getMeasureDataChunkStore(DataType.DATA_SHORT, numberOfRows);
+        .getMeasureDataChunkStore(DataType.SHORT, numberOfRows);
     this.measureChunkStore.putData(data);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonWriteDataHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonWriteDataHolder.java b/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonWriteDataHolder.java
index fb21d95..8d3cc0d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonWriteDataHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonWriteDataHolder.java
@@ -34,137 +34,57 @@ public class CarbonWriteDataHolder {
   private byte[][] byteValues;
 
   /**
-   * byteValues for no dictionary.
-   */
-  private byte[][][] byteValuesForNonDictionary;
-
-  /**
-   * byteValues
-   */
-  private byte[][][] columnByteValues;
-
-  /**
    * size
    */
   private int size;
 
   /**
-   * totalSize
+   * total size of the data in bytes added
    */
   private int totalSize;
 
-  /**
-   * Method to initialise double array
-   *
-   * @param size
-   */
-  public void initialiseDoubleValues(int size) {
-    if (size < 1) {
-      throw new IllegalArgumentException("Invalid array size");
-    }
-    doubleValues = new double[size];
-  }
-
   public void reset() {
     size = 0;
     totalSize = 0;
   }
 
   /**
-   * Method to initialise double array
-   *
-   * @param size
-   */
-  public void initialiseByteArrayValues(int size) {
-    if (size < 1) {
-      throw new IllegalArgumentException("Invalid array size");
-    }
-
-    byteValues = new byte[size][];
-    columnByteValues = new byte[size][][];
-  }
-
-  /**
-   * Method to initialise byte array
-   *
-   * @param size
+   * set long data type columnar data
+   * @param values
    */
-  public void initialiseByteArrayValuesForKey(int size) {
-    if (size < 1) {
-      throw new IllegalArgumentException("Invalid array size");
-    }
-
-    byteValues = new byte[size][];
-  }
-
-  public void initialiseByteArrayValuesForNonDictionary(int size) {
-    if (size < 1) {
-      throw new IllegalArgumentException("Invalid array size");
+  public void setWritableLongPage(long[] values) {
+    if (values != null) {
+      longValues = values;
+      size += values.length;
+      totalSize += values.length;
     }
-
-    byteValuesForNonDictionary = new byte[size][][];
   }
 
   /**
-   * Method to initialise long array
-   *
-   * @param size
+   * set double data type columnar data
+   * @param values
    */
-  public void initialiseLongValues(int size) {
-    if (size < 1) {
-      throw new IllegalArgumentException("Invalid array size");
+  public void setWritableDoublePage(double[] values) {
+    if (values != null) {
+      doubleValues = values;
+      size += values.length;
+      totalSize += values.length;
     }
-    longValues = new long[size];
-  }
-
-  /**
-   * set double value by index
-   *
-   * @param index
-   * @param value
-   */
-  public void setWritableDoubleValueByIndex(int index, Object value) {
-    doubleValues[index] = (Double) value;
-    size++;
   }
 
   /**
-   * set double value by index
-   *
-   * @param index
-   * @param value
-   */
-  public void setWritableLongValueByIndex(int index, Object value) {
-    longValues[index] = (Long) value;
-    size++;
-  }
-
-  /**
-   * set byte array value by index
-   *
-   * @param index
-   * @param value
+   * set decimal data type columnar data
+   * @param values
    */
-  public void setWritableByteArrayValueByIndex(int index, byte[] value) {
-    byteValues[index] = value;
-    size++;
-    if (null != value) totalSize += value.length;
-  }
-
-  public void setWritableNonDictByteArrayValueByIndex(int index, byte[][] value) {
-    byteValuesForNonDictionary[index] = value;
-    size++;
-    if (null != value) totalSize += value.length;
-  }
-
-  /**
-   * set byte array value by index
-   */
-  public void setWritableByteArrayValueByIndex(int index, int mdKeyIndex, Object[] columnData) {
-    int l = 0;
-    columnByteValues[index] = new byte[columnData.length - (mdKeyIndex + 1)][];
-    for (int i = mdKeyIndex + 1; i < columnData.length; i++) {
-      columnByteValues[index][l++] = (byte[]) columnData[i];
+  public void setWritableDecimalPage(byte[][] values) {
+    if (values != null) {
+      byteValues = values;
+      size += values.length;
+      for (int i = 0; i < values.length; i++) {
+        if (values[i] != null) {
+          totalSize += values[i].length;
+        }
+      }
     }
   }
 
@@ -187,30 +107,14 @@ public class CarbonWriteDataHolder {
     byte[] temp = new byte[totalSize];
     int startIndexToCopy = 0;
     for (int i = 0; i < size; i++) {
-      System.arraycopy(byteValues[i], 0, temp, startIndexToCopy, byteValues[i].length);
-      startIndexToCopy += byteValues[i].length;
+      if (byteValues[i] != null) {
+        System.arraycopy(byteValues[i], 0, temp, startIndexToCopy, byteValues[i].length);
+        startIndexToCopy += byteValues[i].length;
+      }
     }
     return temp;
   }
 
-  public byte[][] getByteArrayValues() {
-    if (size < byteValues.length) {
-      byte[][] temp = new byte[size][];
-      System.arraycopy(byteValues, 0, temp, 0, size);
-      byteValues = temp;
-    }
-    return byteValues;
-  }
-
-  public byte[][][] getNonDictByteArrayValues() {
-    if (size < byteValuesForNonDictionary.length) {
-      byte[][][] temp = new byte[size][][];
-      System.arraycopy(byteValuesForNonDictionary, 0, temp, 0, size);
-      byteValuesForNonDictionary = temp;
-    }
-    return byteValuesForNonDictionary;
-  }
-
   /**
    * Get Writable Double Values
    *

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java
deleted file mode 100644
index d3d67fd..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/impl/data/compressed/HeavyCompressedDoubleArrayDataStore.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.impl.data.compressed;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
-import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
-import org.apache.carbondata.core.util.ValueCompressionUtil;
-
-public class HeavyCompressedDoubleArrayDataStore {
-
-  // this method first invokes encoding routine to encode the data chunk,
-  // followed by invoking compression routine for preparing the data chunk for writing.
-  public static byte[][] encodeMeasureDataArray(
-      WriterCompressModel compressionModel,
-      CarbonWriteDataHolder[] dataHolder) {
-    char[] type = compressionModel.getType();
-    ValueCompressionHolder[] values =
-        new ValueCompressionHolder[compressionModel.getValueCompressionHolder().length];
-    byte[][] returnValue = new byte[values.length][];
-    for (int i = 0; i < compressionModel.getValueCompressionHolder().length; i++) {
-      values[i] = compressionModel.getValueCompressionHolder()[i];
-      if (type[i] != CarbonCommonConstants.BYTE_VALUE_MEASURE
-          && type[i] != CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-        // first perform encoding of the data chunk
-        values[i].setValue(
-            ValueCompressionUtil.getValueCompressor(compressionModel.getCompressionFinders()[i])
-                .getCompressedValues(compressionModel.getCompressionFinders()[i], dataHolder[i],
-                    compressionModel.getMaxValue()[i],
-                    compressionModel.getMantissa()[i]));
-      } else {
-        values[i].setValue(dataHolder[i].getWritableByteArrayValues());
-      }
-      values[i].compress();
-      returnValue[i] = values[i].getCompressedData();
-    }
-
-    return returnValue;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d9ecfb50/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
new file mode 100644
index 0000000..25a813c
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page;
+
+import org.apache.carbondata.core.datastore.page.statistics.PageStatistics;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+public class ColumnPage {
+
+  protected final DataType dataType;
+  protected final int pageSize;
+  protected PageStatistics stats;
+
+  protected ColumnPage(DataType dataType, int pageSize) {
+    this.dataType = dataType;
+    this.pageSize = pageSize;
+    this.stats = new PageStatistics(dataType);
+  }
+
+  protected void updateStatistics(Object value) {
+    stats.update(value);
+  }
+
+  public PageStatistics getStatistics() {
+    return stats;
+  }
+}