You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/06/10 03:46:41 UTC

[5/6] carbondata git commit: extract interface

extract interface


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/dc83b2ac
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/dc83b2ac
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/dc83b2ac

Branch: refs/heads/master
Commit: dc83b2acc3c4d3dc0c46cb6c118e8d1c5aa21821
Parents: f53ab4b
Author: jackylk <ja...@huawei.com>
Authored: Thu Jun 8 12:03:53 2017 +0800
Committer: jackylk <ja...@huawei.com>
Committed: Sat Jun 10 08:02:40 2017 +0800

----------------------------------------------------------------------
 .../core/compression/BigIntCompressor.java      |  34 +-
 .../core/compression/DoubleCompressor.java      |  22 +-
 .../core/compression/ValueCompressor.java       |  75 +---
 .../core/datastore/DimensionType.java           |  35 ++
 .../carbondata/core/datastore/TableSpec.java    | 220 ++++++++++
 .../core/datastore/block/SegmentProperties.java |   6 +-
 .../AbstractMeasureChunkReaderV2V3Format.java   |   2 +
 ...CompressedMeasureChunkFileBasedReaderV2.java |  29 +-
 ...CompressedMeasureChunkFileBasedReaderV3.java |  33 +-
 ...ndexerStorageForNoInvertedIndexForShort.java |   4 +-
 .../compression/MeasureMetaDataModel.java       |   2 +-
 .../compression/WriterCompressModel.java        | 243 -----------
 .../dataholder/CarbonWriteDataHolder.java       | 131 ------
 .../exception/CarbonDataWriterException.java    |  79 ++++
 .../core/datastore/page/ColumnPage.java         | 196 ++++++++-
 .../core/datastore/page/ComplexColumnPage.java  |   8 +-
 .../datastore/page/FixLengthColumnPage.java     | 159 -------
 .../datastore/page/VarLengthColumnPage.java     |  42 --
 .../page/statistics/ColumnPageStatistics.java   | 172 ++++++++
 .../page/statistics/MeasurePageStatsVO.java     | 103 +++++
 .../page/statistics/PageStatistics.java         | 132 ------
 .../core/datastore/row/CarbonRow.java           |  73 ++++
 .../core/datastore/row/WriteStepRowUtil.java    |  86 ++++
 .../core/metadata/BlockletInfoColumnar.java     |  20 +-
 .../core/metadata/CarbonMetadata.java           |   6 +-
 .../core/metadata/datatype/DataType.java        |   5 +-
 .../schema/table/column/CarbonDimension.java    |  23 +-
 .../table/column/CarbonImplicitDimension.java   |   9 +-
 .../schema/table/column/ColumnSchema.java       |   4 +-
 .../executor/impl/AbstractQueryExecutor.java    |   4 +-
 .../core/scan/executor/util/QueryUtil.java      |  22 +-
 .../scan/executor/util/RestructureUtil.java     |   4 +-
 .../util/AbstractDataFileFooterConverter.java   |   2 +-
 .../apache/carbondata/core/util/ByteUtil.java   |  18 +
 .../core/util/CarbonMetadataUtil.java           | 113 ++---
 .../apache/carbondata/core/util/CarbonUtil.java |  43 +-
 .../apache/carbondata/core/util/NodeHolder.java |  61 ++-
 .../core/util/ValueCompressionUtil.java         |  56 +--
 ...ressedDimensionChunkFileBasedReaderTest.java | 132 ------
 ...mpressedMeasureChunkFileBasedReaderTest.java |  90 ----
 .../core/metadata/CarbonMetadataTest.java       |   2 +-
 .../core/util/CarbonMetadataUtilTest.java       |  29 +-
 .../carbondata/core/util/CarbonUtilTest.java    |   5 +-
 .../core/util/ValueCompressionUtilTest.java     | 134 ------
 .../core/writer/CarbonFooterWriterTest.java     |  30 +-
 docs/useful-tips-on-carbondata.md               |   3 +-
 .../util/ExternalColumnDictionaryTestCase.scala |  11 +-
 .../vectorreader/AddColumnTestCases.scala       |   1 +
 .../core/datastore/GenericDataType.java         | 145 +++++++
 .../columnar/ColGroupBlockStorage.java          | 103 +++++
 .../processing/datatypes/ArrayDataType.java     |   1 +
 .../processing/datatypes/GenericDataType.java   | 145 -------
 .../processing/datatypes/PrimitiveDataType.java |   1 +
 .../processing/datatypes/StructDataType.java    |   1 +
 .../merger/CompactionResultSortProcessor.java   |   4 +-
 .../merger/RowResultMergerProcessor.java        |   6 +-
 .../newflow/AbstractDataLoadProcessorStep.java  |   2 +-
 .../newflow/CarbonDataLoadConfiguration.java    |  12 +
 .../newflow/DataLoadProcessBuilder.java         |   3 +
 .../newflow/converter/FieldConverter.java       |   2 +-
 .../newflow/converter/RowConverter.java         |   2 +-
 .../impl/ComplexFieldConverterImpl.java         |   4 +-
 .../impl/DictionaryFieldConverterImpl.java      |   6 +-
 .../DirectDictionaryFieldConverterImpl.java     |  10 +-
 .../converter/impl/FieldEncoderFactory.java     |   2 +-
 .../impl/MeasureFieldConverterImpl.java         |   4 +-
 .../impl/NonDictionaryFieldConverterImpl.java   |   7 +-
 .../converter/impl/RowConverterImpl.java        |   2 +-
 .../processing/newflow/row/CarbonRow.java       |  73 ----
 .../processing/newflow/row/CarbonRowBatch.java  |   1 +
 .../processing/newflow/row/CarbonSortBatch.java |   1 +
 .../newflow/row/WriteStepRowUtil.java           |  86 ----
 .../sort/impl/ParallelReadMergeSorterImpl.java  |   8 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |  10 +-
 .../UnsafeBatchParallelReadMergeSorterImpl.java |   4 +-
 .../impl/UnsafeParallelReadMergeSorterImpl.java |  10 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |  11 +-
 .../newflow/sort/unsafe/IntPointerBuffer.java   |   1 -
 .../newflow/sort/unsafe/UnsafeSortDataRows.java |   2 +-
 .../UnsafeSingleThreadFinalSortFilesMerger.java |   2 +-
 .../CarbonRowDataWriterProcessorStepImpl.java   |   6 +-
 .../steps/DataConverterProcessorStepImpl.java   |   2 +-
 ...ConverterProcessorWithBucketingStepImpl.java |   2 +-
 .../steps/DataWriterBatchProcessorStepImpl.java |   2 +-
 .../steps/DataWriterProcessorStepImpl.java      |   4 +-
 .../newflow/steps/DummyClassForTest.java        |   2 +-
 .../newflow/steps/InputProcessorStepImpl.java   |   2 +-
 .../newflow/steps/SortProcessorStepImpl.java    |   2 +-
 .../sortandgroupby/sortdata/RowComparator.java  |   2 +-
 .../store/CarbonFactDataHandlerColumnar.java    | 429 ++-----------------
 .../store/CarbonFactDataHandlerModel.java       |  32 +-
 .../processing/store/CarbonFactHandler.java     |   4 +-
 .../processing/store/DefaultEncoder.java        | 265 ++++++++++++
 .../store/SingleThreadFinalSortFilesMerger.java |   2 +-
 .../carbondata/processing/store/TablePage.java  |  58 ++-
 .../processing/store/TablePageKey.java          | 139 ++++++
 .../processing/store/TablePageStatistics.java   | 142 ++++++
 .../store/colgroup/ColGroupBlockStorage.java    | 101 -----
 .../store/writer/AbstractFactDataWriter.java    |  57 +--
 .../store/writer/CarbonDataWriterVo.java        |  30 +-
 .../store/writer/CarbonFactDataWriter.java      |  26 +-
 .../processing/store/writer/Encoder.java        |  38 ++
 .../exception/CarbonDataWriterException.java    |  79 ----
 .../writer/v1/CarbonFactDataWriterImplV1.java   |  67 ++-
 .../writer/v2/CarbonFactDataWriterImplV2.java   |  18 +-
 .../writer/v3/CarbonFactDataWriterImplV3.java   | 135 +++---
 .../util/CarbonDataProcessorUtil.java           |   2 +-
 .../processing/util/NonDictionaryUtil.java      |   2 +-
 108 files changed, 2385 insertions(+), 2649 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java b/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java
index 8360a68..2db707f 100644
--- a/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/compression/BigIntCompressor.java
@@ -16,7 +16,7 @@
  */
 package org.apache.carbondata.core.compression;
 
-import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
@@ -24,47 +24,39 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
  */
 public class BigIntCompressor extends ValueCompressor {
 
-  @Override protected Object compressNonDecimalMaxMin(DataType convertedDataType,
-      CarbonWriteDataHolder dataHolder, int decimal, Object max) {
+  @Override
+  protected Object compressNonDecimalMaxMin(DataType convertedDataType,
+      ColumnPage columnPage, int decimal, Object max) {
     // in case if bigint, decimal will be 0
-    return compressMaxMin(convertedDataType, dataHolder, max);
+    return compressMaxMin(convertedDataType, columnPage, max);
   }
 
   @Override
-  protected Object compressNonDecimal(DataType convertedDataType, CarbonWriteDataHolder dataHolder,
+  protected Object compressNonDecimal(DataType convertedDataType, ColumnPage columnPage,
       int decimal) {
     // in case if bigint, decimal will be 0
-    return compressAdaptive(convertedDataType, dataHolder);
+    return compressAdaptive(convertedDataType, columnPage);
   }
 
   /**
    * 1. It gets delta value i.e difference of maximum value and actual value
    * 2. Convert the delta value computed above to convertedDatatype if it can
    *    be stored with less byte
-   * @param convertedDataType
-   * @param dataHolder
-   * @param max
-   * @return
    */
   @Override
-  protected Object compressMaxMin(DataType convertedDataType, CarbonWriteDataHolder dataHolder,
+  protected Object compressMaxMin(DataType convertedDataType, ColumnPage columnPage,
       Object max) {
     long maxValue = (long) max;
-    long[] value = dataHolder.getWritableLongValues();
+    long[] value = columnPage.getLongPage();
     return compressValue(convertedDataType, value, maxValue, true);
   }
 
   /**
-   * 1. It converts actual value to converted data type if it can be
-   *   stored with less bytes.
-   * @param convertedDataType
-   * @param dataHolder
-   * @return
+   * Converts the actual value to the converted data type if it can be stored in fewer bytes.
    */
   @Override
-  protected Object compressAdaptive(DataType convertedDataType, CarbonWriteDataHolder dataHolder) {
-
-    long[] value = dataHolder.getWritableLongValues();
+  protected Object compressAdaptive(DataType convertedDataType, ColumnPage columnPage) {
+    long[] value = columnPage.getLongPage();
     return compressValue(convertedDataType, value, 0, false);
   }
 
@@ -80,7 +72,7 @@ public class BigIntCompressor extends ValueCompressor {
    * @param isMinMax
    * @return
    */
-  protected Object compressValue(DataType convertedDataType, long[] value, long maxValue,
+  private Object compressValue(DataType convertedDataType, long[] value, long maxValue,
       boolean isMinMax) {
     switch (convertedDataType) {
       case BYTE:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/compression/DoubleCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/compression/DoubleCompressor.java b/core/src/main/java/org/apache/carbondata/core/compression/DoubleCompressor.java
index bc2d6f1..5a24479 100644
--- a/core/src/main/java/org/apache/carbondata/core/compression/DoubleCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/compression/DoubleCompressor.java
@@ -18,7 +18,7 @@ package org.apache.carbondata.core.compression;
 
 import java.math.BigDecimal;
 
-import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 
 /**
@@ -26,12 +26,12 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
  */
 public class DoubleCompressor extends ValueCompressor {
 
-
-  @Override protected Object compressNonDecimalMaxMin(DataType convertedDataType,
-      CarbonWriteDataHolder dataHolder, int decimal, Object maxValue) {
+  @Override
+  protected Object compressNonDecimalMaxMin(DataType convertedDataType,
+      ColumnPage columnPage, int decimal, Object maxValue) {
     int i = 0;
     BigDecimal max = BigDecimal.valueOf((double)maxValue);
-    double[] value = dataHolder.getWritableDoubleValues();
+    double[] value = columnPage.getDoublePage();
     switch (convertedDataType) {
       case BYTE:
         byte[] result = new byte[value.length];
@@ -91,10 +91,10 @@ public class DoubleCompressor extends ValueCompressor {
   }
 
   @Override
-  protected Object compressNonDecimal(DataType convertedDataType, CarbonWriteDataHolder dataHolder,
+  protected Object compressNonDecimal(DataType convertedDataType, ColumnPage columnPage,
       int decimal) {
     int i = 0;
-    double[] value = dataHolder.getWritableDoubleValues();
+    double[] value = columnPage.getDoublePage();
     switch (convertedDataType) {
       case BYTE:
         byte[] result = new byte[value.length];
@@ -142,10 +142,10 @@ public class DoubleCompressor extends ValueCompressor {
   }
 
   @Override
-  protected Object compressMaxMin(DataType convertedDataType, CarbonWriteDataHolder dataHolder,
+  protected Object compressMaxMin(DataType convertedDataType, ColumnPage columnPage,
       Object max) {
     double maxValue = (double) max;
-    double[] value = dataHolder.getWritableDoubleValues();
+    double[] value = columnPage.getDoublePage();
     int i = 0;
     switch (convertedDataType) {
       case BYTE:
@@ -194,8 +194,8 @@ public class DoubleCompressor extends ValueCompressor {
   }
 
   @Override
-  protected Object compressAdaptive(DataType changedDataType, CarbonWriteDataHolder dataHolder) {
-    double[] value = dataHolder.getWritableDoubleValues();
+  protected Object compressAdaptive(DataType changedDataType, ColumnPage columnPage) {
+    double[] value = columnPage.getDoublePage();
     int i = 0;
     switch (changedDataType) {
       case BYTE:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java b/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java
index 16f8ac1..b1a5b70 100644
--- a/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java
+++ b/core/src/main/java/org/apache/carbondata/core/compression/ValueCompressor.java
@@ -16,7 +16,7 @@
  */
 package org.apache.carbondata.core.compression;
 
-import org.apache.carbondata.core.datastore.dataholder.CarbonWriteDataHolder;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CompressionFinder;
 import org.apache.carbondata.core.util.ValueCompressionUtil.COMPRESSION_TYPE;
@@ -26,73 +26,30 @@ import org.apache.carbondata.core.util.ValueCompressionUtil.COMPRESSION_TYPE;
 public abstract class ValueCompressor {
 
   public Object getCompressedValues(CompressionFinder compressionFinder,
-      CarbonWriteDataHolder dataHolder, Object maxValue, int decimal) {
-    return getCompressedValues(compressionFinder.getCompType(),
-        dataHolder,
-        compressionFinder.getConvertedDataType(),
-        maxValue, decimal);
-  }
-
-  /**
-   *
-   * @param compType
-   * @param dataHolder
-   * @param convertedDataType
-   * @param maxValue
-   * @param decimal
-   * @return compressed data
-   */
-  public Object getCompressedValues(COMPRESSION_TYPE compType, CarbonWriteDataHolder dataHolder,
-      DataType convertedDataType, Object maxValue, int decimal) {
+      ColumnPage columnPage, Object maxValue, int decimal) {
+    COMPRESSION_TYPE compType = compressionFinder.getCompType();
+    DataType convertedDataType = compressionFinder.getConvertedDataType();
     switch (compType) {
       case ADAPTIVE:
-        return compressAdaptive(convertedDataType, dataHolder);
+        return compressAdaptive(convertedDataType, columnPage);
       case DELTA_DOUBLE:
-        return compressMaxMin(convertedDataType, dataHolder, maxValue);
+        return compressMaxMin(convertedDataType, columnPage, maxValue);
       case BIGINT:
-        return compressNonDecimal(convertedDataType, dataHolder, decimal);
+        return compressNonDecimal(convertedDataType, columnPage, decimal);
       default:
-        return compressNonDecimalMaxMin(convertedDataType, dataHolder, decimal, maxValue);
+        return compressNonDecimalMaxMin(convertedDataType, columnPage, decimal, maxValue);
     }
   }
 
-  /**
-   *
-   * @param convertedDataType
-   * @param dataHolder
-   * @param decimal
-   * @param maxValue
-   * @return compressed data
-   */
-  protected abstract Object compressNonDecimalMaxMin(DataType convertedDataType,
-      CarbonWriteDataHolder dataHolder, int decimal, Object maxValue);
+  abstract Object compressNonDecimalMaxMin(DataType convertedDataType,
+      ColumnPage columnPage, int decimal, Object maxValue);
 
-  /**
-   *
-   * @param convertedDataType
-   * @param dataHolder
-   * @param decimal
-   * @return compressed data
-   */
-  protected abstract Object compressNonDecimal(DataType convertedDataType,
-      CarbonWriteDataHolder dataHolder, int decimal);
+  abstract Object compressNonDecimal(DataType convertedDataType,
+      ColumnPage columnPage, int decimal);
 
-  /**
-   *
-   * @param convertedDataType
-   * @param dataHolder
-   * @param maxValue
-   * @return compressed data
-   */
-  protected abstract Object compressMaxMin(DataType convertedDataType,
-      CarbonWriteDataHolder dataHolder, Object maxValue);
+  abstract Object compressMaxMin(DataType convertedDataType,
+      ColumnPage columnPage, Object maxValue);
 
-  /**
-   *
-   * @param convertedDataType
-   * @param dataHolder
-   * @return compressed data
-   */
-  protected abstract Object compressAdaptive(DataType convertedDataType,
-      CarbonWriteDataHolder dataHolder);
+  abstract Object compressAdaptive(DataType convertedDataType,
+      ColumnPage columnPage);
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/DimensionType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/DimensionType.java b/core/src/main/java/org/apache/carbondata/core/datastore/DimensionType.java
new file mode 100644
index 0000000..f38b675
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/DimensionType.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore;
+
+public enum DimensionType {
+  // global dictionary for low cardinality dimension
+  GLOBAL_DICTIONARY,
+
+  // for timestamp and date column
+  DIRECT_DICTIONARY,
+
+  // no dictionary, for high cardinality dimension
+  PLAIN_VALUE,
+
+  // expanded column from a complex data type column
+  COMPLEX,
+
+  // column group, multiple columns encoded as one column
+  COLUMN_GROUP
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
new file mode 100644
index 0000000..365f1ca
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/TableSpec.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore;
+
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+
+public class TableSpec {
+
+  // contains name and type for each dimension
+  private DimensionSpec dimensionSpec;
+  // contains name and type for each measure
+  private MeasureSpec measureSpec;
+
+  public TableSpec(List<CarbonDimension> dimensions, List<CarbonMeasure> measures) {
+    dimensionSpec = new DimensionSpec(dimensions);
+    measureSpec = new MeasureSpec(measures);
+  }
+
+  public DimensionSpec getDimensionSpec() {
+    return dimensionSpec;
+  }
+
+  public MeasureSpec getMeasureSpec() {
+    return measureSpec;
+  }
+
+  public class DimensionSpec {
+
+    // field name of each dimension, in schema order
+    private String[] fieldName;
+
+    // encoding type of each dimension, in schema order
+    private DimensionType[] types;
+
+    // number of simple dimensions
+    private int numSimpleDimensions;
+
+    // number of complex dimensions
+    private int numComplexDimensions;
+
+    // number of dimensions after complex column expansion
+    private int numDimensionExpanded;
+
+    DimensionSpec(List<CarbonDimension> dimensions) {
+      // first calculate total number of columnar field considering column group and complex column
+      numDimensionExpanded = 0;
+      numSimpleDimensions = 0;
+      numComplexDimensions = 0;
+      boolean inColumnGroup = false;
+      for (CarbonDimension dimension : dimensions) {
+        if (dimension.isColumnar()) {
+          if (inColumnGroup) {
+            inColumnGroup = false;
+          }
+          if (dimension.isComplex()) {
+            numDimensionExpanded += dimension.getNumDimensionsExpanded();
+            numComplexDimensions++;
+          } else {
+            numDimensionExpanded++;
+            numSimpleDimensions++;
+          }
+        } else {
+          // column group
+          if (!inColumnGroup) {
+            inColumnGroup = true;
+            numDimensionExpanded++;
+            numSimpleDimensions++;
+          }
+        }
+      }
+
+      // then extract dimension name and type for each column
+      fieldName = new String[numDimensionExpanded];
+      types = new DimensionType[numDimensionExpanded];
+      inColumnGroup = false;
+      int index = 0;
+      for (CarbonDimension dimension : dimensions) {
+        if (dimension.isColumnar()) {
+          if (inColumnGroup) {
+            inColumnGroup = false;
+          }
+          if (dimension.isComplex()) {
+            int count = addDimension(index, dimension);
+            index += count;
+          } else if (dimension.getDataType() == DataType.TIMESTAMP ||
+                     dimension.getDataType() == DataType.DATE) {
+            addSimpleDimension(index++, dimension.getColName(), DimensionType.DIRECT_DICTIONARY);
+          } else if (dimension.isGlobalDictionaryEncoding()) {
+            addSimpleDimension(index++, dimension.getColName(), DimensionType.GLOBAL_DICTIONARY);
+          } else {
+            addSimpleDimension(index++, dimension.getColName(), DimensionType.PLAIN_VALUE);
+          }
+        } else {
+          // column group
+          if (!inColumnGroup) {
+            addSimpleDimension(index++, dimension.getColName(), DimensionType.COLUMN_GROUP);
+            inColumnGroup = true;
+          }
+        }
+      }
+    }
+
+    private void addSimpleDimension(int index, String name, DimensionType type) {
+      fieldName[index] = name;
+      types[index] = type;
+    }
+
+    // add dimension and return number of columns added
+    private int addDimension(int index, CarbonDimension dimension) {
+      switch (dimension.getDataType()) {
+        case ARRAY:
+          addSimpleDimension(index, dimension.getColName() + ".offset", DimensionType.COMPLEX);
+          List<CarbonDimension> arrayChildren = dimension.getListOfChildDimensions();
+          int count = 1;
+          for (CarbonDimension child : arrayChildren) {
+            count += addDimension(index + count, child);
+          }
+          return count;
+        case STRUCT:
+          addSimpleDimension(index, dimension.getColName() + ".empty", DimensionType.COMPLEX);
+          List<CarbonDimension> structChildren = dimension.getListOfChildDimensions();
+          count = 1;
+          for (CarbonDimension child : structChildren) {
+            count += addDimension(index + count, child);
+          }
+          return count;
+        case TIMESTAMP:
+        case DATE:
+          addSimpleDimension(index, dimension.getColName(), DimensionType.DIRECT_DICTIONARY);
+          return 1;
+        default:
+          addSimpleDimension(index, dimension.getColName(),
+              dimension.isGlobalDictionaryEncoding() ?
+                  DimensionType.GLOBAL_DICTIONARY : DimensionType.PLAIN_VALUE);
+          return 1;
+      }
+    }
+
+
+    /**
+     * return the type of the index'th expanded dimension, index is in [0, numDimensionExpanded)
+     */
+    public DimensionType getType(int index) {
+      assert (index >= 0 && index < types.length);
+      return types[index];
+    }
+
+    /**
+     * return number of simple (non-complex) dimensions, a column group is counted as one
+     */
+    public int getNumSimpleDimensions() {
+      return numSimpleDimensions;
+    }
+
+    public int getNumComplexDimensions() {
+      return numComplexDimensions;
+    }
+
+    public int getNumExpandedDimensions() {
+      return numDimensionExpanded;
+    }
+  }
+
+  public class MeasureSpec {
+
+    // field name of each measure, in schema order
+    private String[] fieldName;
+
+    // data type of each measure, in schema order
+    private DataType[] types;
+
+    MeasureSpec(List<CarbonMeasure> measures) {
+      fieldName = new String[measures.size()];
+      types = new DataType[measures.size()];
+      int i = 0;
+      for (CarbonMeasure measure: measures) {
+        add(i++, measure.getColName(), measure.getDataType());
+      }
+    }
+
+    private void add(int index, String name, DataType type) {
+      fieldName[index] = name;
+      types[index] = type;
+    }
+
+    /**
+     * return the data type of the index'th measure, index is in [0, numMeasures)
+     */
+    public DataType getType(int index) {
+      assert (index >= 0 && index < types.length);
+      return types[index];
+    }
+
+    /**
+     * return number of measures
+     */
+    public int getNumMeasures() {
+      return types.length;
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
index 38e3c02..cd12c3b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/block/SegmentProperties.java
@@ -188,7 +188,6 @@ public class SegmentProperties {
    *
    */
   private void intialiseColGroups() {
-    // StringBuffer columnGroups = new StringBuffer();
     List<List<Integer>> colGrpList = new ArrayList<List<Integer>>();
     List<Integer> group = new ArrayList<Integer>();
     for (int i = 0; i < dimensions.size(); i++) {
@@ -197,7 +196,6 @@ public class SegmentProperties {
         continue;
       }
       group.add(dimension.getOrdinal());
-      // columnGroups.append(dimension.getOrdinal());
       if (i < dimensions.size() - 1) {
         int currGroupOrdinal = dimension.columnGroupId();
         int nextGroupOrdinal = dimensions.get(i + 1).columnGroupId();
@@ -278,10 +276,10 @@ public class SegmentProperties {
    * @return last block index
    */
   private int fillComplexDimensionChildBlockIndex(int blockOrdinal, CarbonDimension dimension) {
-    for (int i = 0; i < dimension.numberOfChild(); i++) {
+    for (int i = 0; i < dimension.getNumberOfChild(); i++) {
       dimensionOrdinalToBlockMapping
           .put(dimension.getListOfChildDimensions().get(i).getOrdinal(), ++blockOrdinal);
-      if (dimension.getListOfChildDimensions().get(i).numberOfChild() > 0) {
+      if (dimension.getListOfChildDimensions().get(i).getNumberOfChild() > 0) {
         blockOrdinal = fillComplexDimensionChildBlockIndex(blockOrdinal,
             dimension.getListOfChildDimensions().get(i));
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
index a94d08b..bcfe416 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/AbstractMeasureChunkReaderV2V3Format.java
@@ -121,4 +121,6 @@ public abstract class AbstractMeasureChunkReaderV2V3Format extends AbstractMeasu
    */
   protected abstract MeasureRawColumnChunk[] readRawMeasureChunksInGroup(FileHolder fileReader,
       int startColumnBlockletIndex, int endColumnBlockletIndex) throws IOException;
+
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
index 3ed1292..9397767 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v2/CompressedMeasureChunkFileBasedReaderV2.java
@@ -26,11 +26,14 @@ import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
 import org.apache.carbondata.core.datastore.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.CompressionFinder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.format.DataChunk2;
 
 /**
@@ -132,15 +135,27 @@ public class CompressedMeasureChunkFileBasedReaderV2 extends AbstractMeasureChun
     List<ValueEncoderMeta> valueEncodeMeta = new ArrayList<>();
     for (int i = 0; i < measureColumnChunk.getEncoder_meta().size(); i++) {
       valueEncodeMeta.add(
-          CarbonUtil.deserializeEncoderMeta(measureColumnChunk.getEncoder_meta().get(i).array()));
+          CarbonUtil.deserializeEncoderMetaV2(measureColumnChunk.getEncoder_meta().get(i).array()));
     }
-    WriterCompressModel compressionModel = CarbonUtil.getValueCompressionModel(valueEncodeMeta);
 
-    ValueCompressionHolder values = compressionModel.getValueCompressionHolder()[0];
+    MeasurePageStatsVO stats = CarbonUtil.getMeasurePageStats(valueEncodeMeta);
+    int measureCount = valueEncodeMeta.size();
+    CompressionFinder[] finders = new CompressionFinder[measureCount];
+    DataType[] convertedType = new DataType[measureCount];
+    for (int i = 0; i < measureCount; i++) {
+      CompressionFinder compresssionFinder =
+          ValueCompressionUtil.getCompressionFinder(stats.getMax(i), stats.getMin(i),
+              stats.getDecimal(i), stats.getDataType(i), stats.getDataTypeSelected(i));
+      finders[i] = compresssionFinder;
+      convertedType[i] = compresssionFinder.getConvertedDataType();
+    }
+
+    ValueCompressionHolder values = ValueCompressionUtil.getValueCompressionHolder(finders)[0];
+
     // uncompress
-    values.uncompress(compressionModel.getConvertedDataType()[0], rawData.array(), copyPoint,
-        measureColumnChunk.data_page_length, compressionModel.getMantissa()[0],
-        compressionModel.getMaxValue()[0], numberOfRows);
+    values.uncompress(convertedType[0], rawData.array(), copyPoint,
+        measureColumnChunk.data_page_length, stats.getDecimal(0),
+        stats.getMax(0), numberOfRows);
 
     CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
index 36839fd..c97a3a9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/measure/v3/CompressedMeasureChunkFileBasedReaderV3.java
@@ -26,11 +26,14 @@ import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.reader.measure.AbstractMeasureChunkReaderV2V3Format;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
 import org.apache.carbondata.core.datastore.dataholder.CarbonReadDataHolder;
+import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
+import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.CompressionFinder;
+import org.apache.carbondata.core.util.ValueCompressionUtil;
 import org.apache.carbondata.format.DataChunk2;
 import org.apache.carbondata.format.DataChunk3;
 
@@ -69,7 +72,7 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
    * 5. Create the raw chunk object and fill the details
    *
    * @param fileReader          reader for reading the column from carbon data file
-   * @param blockIndex          blocklet index of the column in carbon data file
+   * @param blockletColumnIndex          blocklet index of the column in carbon data file
    * @return measure raw chunk
    */
   @Override public MeasureRawColumnChunk readRawMeasureChunk(FileHolder fileReader,
@@ -217,16 +220,28 @@ public class CompressedMeasureChunkFileBasedReaderV3 extends AbstractMeasureChun
         .get(measureRawColumnChunk.getBlockletId()) + dataChunk3.getPage_offset().get(pageNumber);
     List<ValueEncoderMeta> valueEncodeMeta = new ArrayList<>();
     for (int i = 0; i < measureColumnChunk.getEncoder_meta().size(); i++) {
-      valueEncodeMeta.add(CarbonUtil
-          .deserializeEncoderMetaNew(measureColumnChunk.getEncoder_meta().get(i).array()));
+      valueEncodeMeta.add(
+          CarbonUtil.deserializeEncoderMetaV3(measureColumnChunk.getEncoder_meta().get(i).array()));
     }
-    WriterCompressModel compressionModel = CarbonUtil.getValueCompressionModel(valueEncodeMeta);
-    ValueCompressionHolder values = compressionModel.getValueCompressionHolder()[0];
+
+    MeasurePageStatsVO stats = CarbonUtil.getMeasurePageStats(valueEncodeMeta);
+    int measureCount = valueEncodeMeta.size();
+    CompressionFinder[] finders = new CompressionFinder[measureCount];
+    DataType[] convertedType = new DataType[measureCount];
+    for (int i = 0; i < measureCount; i++) {
+      CompressionFinder compresssionFinder =
+          ValueCompressionUtil.getCompressionFinder(stats.getMax(i), stats.getMin(i),
+              stats.getDecimal(i), stats.getDataType(i), stats.getDataTypeSelected(i));
+      finders[i] = compresssionFinder;
+      convertedType[i] = compresssionFinder.getConvertedDataType();
+    }
+
+    ValueCompressionHolder values = ValueCompressionUtil.getValueCompressionHolder(finders)[0];
     // uncompress
     ByteBuffer rawData = measureRawColumnChunk.getRawData();
-    values.uncompress(compressionModel.getConvertedDataType()[0], rawData.array(), copyPoint,
-        measureColumnChunk.data_page_length, compressionModel.getMantissa()[0],
-        compressionModel.getMaxValue()[0], measureRawColumnChunk.getRowCount()[pageNumber]);
+    values.uncompress(convertedType[0], rawData.array(), copyPoint,
+        measureColumnChunk.data_page_length, stats.getDecimal(0),
+        stats.getMax(0), measureRawColumnChunk.getRowCount()[pageNumber]);
     CarbonReadDataHolder measureDataHolder = new CarbonReadDataHolder(values);
     // set the data chunk
     datChunk.setMeasureDataHolder(measureDataHolder);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
index 89e0d90..f14338b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/columnar/BlockIndexerStorageForNoInvertedIndexForShort.java
@@ -36,8 +36,8 @@ public class BlockIndexerStorageForNoInvertedIndexForShort implements IndexStora
   private byte[] min;
   private byte[] max;
 
-  public BlockIndexerStorageForNoInvertedIndexForShort(byte[][] keyBlockInput,boolean isNoDictonary)
-  {
+  public BlockIndexerStorageForNoInvertedIndexForShort(byte[][] keyBlockInput,
+      boolean isNoDictonary) {
     this.keyBlock = keyBlockInput;
     min = keyBlock[0];
     max = keyBlock[0];

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/compression/MeasureMetaDataModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/MeasureMetaDataModel.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/MeasureMetaDataModel.java
index 7a39f7c..c2d0200 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/MeasureMetaDataModel.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/compression/MeasureMetaDataModel.java
@@ -103,7 +103,7 @@ public class MeasureMetaDataModel {
   }
 
   /**
-   * getUniqueValue
+   * nonExistValue
    *
    * @return
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/compression/WriterCompressModel.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/compression/WriterCompressModel.java b/core/src/main/java/org/apache/carbondata/core/datastore/compression/WriterCompressModel.java
deleted file mode 100644
index a2bf47a..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/compression/WriterCompressModel.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.compression;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.CompressionFinder;
-import org.apache.carbondata.core.util.ValueCompressionUtil;
-
-import static org.apache.carbondata.core.metadata.datatype.DataType.INT;
-import static org.apache.carbondata.core.metadata.datatype.DataType.SHORT;
-
-public class WriterCompressModel {
-
-  /**
-   * DataType[]  variable.
-   */
-  private DataType[] convertedDataType;
-  /**
-   * DataType[]  variable.
-   */
-  private DataType[] actualDataType;
-
-  /**
-   * maxValue
-   */
-  private Object[] maxValue;
-  /**
-   * minValue.
-   */
-  private Object[] minValue;
-
-  /**
-   * uniqueValue
-   */
-  private Object[] uniqueValue;
-  /**
-   * mantissa.
-   */
-  private int[] mantissa;
-
-  /**
-   * aggType
-   */
-  private DataType[] type;
-
-  /**
-   * dataTypeSelected
-   */
-  private byte[] dataTypeSelected;
-  /**
-   * unCompressValues.
-   */
-  private ValueCompressionHolder[] valueHolder;
-
-  private CompressionFinder[] compressionFinders;
-
-  /**
-   * @return the convertedDataType
-   */
-  public DataType[] getConvertedDataType() {
-    return convertedDataType;
-  }
-
-  /**
-   * @param convertedDataType the convertedDataType to set
-   */
-  public void setConvertedDataType(DataType[] convertedDataType) {
-    this.convertedDataType = convertedDataType;
-  }
-
-  /**
-   * @return the actualDataType
-   */
-  public DataType[] getActualDataType() {
-    return actualDataType;
-  }
-
-  /**
-   * @param actualDataType
-   */
-  public void setActualDataType(DataType[] actualDataType) {
-    this.actualDataType = actualDataType;
-  }
-
-  /**
-   * @return the maxValue
-   */
-  public Object[] getMaxValue() {
-    return maxValue;
-  }
-
-  /**
-   * @param maxValue the maxValue to set
-   */
-  public void setMaxValue(Object[] maxValue) {
-    this.maxValue = maxValue;
-  }
-
-  /**
-   * @return the mantissa
-   */
-  public int[] getMantissa() {
-    return mantissa;
-  }
-
-  /**
-   * @param mantissa the mantissa to set
-   */
-  public void setMantissa(int[] mantissa) {
-    this.mantissa = mantissa;
-  }
-
-  /**
-   * getUnCompressValues().
-   *
-   * @return the unCompressValues
-   */
-  public ValueCompressionHolder[] getValueCompressionHolder() {
-    return valueHolder;
-  }
-
-  /**
-   * @param valueHolder set the ValueCompressionHolder
-   */
-  public void setValueCompressionHolder(ValueCompressionHolder[] valueHolder) {
-    this.valueHolder = valueHolder;
-  }
-
-  /**
-   * getMinValue
-   *
-   * @return
-   */
-  public Object[] getMinValue() {
-    return minValue;
-  }
-
-  /**
-   * setMinValue.
-   *
-   * @param minValue
-   */
-  public void setMinValue(Object[] minValue) {
-    this.minValue = minValue;
-  }
-
-  /**
-   * @return the aggType
-   */
-  public char[] getType() {
-    char[] ret = new char[type.length];
-    for (int i = 0; i < ret.length; i++) {
-      switch (type[i]) {
-        case SHORT:
-        case INT:
-        case LONG:
-          ret[i] = CarbonCommonConstants.BIG_INT_MEASURE;
-          break;
-        case DOUBLE:
-          ret[i] = CarbonCommonConstants.DOUBLE_MEASURE;
-          break;
-        case DECIMAL:
-          ret[i] = CarbonCommonConstants.BIG_DECIMAL_MEASURE;
-          break;
-      }
-    }
-    return ret;
-  }
-
-  public DataType[] getDataType() {
-    return type;
-  }
-
-  /**
-   * @param type the type to set
-   */
-  public void setType(DataType[] type) {
-    this.type = type;
-  }
-
-  /**
-   * @return the dataTypeSelected
-   */
-  public byte[] getDataTypeSelected() {
-    return dataTypeSelected;
-  }
-
-  /**
-   * @param dataTypeSelected the dataTypeSelected to set
-   */
-  public void setDataTypeSelected(byte[] dataTypeSelected) {
-    this.dataTypeSelected = dataTypeSelected;
-  }
-
-  /**
-   * getUniqueValue
-   *
-   * @return
-   */
-  public Object[] getUniqueValue() {
-    return uniqueValue;
-  }
-
-  /**
-   * setUniqueValue
-   *
-   * @param uniqueValue
-   */
-  public void setUniqueValue(Object[] uniqueValue) {
-    this.uniqueValue = uniqueValue;
-  }
-
-  public void setCompressionFinders(CompressionFinder[] compressionFinders) {
-    this.compressionFinders = compressionFinders;
-  }
-
-  public CompressionFinder[] getCompressionFinders() {
-    return this.compressionFinders;
-  }
-
-  /**
-   * @return the compType
-   */
-  public ValueCompressionUtil.COMPRESSION_TYPE getCompType(int index) {
-    return this.compressionFinders[index].getCompType();
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonWriteDataHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonWriteDataHolder.java b/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonWriteDataHolder.java
deleted file mode 100644
index 8d3cc0d..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/dataholder/CarbonWriteDataHolder.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.dataholder;
-
-public class CarbonWriteDataHolder {
-  /**
-   * doubleValues
-   */
-  private double[] doubleValues;
-
-  /**
-   * longValues
-   */
-  private long[] longValues;
-
-  /**
-   * byteValues
-   */
-  private byte[][] byteValues;
-
-  /**
-   * size
-   */
-  private int size;
-
-  /**
-   * total size of the data in bytes added
-   */
-  private int totalSize;
-
-  public void reset() {
-    size = 0;
-    totalSize = 0;
-  }
-
-  /**
-   * set long data type columnar data
-   * @param values
-   */
-  public void setWritableLongPage(long[] values) {
-    if (values != null) {
-      longValues = values;
-      size += values.length;
-      totalSize += values.length;
-    }
-  }
-
-  /**
-   * set double data type columnar data
-   * @param values
-   */
-  public void setWritableDoublePage(double[] values) {
-    if (values != null) {
-      doubleValues = values;
-      size += values.length;
-      totalSize += values.length;
-    }
-  }
-
-  /**
-   * set decimal data type columnar data
-   * @param values
-   */
-  public void setWritableDecimalPage(byte[][] values) {
-    if (values != null) {
-      byteValues = values;
-      size += values.length;
-      for (int i = 0; i < values.length; i++) {
-        if (values[i] != null) {
-          totalSize += values[i].length;
-        }
-      }
-    }
-  }
-
-  /**
-   * Get Writable Double Values
-   */
-  public double[] getWritableDoubleValues() {
-    if (size < doubleValues.length) {
-      double[] temp = new double[size];
-      System.arraycopy(doubleValues, 0, temp, 0, size);
-      doubleValues = temp;
-    }
-    return doubleValues;
-  }
-
-  /**
-   * Get writable byte array values
-   */
-  public byte[] getWritableByteArrayValues() {
-    byte[] temp = new byte[totalSize];
-    int startIndexToCopy = 0;
-    for (int i = 0; i < size; i++) {
-      if (byteValues[i] != null) {
-        System.arraycopy(byteValues[i], 0, temp, startIndexToCopy, byteValues[i].length);
-        startIndexToCopy += byteValues[i].length;
-      }
-    }
-    return temp;
-  }
-
-  /**
-   * Get Writable Double Values
-   *
-   * @return
-   */
-  public long[] getWritableLongValues() {
-    if (size < longValues.length) {
-      long[] temp = new long[size];
-      System.arraycopy(longValues, 0, temp, 0, size);
-      longValues = temp;
-    }
-    return longValues;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/exception/CarbonDataWriterException.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/exception/CarbonDataWriterException.java b/core/src/main/java/org/apache/carbondata/core/datastore/exception/CarbonDataWriterException.java
new file mode 100644
index 0000000..8f75ee8
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/exception/CarbonDataWriterException.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.exception;
+
+import java.util.Locale;
+
+public class CarbonDataWriterException extends RuntimeException {
+
+  /**
+   * Default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The error message passed to the constructor; returned by getMessage().
+   */
+  private String msg = "";
+
+  /**
+   * Creates an unchecked exception with the given message.
+   *
+   * @param msg The error message for this exception.
+   */
+  public CarbonDataWriterException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Creates an unchecked exception with the given message and cause.
+   * @param msg The error message for this exception.
+   * @param t   The cause, forwarded to the superclass constructor.
+   */
+  public CarbonDataWriterException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * The locale is currently ignored; an empty string is always returned.
+   *
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region. Not used.
+   * @return - Always the empty string.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return "";
+  }
+
+  /**
+   * Delegates to the superclass implementation.
+   */
+  @Override public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * Returns the message given at construction time.
+   */
+  public String getMessage() {
+    return this.msg;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 25a813c..da3a82e 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -17,26 +17,208 @@
 
 package org.apache.carbondata.core.datastore.page;
 
-import org.apache.carbondata.core.datastore.page.statistics.PageStatistics;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatistics;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
 
+// Represents the data of one column within one page, stored in columnar form.
 public class ColumnPage {
 
   protected final DataType dataType;
   protected final int pageSize;
-  protected PageStatistics stats;
+  protected ColumnPageStatistics stats;
+
+  // Only one of these is used; the constructor allocates only longData or doubleData
+  private byte[] byteData;
+  private short[] shortData;
+  private int[] intData;
+  private long[] longData;
+  private double[] doubleData;
+
+  // one byte[] per row, for string and decimal data
+  private byte[][] byteArrayData;
 
-  protected ColumnPage(DataType dataType, int pageSize) {
+  // Bit rowId is set when the value at that row is null
+  private BitSet nullBitSet;
+
+  public ColumnPage(DataType dataType, int pageSize) {
     this.dataType = dataType;
     this.pageSize = pageSize;
-    this.stats = new PageStatistics(dataType);
+    this.stats = new ColumnPageStatistics(dataType);
+    this.nullBitSet = new BitSet(pageSize);
+    switch (dataType) {
+      case SHORT:
+      case INT:
+      case LONG:
+        longData = new long[pageSize];
+        break;
+      case DOUBLE:
+        doubleData = new double[pageSize];
+        break;
+      case DECIMAL:
+        byteArrayData = new byte[pageSize][];
+        break;
+      case STRING:
+        byteArrayData = new byte[pageSize][];
+        break;
+      default:
+        throw new RuntimeException("Unsupported data dataType: " + dataType);
+    }
   }
 
-  protected void updateStatistics(Object value) {
-    stats.update(value);
+  public DataType getDataType() {
+    return dataType;
   }
 
-  public PageStatistics getStatistics() {
+  public ColumnPageStatistics getStatistics() {
     return stats;
   }
+  // Stores value at rowId according to dataType; null goes to putNull and skips the stats update.
+  public void putData(int rowId, Object value) {
+    if (value == null) {
+      putNull(rowId);
+      return;
+    }
+    switch (dataType) {
+      case BYTE: // NOTE: falls through, so the value is cast to Short below
+      case SHORT:
+        putLong(rowId, ((Short) value).longValue());
+        break;
+      case INT:
+        putLong(rowId, ((Integer) value).longValue());
+        break;
+      case LONG:
+        putLong(rowId, (long) value);
+        break;
+      case DOUBLE:
+        putDouble(rowId, (double) value);
+        break;
+      case DECIMAL:
+        putDecimalBytes(rowId, (byte[]) value);
+        break;
+      case STRING:
+        putStringBytes(rowId, (byte[]) value);
+        break;
+      default:
+        throw new RuntimeException("unsupported data type: " + dataType);
+    }
+    stats.update(value);
+  }
+
+  /**
+   * Set byte value at rowId. NOTE: the constructor never allocates byteData.
+   */
+  public void putByte(int rowId, byte value) {
+    byteData[rowId] = value;
+  }
+
+  /**
+   * Set short value at rowId. NOTE: the constructor never allocates shortData.
+   */
+  public void putShort(int rowId, short value) {
+    shortData[rowId] = value;
+  }
+
+  /**
+   * Set integer value at rowId. NOTE: the constructor never allocates intData.
+   */
+  public void putInt(int rowId, int value) {
+    intData[rowId] = value;
+  }
+
+  /**
+   * Set long value at rowId (SHORT, INT and LONG pages all use the long array)
+   */
+  public void putLong(int rowId, long value) {
+    longData[rowId] = value;
+  }
+
+  /**
+   * Set double value at rowId
+   */
+  public void putDouble(int rowId, double value) {
+    doubleData[rowId] = value;
+  }
+
+  /**
+   * Set decimal value at rowId, stored as an int length followed by the bytes
+   */
+  public void putDecimalBytes(int rowId, byte[] decimalInBytes) {
+    // LV (length-value) encode the input bytes
+    ByteBuffer byteBuffer = ByteBuffer.allocate(decimalInBytes.length +
+        CarbonCommonConstants.INT_SIZE_IN_BYTE);
+    byteBuffer.putInt(decimalInBytes.length);
+    byteBuffer.put(decimalInBytes);
+    byteBuffer.flip();
+    byteArrayData[rowId] = byteBuffer.array();
+  }
+
+  /**
+   * Set string value at rowId; the given array is stored as-is, without copying
+   */
+  public void putStringBytes(int rowId, byte[] stringInBytes) {
+    byteArrayData[rowId] = stringInBytes;
+  }
+
+  /**
+   * Mark rowId as null and store a zero placeholder (other types get no placeholder)
+   */
+  public void putNull(int rowId) {
+    nullBitSet.set(rowId);
+    switch (dataType) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        putLong(rowId, 0L);
+        break;
+      case DOUBLE:
+        putDouble(rowId, 0.0);
+        break;
+      case DECIMAL:
+        byte[] decimalInBytes = DataTypeUtil.bigDecimalToByte(BigDecimal.ZERO);
+        putDecimalBytes(rowId, decimalInBytes);
+        break;
+    }
+  }
+
+  /**
+   * Get long value page (null unless dataType is SHORT, INT or LONG)
+   */
+  public long[] getLongPage() {
+    return longData;
+  }
+
+  /**
+   * Get double value page (null unless dataType is DOUBLE)
+   */
+  public double[] getDoublePage() {
+    return doubleData;
+  }
+
+  /**
+   * Get decimal value page (returns the same array as getStringPage)
+   */
+  public byte[][] getDecimalPage() {
+    return byteArrayData;
+  }
+
+  /**
+   * Get string page (returns the same array as getDecimalPage)
+   */
+  public byte[][] getStringPage() {
+    return byteArrayData;
+  }
+
+  /**
+   * Get the bitset whose set bits mark the null rows
+   */
+  public BitSet getNullBitSet() {
+    return nullBitSet;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
index 024c341..d50ea4b 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ComplexColumnPage.java
@@ -22,10 +22,9 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.metadata.datatype.DataType;
 
 // Represent a complex column page, e.g. Array, Struct type column
-public class ComplexColumnPage extends ColumnPage {
+public class ComplexColumnPage {
 
   // Holds data for all rows in this page in columnar layout.
   // After the complex data expand, it is of type byte[][], the first level array in the byte[][]
@@ -37,8 +36,10 @@ public class ComplexColumnPage extends ColumnPage {
   // depth is the number of column after complex type is expanded. It is from 1 to N
   private final int depth;
 
+  private final int pageSize;
+
   public ComplexColumnPage(int pageSize, int depth) {
-    super(DataType.BYTE_ARRAY, pageSize);
+    this.pageSize = pageSize;
     this.depth = depth;
     complexColumnData = new ArrayList<>(depth);
     for (int i = 0; i < depth; i++) {
@@ -74,4 +75,5 @@ public class ComplexColumnPage extends ColumnPage {
   public int getDepth() {
     return depth;
   }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/page/FixLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/FixLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/FixLengthColumnPage.java
deleted file mode 100644
index a523fa5..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/FixLengthColumnPage.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page;
-
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.DataTypeUtil;
-
-// Represent a columnar data in one page for one column.
-public class FixLengthColumnPage extends ColumnPage {
-
-  // Only one of following fields will be used
-  private byte[] byteData;
-  private short[] shortData;
-  private int[] intData;
-  private long[] longData;
-  private double[] doubleData;
-
-  private byte[][] byteArrayData;
-
-  // The index of the rowId whose value is null, will be set to 1
-  private BitSet nullBitSet;
-
-  public FixLengthColumnPage(DataType dataType, int pageSize) {
-    super(dataType, pageSize);
-    nullBitSet = new BitSet(pageSize);
-    switch (dataType) {
-      case SHORT:
-      case INT:
-      case LONG:
-        longData = new long[pageSize];
-        break;
-      case DOUBLE:
-        doubleData = new double[pageSize];
-        break;
-      case DECIMAL:
-        byteArrayData = new byte[pageSize][];
-        break;
-      default:
-        throw new RuntimeException("Unsupported data dataType: " + dataType);
-    }
-  }
-
-  public DataType getDataType() {
-    return dataType;
-  }
-
-  private void putByte(int rowId, byte value) {
-    byteData[rowId] = value;
-  }
-
-  private void putShort(int rowId, short value) {
-    shortData[rowId] = value;
-  }
-
-  private void putInt(int rowId, int value) {
-    intData[rowId] = value;
-  }
-
-  private void putLong(int rowId, long value) {
-    longData[rowId] = value;
-  }
-
-  private void putDouble(int rowId, double value) {
-    doubleData[rowId] = value;
-  }
-
-  // This method will do LV (length value) coded of input bytes
-  private void putDecimalBytes(int rowId, byte[] decimalInBytes) {
-    ByteBuffer byteBuffer = ByteBuffer.allocate(decimalInBytes.length +
-        CarbonCommonConstants.INT_SIZE_IN_BYTE);
-    byteBuffer.putInt(decimalInBytes.length);
-    byteBuffer.put(decimalInBytes);
-    byteBuffer.flip();
-    byteArrayData[rowId] = byteBuffer.array();
-  }
-
-  public void putData(int rowId, Object value) {
-    if (value == null) {
-      putNull(rowId);
-      return;
-    }
-    switch (dataType) {
-      case BYTE:
-      case SHORT:
-        putLong(rowId, ((Short) value).longValue());
-        break;
-      case INT:
-        putLong(rowId, ((Integer) value).longValue());
-        break;
-      case LONG:
-        putLong(rowId, (long) value);
-        break;
-      case DOUBLE:
-        putDouble(rowId, (double) value);
-        break;
-      case DECIMAL:
-        putDecimalBytes(rowId, (byte[]) value);
-        break;
-      default:
-        throw new RuntimeException("unsupported data type: " + dataType);
-    }
-    updateStatistics(value);
-  }
-
-  private void putNull(int rowId) {
-    nullBitSet.set(rowId);
-    switch (dataType) {
-      case BYTE:
-      case SHORT:
-      case INT:
-      case LONG:
-        putLong(rowId, 0L);
-        break;
-      case DOUBLE:
-        putDouble(rowId, 0.0);
-        break;
-      case DECIMAL:
-        byte[] decimalInBytes = DataTypeUtil.bigDecimalToByte(BigDecimal.ZERO);
-        putDecimalBytes(rowId, decimalInBytes);
-        break;
-    }
-  }
-
-  public long[] getLongPage() {
-    return longData;
-  }
-
-  public double[] getDoublePage() {
-    return doubleData;
-  }
-
-  public byte[][] getDecimalPage() {
-    return byteArrayData;
-  }
-
-  public BitSet getNullBitSet() {
-    return nullBitSet;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPage.java
deleted file mode 100644
index d5e9ce3..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPage.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-
-// Represent a variable length columnar data in one page, e.g. for dictionary columns.
-public class VarLengthColumnPage extends ColumnPage {
-
-  // TODO: further optimizite it, to store length and data separately
-  private byte[][] byteArrayData;
-
-  public VarLengthColumnPage(int pageSize) {
-    super(DataType.BYTE_ARRAY, pageSize);
-    byteArrayData = new byte[pageSize][];
-  }
-
-  public void putByteArray(int rowId, byte[] value) {
-    byteArrayData[rowId] = value;
-    updateStatistics(value);
-  }
-
-  public byte[][] getByteArrayPage() {
-    return byteArrayData;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatistics.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatistics.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatistics.java
new file mode 100644
index 0000000..960a530
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/ColumnPageStatistics.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.statistics;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
+
+/** statistics for one column page */
+public class ColumnPageStatistics {
+  private DataType dataType;
+
+  /** min and max value of the measures */
+  private Object min, max;
+
+  /**
+   * a value intended not to occur among the page's values (set to min - 1 on each update);
+   * it is used as the storage key for null values of measures
+   */
+  private Object nonExistValue;
+
+  /** decimal count of the measures */
+  private int decimal;
+
+  public ColumnPageStatistics(DataType dataType) {
+    this.dataType = dataType;
+    switch (dataType) { // seed max low and min high so update() can only widen the range
+      case SHORT:
+      case INT:
+      case LONG:
+        max = Long.MIN_VALUE;
+        min = Long.MAX_VALUE;
+        nonExistValue = Long.MIN_VALUE;
+        break;
+      case DOUBLE:
+        max = -Double.MAX_VALUE; // Double.MIN_VALUE is the smallest positive double
+        min = Double.MAX_VALUE;
+        nonExistValue = Double.MIN_VALUE;
+        break;
+      case DECIMAL: // update() never reassigns DECIMAL min/max, so these seeds are kept as-is
+        max = new BigDecimal(Double.MIN_VALUE);
+        min = new BigDecimal(Double.MAX_VALUE);
+        nonExistValue = new BigDecimal(Double.MIN_VALUE);
+        break;
+    }
+    decimal = 0;
+  }
+
+  /**
+   * update the statistics for the input row
+   */
+  public void update(Object value) {
+    switch (dataType) {
+      case SHORT:
+        max = ((long) max > ((Short) value).longValue()) ? max : ((Short) value).longValue();
+        min = ((long) min < ((Short) value).longValue()) ? min : ((Short) value).longValue();
+        nonExistValue = (long) min - 1;
+        break;
+      case INT:
+        max = ((long) max > ((Integer) value).longValue()) ? max : ((Integer) value).longValue();
+        min = ((long) min  < ((Integer) value).longValue()) ? min : ((Integer) value).longValue();
+        nonExistValue = (long) min - 1;
+        break;
+      case LONG:
+        max = ((long) max > (long) value) ? max : value;
+        min = ((long) min < (long) value) ? min : value;
+        nonExistValue = (long) min - 1;
+        break;
+      case DOUBLE:
+        max = ((double) max > (double) value) ? max : value;
+        min = ((double) min < (double) value) ? min : value;
+        int num = getDecimalCount((double) value);
+        decimal = decimal > num ? decimal : num;
+        nonExistValue = (double) min - 1;
+        break;
+      case DECIMAL:
+        BigDecimal decimalValue = DataTypeUtil.byteToBigDecimal((byte[]) value);
+        decimal = decimalValue.scale();
+        BigDecimal val = (BigDecimal) min;
+        nonExistValue = (val.subtract(new BigDecimal(1.0)));
+        break;
+      case ARRAY:
+      case STRUCT:
+        // for complex type column, writer is not going to use stats, so, do nothing
+    }
+  }
+
+  /**
+   * return the number of digits after the decimal point
+   */
+  private int getDecimalCount(double value) {
+    String strValue = BigDecimal.valueOf(Math.abs(value)).toPlainString();
+    int integerPlaces = strValue.indexOf('.');
+    int decimalPlaces = 0;
+    if (-1 != integerPlaces) {
+      decimalPlaces = strValue.length() - integerPlaces - 1;
+    }
+    return decimalPlaces;
+  }
+
+  /**
+   * return min value as byte array
+   */
+  public byte[] minBytes() {
+    return getValueAsBytes(getMin());
+  }
+
+  /**
+   * return max value as byte array
+   */
+  public byte[] maxBytes() {
+    return getValueAsBytes(getMax());
+  }
+
+  /**
+   * convert value to byte array
+   */
+  private byte[] getValueAsBytes(Object value) {
+    ByteBuffer b;
+    switch (dataType) {
+      case DOUBLE:
+        b = ByteBuffer.allocate(8);
+        b.putDouble((Double) value);
+        b.flip();
+        return b.array();
+      case LONG:
+      case INT:
+      case SHORT:
+        b = ByteBuffer.allocate(8);
+        b.putLong((Long) value);
+        b.flip();
+        return b.array();
+      case DECIMAL:
+        return DataTypeUtil.bigDecimalToByte((BigDecimal) value);
+      default:
+        throw new IllegalArgumentException("Invalid data type");
+    }
+  }
+
+  public Object getMin() {
+    return min;
+  }
+
+  public Object getMax() {
+    return max;
+  }
+
+  public Object nonExistValue() {
+    return nonExistValue;
+  }
+
+  public int getDecimal() {
+    return decimal;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/MeasurePageStatsVO.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/MeasurePageStatsVO.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/MeasurePageStatsVO.java
new file mode 100644
index 0000000..33440f5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/MeasurePageStatsVO.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.statistics;
+
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+public class MeasurePageStatsVO {
+  // statistics of each measure column
+  private Object[] min, max, nonExistValue;
+  private int[] decimal;
+
+  private DataType[] dataType;
+  private byte[] selectedDataType;
+
+  private MeasurePageStatsVO() {
+  }
+
+  public MeasurePageStatsVO(ColumnPage[] measurePages) {
+    min = new Object[measurePages.length];
+    max = new Object[measurePages.length];
+    nonExistValue = new Object[measurePages.length];
+    decimal = new int[measurePages.length];
+    dataType = new DataType[measurePages.length];
+    selectedDataType = new byte[measurePages.length];
+    for (int i = 0; i < measurePages.length; i++) {
+      ColumnPageStatistics stats = measurePages[i].getStatistics();
+      min[i] = stats.getMin();
+      max[i] = stats.getMax();
+      nonExistValue[i] = stats.nonExistValue();
+      decimal[i] = stats.getDecimal();
+      dataType[i] = measurePages[i].getDataType();
+    }
+  }
+
+  public static MeasurePageStatsVO build(ValueEncoderMeta[] encoderMetas) {
+    Object[] max = new Object[encoderMetas.length];
+    Object[] min = new Object[encoderMetas.length];
+    int[] decimal = new int[encoderMetas.length];
+    Object[] nonExistValue = new Object[encoderMetas.length];
+    DataType[] dataType = new DataType[encoderMetas.length];
+    byte[] selectedDataType = new byte[encoderMetas.length];
+    for (int i = 0; i < encoderMetas.length; i++) {
+      max[i] = encoderMetas[i].getMaxValue();
+      min[i] = encoderMetas[i].getMinValue();
+      decimal[i] = encoderMetas[i].getDecimal();
+      nonExistValue[i] = encoderMetas[i].getUniqueValue();
+      dataType[i] = encoderMetas[i].getType();
+      selectedDataType[i] = encoderMetas[i].getDataTypeSelected();
+    }
+
+    MeasurePageStatsVO stats = new MeasurePageStatsVO();
+    stats.dataType = dataType;
+    stats.selectedDataType = selectedDataType;
+    stats.min = min;
+    stats.max = max;
+    stats.nonExistValue = nonExistValue;
+    stats.decimal = decimal;
+    return stats;
+  }
+
+  public DataType getDataType(int measureIndex) {
+    return dataType[measureIndex];
+  }
+
+  public Object getMin(int measureIndex) {
+    return min[measureIndex];
+  }
+
+  public Object getMax(int measureIndex) {
+    return max[measureIndex];
+  }
+
+  public int getDecimal(int measureIndex) {
+    return decimal[measureIndex];
+  }
+
+  public Object getNonExistValue(int measureIndex) {
+    return nonExistValue[measureIndex];
+  }
+
+  public byte getDataTypeSelected(int measureIndex) {
+    return selectedDataType[measureIndex];
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PageStatistics.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PageStatistics.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PageStatistics.java
deleted file mode 100644
index 9190ae7..0000000
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PageStatistics.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.page.statistics;
-
-import java.math.BigDecimal;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.DataTypeUtil;
-
-/** statics for one column page */
-public class PageStatistics {
-  private DataType dataType;
-
-  /** min and max value of the measures */
-  private Object min, max;
-
-  /**
-   * the unique value is the non-exist value in the row,
-   * and will be used as storage key for null values of measures
-   */
-  private Object uniqueValue;
-
-  /** decimal count of the measures */
-  private int decimal;
-
-  public PageStatistics(DataType dataType) {
-    this.dataType = dataType;
-    switch (dataType) {
-      case SHORT:
-      case INT:
-      case LONG:
-        max = Long.MIN_VALUE;
-        min = Long.MAX_VALUE;
-        uniqueValue = Long.MIN_VALUE;
-        break;
-      case DOUBLE:
-        max = Double.MIN_VALUE;
-        min = Double.MAX_VALUE;
-        uniqueValue = Double.MIN_VALUE;
-        break;
-      case DECIMAL:
-        max = new BigDecimal(Double.MIN_VALUE);
-        min = new BigDecimal(Double.MAX_VALUE);
-        uniqueValue = new BigDecimal(Double.MIN_VALUE);
-        break;
-    }
-    decimal = 0;
-  }
-
-  /**
-   * update the statistics for the input row
-   */
-  public void update(Object value) {
-    switch (dataType) {
-      case SHORT:
-        max = ((long) max > ((Short) value).longValue()) ? max : ((Short) value).longValue();
-        min = ((long) min < ((Short) value).longValue()) ? min : ((Short) value).longValue();
-        uniqueValue = (long) min - 1;
-        break;
-      case INT:
-        max = ((long) max > ((Integer) value).longValue()) ? max : ((Integer) value).longValue();
-        min = ((long) min  < ((Integer) value).longValue()) ? min : ((Integer) value).longValue();
-        uniqueValue = (long) min - 1;
-        break;
-      case LONG:
-        max = ((long) max > (long) value) ? max : value;
-        min = ((long) min < (long) value) ? min : value;
-        uniqueValue = (long) min - 1;
-        break;
-      case DOUBLE:
-        max = ((double) max > (double) value) ? max : value;
-        min = ((double) min < (double) value) ? min : value;
-        int num = getDecimalCount((double) value);
-        decimal = decimal > num ? decimal : num;
-        uniqueValue = (double) min - 1;
-        break;
-      case DECIMAL:
-        BigDecimal decimalValue = DataTypeUtil.byteToBigDecimal((byte[]) value);
-        decimal = decimalValue.scale();
-        BigDecimal val = (BigDecimal) min;
-        uniqueValue = (val.subtract(new BigDecimal(1.0)));
-        break;
-      case ARRAY:
-      case STRUCT:
-        // for complex type column, writer is not going to use stats, so, do nothing
-    }
-  }
-
-  /**
-   * return no of digit after decimal
-   */
-  private int getDecimalCount(double value) {
-    String strValue = BigDecimal.valueOf(Math.abs(value)).toPlainString();
-    int integerPlaces = strValue.indexOf('.');
-    int decimalPlaces = 0;
-    if (-1 != integerPlaces) {
-      decimalPlaces = strValue.length() - integerPlaces - 1;
-    }
-    return decimalPlaces;
-  }
-
-  public Object getMin() {
-    return min;
-  }
-
-  public Object getMax() {
-    return max;
-  }
-
-  public Object getUniqueValue() {
-    return uniqueValue;
-  }
-
-  public int getDecimal() {
-    return decimal;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
new file mode 100644
index 0000000..86ac214
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/CarbonRow.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.row;
+
+import java.util.Arrays;
+
+/**
+ * This row class is used to transfer the row data from one step to another step
+ */
+public class CarbonRow {
+
+  private Object[] data;
+
+  public short bucketNumber;
+
+  public CarbonRow(Object[] data) {
+    this.data = data;
+  }
+
+  public Object[] getData() {
+    return data;
+  }
+
+  public void setData(Object[] data) {
+    this.data = data;
+  }
+
+  public String getString(int ordinal) {
+    return (String) data[ordinal];
+  }
+
+  public Object getObject(int ordinal) {
+    return data[ordinal];
+  }
+
+  public Object[] getObjectArray(int ordinal) {
+    return (Object[]) data[ordinal];
+  }
+
+  public int[] getIntArray(int ordinal) {
+    return (int[]) data[ordinal];
+  }
+
+  public void update(Object value, int ordinal) {
+    data[ordinal] = value;
+  }
+
+  public CarbonRow getCopy() {
+    Object[] copy = new Object[data.length];
+    System.arraycopy(data, 0, copy, 0, copy.length);
+    return new CarbonRow(copy);
+  }
+
+  @Override public String toString() {
+    return Arrays.toString(data);
+  }
+
+}