You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/06/10 03:46:40 UTC

[4/6] carbondata git commit: extract interface

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
new file mode 100644
index 0000000..107c52f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/WriteStepRowUtil.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.row;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+
+// Utility to create and retrieve data from CarbonRow in write step.
+public class WriteStepRowUtil {
+
+  // In write step, the element of CarbonRow is:
+  // 0: Dictionary dimension columns, encoded as int for each column
+  // 1: No dictionary and complex columns, they are all together encoded as one element (byte[][])
+  // 2: Measure columns, encoded as Object for each column
+
+  public static final int DICTIONARY_DIMENSION = 0;
+  public static final int NO_DICTIONARY_AND_COMPLEX = 1;
+  public static final int MEASURE = 2;
+
+  public static CarbonRow fromColumnCategory(int[] dictDimensions, byte[][] noDictAndComplex,
+      Object[] measures) {
+    Object[] row = new Object[3];
+    row[DICTIONARY_DIMENSION] = dictDimensions;
+    row[NO_DICTIONARY_AND_COMPLEX] = noDictAndComplex;
+    row[MEASURE] = measures;
+    return new CarbonRow(row);
+  }
+
+  public static CarbonRow fromMergerRow(Object[] row, SegmentProperties segmentProperties) {
+    Object[] converted = new Object[3];
+
+    // dictionary dimension
+    byte[] mdk = ((ByteArrayWrapper) row[0]).getDictionaryKey();
+    long[] keys = segmentProperties.getDimensionKeyGenerator().getKeyArray(mdk);
+    int[] dictDimensions = new int[keys.length];
+    for (int i = 0; i < keys.length; i++) {
+      dictDimensions[i] = Long.valueOf(keys[i]).intValue();
+    }
+    converted[DICTIONARY_DIMENSION] = dictDimensions;
+
+    // no dictionary and complex dimension
+    converted[NO_DICTIONARY_AND_COMPLEX] = ((ByteArrayWrapper) row[0]).getNoDictionaryKeys();
+
+    // measure
+    int measureCount = row.length - 1;
+    Object[] measures = new Object[measureCount];
+    System.arraycopy(row, 1, measures, 0, measureCount);
+    converted[MEASURE] = measures;
+
+    return new CarbonRow(converted);
+  }
+
+  private static int[] getDictDimension(CarbonRow row) {
+    return (int[]) row.getData()[DICTIONARY_DIMENSION];
+  }
+
+  public static byte[] getMdk(CarbonRow row, KeyGenerator keyGenerator) throws KeyGenException {
+    return keyGenerator.generateKey(getDictDimension(row));
+  }
+
+  public static byte[][] getNoDictAndComplexDimension(CarbonRow row) {
+    return (byte[][]) row.getData()[NO_DICTIONARY_AND_COMPLEX];
+  }
+
+  public static Object[] getMeasure(CarbonRow row) {
+    return (Object[]) row.getData()[MEASURE];
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/metadata/BlockletInfoColumnar.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/BlockletInfoColumnar.java b/core/src/main/java/org/apache/carbondata/core/metadata/BlockletInfoColumnar.java
index b68a46a..47df6a5 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/BlockletInfoColumnar.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/BlockletInfoColumnar.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.core.metadata;
 
 import java.util.BitSet;
 
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
 
 public class BlockletInfoColumnar {
 
@@ -85,7 +85,7 @@ public class BlockletInfoColumnar {
 
   private boolean[] aggKeyBlock;
 
-  private WriterCompressModel compressionModel;
+  private MeasurePageStatsVO stats;
 
   /**
    * column min array
@@ -316,14 +316,6 @@ public class BlockletInfoColumnar {
     this.columnMinData = columnMinData;
   }
 
-  public WriterCompressModel getCompressionModel() {
-    return compressionModel;
-  }
-
-  public void setCompressionModel(WriterCompressModel compressionModel) {
-    this.compressionModel = compressionModel;
-  }
-
   /**
    * @return
    */
@@ -351,4 +343,12 @@ public class BlockletInfoColumnar {
   public void setMeasureNullValueIndex(BitSet[] measureNullValueIndex) {
     this.measureNullValueIndex = measureNullValueIndex;
   }
+
+  public MeasurePageStatsVO getStats() {
+    return stats;
+  }
+
+  public void setStats(MeasurePageStatsVO stats) {
+    this.stats = stats;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
index ef1dad7..079540f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
@@ -126,7 +126,7 @@ public final class CarbonMetadata {
       if (dimension.getColumnId().equals(columnIdentifier)) {
         return dimension;
       }
-      if (dimension.numberOfChild() > 0) {
+      if (dimension.getNumberOfChild() > 0) {
         CarbonDimension childDim =
             getCarbonChildDimsBasedOnColIdentifier(columnIdentifier, dimension);
         if (null != childDim) {
@@ -147,10 +147,10 @@ public final class CarbonMetadata {
    */
   private CarbonDimension getCarbonChildDimsBasedOnColIdentifier(String columnIdentifier,
       CarbonDimension dimension) {
-    for (int i = 0; i < dimension.numberOfChild(); i++) {
+    for (int i = 0; i < dimension.getNumberOfChild(); i++) {
       if (dimension.getListOfChildDimensions().get(i).getColumnId().equals(columnIdentifier)) {
         return dimension.getListOfChildDimensions().get(i);
-      } else if (dimension.getListOfChildDimensions().get(i).numberOfChild() > 0) {
+      } else if (dimension.getListOfChildDimensions().get(i).getNumberOfChild() > 0) {
         CarbonDimension childDim = getCarbonChildDimsBasedOnColIdentifier(columnIdentifier,
             dimension.getListOfChildDimensions().get(i));
         if (null != childDim) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
index da13d5c..204ac1c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
@@ -33,10 +33,7 @@ public enum DataType {
   ARRAY(9, "ARRAY"),
   STRUCT(10, "STRUCT"),
   MAP(11, "MAP"),
-  BYTE(12, "BYTE"),
-
-  // internal use only
-  BYTE_ARRAY(13, "BYTE ARRAY");
+  BYTE(12, "BYTE");
 
   private int precedenceOrder;
   private String name ;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
index 8d02512..dd01c56 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonDimension.java
@@ -85,13 +85,6 @@ public class CarbonDimension extends CarbonColumn {
     return listOfChildDimensions;
   }
 
-  /**
-   * @return return the number of child present in case of complex type
-   */
-  public int numberOfChild() {
-    return columnSchema.getNumberOfChild();
-  }
-
   public boolean hasEncoding(Encoding encoding) {
     return columnSchema.getEncodingList().contains(encoding);
   }
@@ -121,6 +114,22 @@ public class CarbonDimension extends CarbonColumn {
     this.complexTypeOrdinal = complexTypeOrdinal;
   }
 
+  public boolean isGlobalDictionaryEncoding() {
+    return getEncoder().contains(Encoding.DICTIONARY);
+  }
+
+  public int getNumDimensionsExpanded() {
+    if (listOfChildDimensions == null) {
+      // there is no child, return 1 column
+      return 1;
+    }
+    int columnCount = 1;
+    for (CarbonDimension dimension: listOfChildDimensions) {
+      columnCount += dimension.getNumDimensionsExpanded();
+    }
+    return columnCount;
+  }
+
   /**
    * @return is column participated in sorting or not
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java
index dead205..1726fb7 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/CarbonImplicitDimension.java
@@ -92,7 +92,7 @@ public class CarbonImplicitDimension extends CarbonDimension {
   /**
+   * @return the number of children present in case of complex type
    */
-  @Override public int numberOfChild() {
+  @Override public int getNumberOfChild() {
     return 0;
   }
 
@@ -111,13 +111,6 @@ public class CarbonImplicitDimension extends CarbonDimension {
   }
 
   /**
-   * @return number of children for complex type
-   */
-  public int getNumberOfChild() {
-    return 0;
-  }
-
-  /**
    * @return the dataType
    */
   @Override public DataType getDataType() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
index 6250dba..eb4bf03 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/column/ColumnSchema.java
@@ -231,14 +231,14 @@ public class ColumnSchema implements Serializable {
   }
 
   /**
-   * @return the numberOfChild
+   * @return the number of children (numberOfChild)
    */
   public int getNumberOfChild() {
     return numberOfChild;
   }
 
   /**
-   * @param numberOfChild the numberOfChild to set
+   * @param numberOfChild the number of children to set
    */
   public void setNumberOfChild(int numberOfChild) {
     this.numberOfChild = numberOfChild;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 2a5c342..00df16b 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -402,8 +402,8 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
         new HashSet<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     int counter = 0;
     while (counter < queryDimension.size()) {
-      if (queryDimension.get(counter).getDimension().numberOfChild() > 0) {
-        counter += queryDimension.get(counter).getDimension().numberOfChild();
+      if (queryDimension.get(counter).getDimension().getNumberOfChild() > 0) {
+        counter += queryDimension.get(counter).getDimension().getNumberOfChild();
         continue;
       } else if (!CarbonUtil.hasEncoding(queryDimension.get(counter).getDimension().getEncoder(),
           Encoding.DICTIONARY)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index b12cfb0..f03f6ea 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -221,7 +221,7 @@ public class QueryUtil {
 
       Integer dimensionOrdinal = queryDimensions.get(i).getDimension().getOrdinal();
       allProjectionListDimensionIndexes.add(dimensionOrdinalToBlockMapping.get(dimensionOrdinal));
-      if (queryDimensions.get(i).getDimension().numberOfChild() > 0) {
+      if (queryDimensions.get(i).getDimension().getNumberOfChild() > 0) {
         addChildrenBlockIndex(allProjectionListDimensionIndexes,
             queryDimensions.get(i).getDimension());
       }
@@ -229,7 +229,7 @@ public class QueryUtil {
       if (!filterDimensionOrdinal.contains(dimensionOrdinal)) {
         blockIndex = dimensionOrdinalToBlockMapping.get(dimensionOrdinal);
         dimensionBlockIndex.add(blockIndex);
-        if (queryDimensions.get(i).getDimension().numberOfChild() > 0) {
+        if (queryDimensions.get(i).getDimension().getNumberOfChild() > 0) {
           addChildrenBlockIndex(dimensionBlockIndex, queryDimensions.get(i).getDimension());
         }
       }
@@ -255,7 +255,7 @@ public class QueryUtil {
    * @param dimension    parent dimension
    */
   private static void addChildrenBlockIndex(Set<Integer> blockIndexes, CarbonDimension dimension) {
-    for (int i = 0; i < dimension.numberOfChild(); i++) {
+    for (int i = 0; i < dimension.getNumberOfChild(); i++) {
       addChildrenBlockIndex(blockIndexes, dimension.getListOfChildDimensions().get(i));
       blockIndexes.add(dimension.getListOfChildDimensions().get(i).getOrdinal());
     }
@@ -289,10 +289,10 @@ public class QueryUtil {
           .hasEncoding(encodingList, Encoding.DIRECT_DICTIONARY) && !CarbonUtil
           .hasEncoding(encodingList, Encoding.IMPLICIT)) {
 
-        if (queryDimensions.get(i).getDimension().numberOfChild() == 0) {
+        if (queryDimensions.get(i).getDimension().getNumberOfChild() == 0) {
           dictionaryDimensionFromQuery.add(queryDimensions.get(i).getDimension().getColumnId());
         }
-        if (queryDimensions.get(i).getDimension().numberOfChild() > 0) {
+        if (queryDimensions.get(i).getDimension().getNumberOfChild() > 0) {
           getChildDimensionDictionaryDetail(queryDimensions.get(i).getDimension(),
               dictionaryDimensionFromQuery);
         }
@@ -318,9 +318,9 @@ public class QueryUtil {
    */
   private static void getChildDimensionDictionaryDetail(CarbonDimension queryDimensions,
       Set<String> dictionaryDimensionFromQuery) {
-    for (int j = 0; j < queryDimensions.numberOfChild(); j++) {
+    for (int j = 0; j < queryDimensions.getNumberOfChild(); j++) {
       List<Encoding> encodingList = queryDimensions.getListOfChildDimensions().get(j).getEncoder();
-      if (queryDimensions.getListOfChildDimensions().get(j).numberOfChild() > 0) {
+      if (queryDimensions.getListOfChildDimensions().get(j).getNumberOfChild() > 0) {
         getChildDimensionDictionaryDetail(queryDimensions.getListOfChildDimensions().get(j),
             dictionaryDimensionFromQuery);
       } else if (!CarbonUtil.hasEncoding(encodingList, Encoding.DIRECT_DICTIONARY)) {
@@ -610,12 +610,12 @@ public class QueryUtil {
       Set<Integer> dictionaryDimensionBlockIndex, List<Integer> noDictionaryDimensionBlockIndex) {
     for (QueryDimension queryDimension : queryDimensions) {
       if (CarbonUtil.hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.DICTIONARY)
-          && queryDimension.getDimension().numberOfChild() == 0) {
+          && queryDimension.getDimension().getNumberOfChild() == 0) {
         dictionaryDimensionBlockIndex
             .add(columnOrdinalToBlockIndexMapping.get(queryDimension.getDimension().getOrdinal()));
       } else if (
           !CarbonUtil.hasEncoding(queryDimension.getDimension().getEncoder(), Encoding.IMPLICIT)
-              && queryDimension.getDimension().numberOfChild() == 0) {
+              && queryDimension.getDimension().getNumberOfChild() == 0) {
         noDictionaryDimensionBlockIndex
             .add(columnOrdinalToBlockIndexMapping.get(queryDimension.getDimension().getOrdinal()));
       }
@@ -874,9 +874,9 @@ public class QueryUtil {
    */
   private static void getChildDimensionOrdinal(CarbonDimension queryDimensions,
       Set<Integer> filterDimensionsOrdinal) {
-    for (int j = 0; j < queryDimensions.numberOfChild(); j++) {
+    for (int j = 0; j < queryDimensions.getNumberOfChild(); j++) {
       List<Encoding> encodingList = queryDimensions.getListOfChildDimensions().get(j).getEncoder();
-      if (queryDimensions.getListOfChildDimensions().get(j).numberOfChild() > 0) {
+      if (queryDimensions.getListOfChildDimensions().get(j).getNumberOfChild() > 0) {
         getChildDimensionOrdinal(queryDimensions.getListOfChildDimensions().get(j),
             filterDimensionsOrdinal);
       } else if (!CarbonUtil.hasEncoding(encodingList, Encoding.DIRECT_DICTIONARY)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index ab0ed55..e035970 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -253,11 +253,11 @@ public class RestructureUtil {
     for (CarbonDimension queryDimension : queryDimensions) {
       // if number of child is zero, then it is not a complex dimension
       // so directly add it query dimension
-      if (queryDimension.numberOfChild() == 0) {
+      if (queryDimension.getNumberOfChild() == 0) {
         updatedQueryDimension.add(queryDimension);
       }
       // if number of child is more than 1 then add all its children
-      numberOfChildren = queryDimension.getOrdinal() + queryDimension.numberOfChild();
+      numberOfChildren = queryDimension.getOrdinal() + queryDimension.getNumberOfChild();
       for (int j = queryDimension.getOrdinal(); j < numberOfChildren; j++) {
         updatedQueryDimension.add(tableBlockDimensions.get(j));
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index bf85e77..97b1a1f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -340,7 +340,7 @@ public abstract class AbstractDataFileFooterConverter {
       List<ValueEncoderMeta> encodeMetaList =
           new ArrayList<ValueEncoderMeta>(thriftEncoderMeta.size());
       for (int i = 0; i < thriftEncoderMeta.size(); i++) {
-        encodeMetaList.add(CarbonUtil.deserializeEncoderMeta(thriftEncoderMeta.get(i).array()));
+        encodeMetaList.add(CarbonUtil.deserializeEncoderMetaV2(thriftEncoderMeta.get(i).array()));
       }
       dataChunk.setValueEncoderMeta(encodeMetaList);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
index caba75f..aee4f54 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ByteUtil.java
@@ -670,4 +670,22 @@ public final class ByteUtil {
     System.arraycopy(srcBytes, srcOffset, tgtBytes, tgtOffset, srcLength);
     return tgtOffset + srcLength;
   }
+
+  /**
+   * flatten input byte[][] to byte[] and return
+   */
+  public static byte[] flatten(byte[][] input) {
+    int totalSize = 0;
+    for (int i = 0; i < input.length; i++) {
+      totalSize += input[i].length;
+    }
+    byte[] flattenedData = new byte[totalSize];
+    int pos = 0;
+    for (int i = 0; i < input.length; i++) {
+      System.arraycopy(input[i], 0, flattenedData, pos, input[i].length);
+      pos += input[i].length;
+    }
+    return flattenedData;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 6fe38e2..e89ce12 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -21,7 +21,6 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
-import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
@@ -32,7 +31,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
 import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
@@ -122,12 +121,11 @@ public class CarbonMetadataUtil {
    * @param blockletIndexs
    * @param cardinalities
    * @param numberOfColumns
-   * @param segmentProperties
    * @return FileFooter
    */
   public static FileFooter3 convertFileFooterVersion3(List<BlockletInfo3> infoList,
-      List<BlockletIndex> blockletIndexs, int[] cardinalities, int numberOfColumns,
-      SegmentProperties segmentProperties) throws IOException {
+      List<BlockletIndex> blockletIndexs, int[] cardinalities, int numberOfColumns)
+      throws IOException {
     FileFooter3 footer = getFileFooter3(infoList, blockletIndexs, cardinalities, numberOfColumns);
     for (BlockletInfo3 info : infoList) {
       footer.addToBlocklet_info_list3(info);
@@ -249,11 +247,11 @@ public class CarbonMetadataUtil {
       List<CarbonMeasure> carbonMeasureList) {
     BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
     // Calculating min/max for every each column.
-    byte[][] minCol = nodeHolderList.get(0).getColumnMinData().clone();
-    byte[][] maxCol = nodeHolderList.get(0).getColumnMaxData().clone();
+    byte[][] minCol = nodeHolderList.get(0).getDimensionColumnMinData().clone();
+    byte[][] maxCol = nodeHolderList.get(0).getDimensionColumnMaxData().clone();
     for (NodeHolder nodeHolder : nodeHolderList) {
-      byte[][] columnMaxData = nodeHolder.getColumnMaxData();
-      byte[][] columnMinData = nodeHolder.getColumnMinData();
+      byte[][] columnMaxData = nodeHolder.getDimensionColumnMaxData();
+      byte[][] columnMinData = nodeHolder.getDimensionColumnMinData();
       for (int i = 0; i < maxCol.length; i++) {
         if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(columnMaxData[i], maxCol[i]) > 0) {
           maxCol[i] = columnMaxData[i];
@@ -402,7 +400,7 @@ public class CarbonMetadataUtil {
       // TODO : Need to write ValueCompression meta here.
       List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
       encoderMetaList.add(ByteBuffer.wrap(serializeEncoderMeta(
-          createValueEncoderMeta(blockletInfoColumnar.getCompressionModel(), i))));
+          createValueEncoderMeta(blockletInfoColumnar.getStats(), i))));
       dataChunk.setEncoder_meta(encoderMetaList);
       colDataChunks.add(dataChunk);
     }
@@ -442,18 +440,33 @@ public class CarbonMetadataUtil {
     return aos.toByteArray();
   }
 
-  private static ValueEncoderMeta createValueEncoderMeta(WriterCompressModel compressionModel,
+  private static ValueEncoderMeta createValueEncoderMeta(MeasurePageStatsVO stats,
       int index) {
     ValueEncoderMeta encoderMeta = new ValueEncoderMeta();
-    encoderMeta.setMaxValue(compressionModel.getMaxValue()[index]);
-    encoderMeta.setMinValue(compressionModel.getMinValue()[index]);
-    encoderMeta.setDataTypeSelected(compressionModel.getDataTypeSelected()[index]);
-    encoderMeta.setDecimal(compressionModel.getMantissa()[index]);
-    encoderMeta.setType(compressionModel.getType()[index]);
-    encoderMeta.setUniqueValue(compressionModel.getUniqueValue()[index]);
+    encoderMeta.setMaxValue(stats.getMax(index));
+    encoderMeta.setMinValue(stats.getMin(index));
+    encoderMeta.setDataTypeSelected(stats.getDataTypeSelected(index));
+    encoderMeta.setDecimal(stats.getDecimal(index));
+    encoderMeta.setType(getTypeInChar(stats.getDataType(index)));
+    encoderMeta.setUniqueValue(stats.getNonExistValue(index));
     return encoderMeta;
   }
 
+  private static char getTypeInChar(DataType type) {
+    switch (type) {
+      case SHORT:
+      case INT:
+      case LONG:
+        return CarbonCommonConstants.BIG_INT_MEASURE;
+      case DOUBLE:
+        return CarbonCommonConstants.DOUBLE_MEASURE;
+      case DECIMAL:
+        return CarbonCommonConstants.BIG_DECIMAL_MEASURE;
+      default:
+        throw new RuntimeException("unsupported type: " + type);
+    }
+  }
+
   /**
    * Right now it is set to default values. We may use this in future
    */
@@ -527,7 +540,7 @@ public class CarbonMetadataUtil {
       }
       blockletInfoColumnar.setMeasureLength(msrLens);
       blockletInfoColumnar.setMeasureOffset(msrOffsets);
-      blockletInfoColumnar.setCompressionModel(getValueCompressionModel(encoderMetas));
+      blockletInfoColumnar.setStats(getMeasurePageStats(encoderMetas));
       listOfNodeInfo.add(blockletInfoColumnar);
     }
 
@@ -549,24 +562,8 @@ public class CarbonMetadataUtil {
 
   }
 
-  private static WriterCompressModel getValueCompressionModel(ValueEncoderMeta[] encoderMetas) {
-    Object[] maxValue = new Object[encoderMetas.length];
-    Object[] minValue = new Object[encoderMetas.length];
-    int[] decimalLength = new int[encoderMetas.length];
-    Object[] uniqueValue = new Object[encoderMetas.length];
-    DataType[] aggType = new DataType[encoderMetas.length];
-    byte[] dataTypeSelected = new byte[encoderMetas.length];
-    for (int i = 0; i < encoderMetas.length; i++) {
-      maxValue[i] = encoderMetas[i].getMaxValue();
-      minValue[i] = encoderMetas[i].getMinValue();
-      decimalLength[i] = encoderMetas[i].getDecimal();
-      uniqueValue[i] = encoderMetas[i].getUniqueValue();
-      aggType[i] = encoderMetas[i].getType();
-      dataTypeSelected[i] = encoderMetas[i].getDataTypeSelected();
-    }
-    return ValueCompressionUtil
-        .getWriterCompressModel(maxValue, minValue, decimalLength, uniqueValue, aggType,
-            dataTypeSelected);
+  private static MeasurePageStatsVO getMeasurePageStats(ValueEncoderMeta[] encoderMetas) {
+    return MeasurePageStatsVO.build(encoderMetas);
   }
 
   private static void setBlockletIndex(FileFooter footer,
@@ -714,7 +711,7 @@ public class CarbonMetadataUtil {
       // TODO : Need to write ValueCompression meta here.
       List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
       encoderMetaList.add(ByteBuffer.wrap(serializeEncoderMeta(
-          createValueEncoderMeta(blockletInfoColumnar.getCompressionModel(), i))));
+          createValueEncoderMeta(blockletInfoColumnar.getStats(), i))));
       dataChunk.setEncoder_meta(encoderMetaList);
       colDataChunks.add(dataChunk);
     }
@@ -754,7 +751,7 @@ public class CarbonMetadataUtil {
         dataChunk.setRowMajor(nodeHolder.getColGrpBlocks()[index]);
         // TODO : Once schema PR is merged and information needs to be passed
         // here.
-        if (nodeHolder.getAggBlocks()[index]) {
+        if (nodeHolder.getRleEncodingForDictDim()[index]) {
           dataChunk.setRle_page_length(nodeHolder.getDataIndexMapLength()[index]);
           encodings.add(Encoding.RLE);
         }
@@ -766,8 +763,10 @@ public class CarbonMetadataUtil {
           dataChunk.setRowid_page_length(nodeHolder.getKeyBlockIndexLength()[index]);
           encodings.add(Encoding.INVERTED_INDEX);
         }
-        dataChunk.min_max.addToMax_values(ByteBuffer.wrap(nodeHolder.getColumnMaxData()[index]));
-        dataChunk.min_max.addToMin_values(ByteBuffer.wrap(nodeHolder.getColumnMinData()[index]));
+        dataChunk.min_max.addToMax_values(
+            ByteBuffer.wrap(nodeHolder.getDimensionColumnMaxData()[index]));
+        dataChunk.min_max.addToMin_values(
+            ByteBuffer.wrap(nodeHolder.getDimensionColumnMinData()[index]));
       } else {
         dataChunk.setData_page_length(nodeHolder.getDataArray()[index].length);
         // TODO : Right now the encodings are happening at runtime. change as
@@ -788,7 +787,7 @@ public class CarbonMetadataUtil {
         dataChunk.setPresence(presenceMeta);
         List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
         encoderMetaList.add(ByteBuffer.wrap(serializeEncodeMetaUsingByteBuffer(
-            createValueEncoderMeta(nodeHolder.getCompressionModel(), index))));
+            createValueEncoderMeta(nodeHolder.getStats(), index))));
         dataChunk.setEncoder_meta(encoderMetaList);
         dataChunk.min_max
             .addToMax_values(ByteBuffer.wrap(nodeHolder.getMeasureColumnMaxData()[index]));
@@ -857,28 +856,6 @@ public class CarbonMetadataUtil {
     return buffer.array();
   }
 
-  public static byte[] getByteValueForMeasure(Object data, DataType dataType) {
-    ByteBuffer b = null;
-    switch (dataType) {
-      case DOUBLE:
-        b = ByteBuffer.allocate(8);
-        b.putDouble((Double) data);
-        b.flip();
-        return b.array();
-      case LONG:
-      case INT:
-      case SHORT:
-        b = ByteBuffer.allocate(8);
-        b.putLong((Long) data);
-        b.flip();
-        return b.array();
-      case DECIMAL:
-        return DataTypeUtil.bigDecimalToByte((BigDecimal) data);
-      default:
-        throw new IllegalArgumentException("Invalid data type");
-    }
-  }
-
   public static int compareMeasureData(byte[] first, byte[] second, DataType dataType) {
     ByteBuffer firstBuffer = null;
     ByteBuffer secondBuffer = null;
@@ -956,7 +933,7 @@ public class CarbonMetadataUtil {
           encodings.add(Encoding.DIRECT_DICTIONARY);
         }
         dataChunk.setRowMajor(nodeHolder.getColGrpBlocks()[i]);
-        if (nodeHolder.getAggBlocks()[i]) {
+        if (nodeHolder.getRleEncodingForDictDim()[i]) {
           dataChunk.setRle_page_length(nodeHolder.getDataIndexMapLength()[i]);
           encodings.add(Encoding.RLE);
         }
@@ -966,8 +943,10 @@ public class CarbonMetadataUtil {
           dataChunk.setRowid_page_length(nodeHolder.getKeyBlockIndexLength()[i]);
           encodings.add(Encoding.INVERTED_INDEX);
         }
-        dataChunk.min_max.addToMax_values(ByteBuffer.wrap(nodeHolder.getColumnMaxData()[i]));
-        dataChunk.min_max.addToMin_values(ByteBuffer.wrap(nodeHolder.getColumnMinData()[i]));
+        dataChunk.min_max.addToMax_values(
+            ByteBuffer.wrap(nodeHolder.getDimensionColumnMaxData()[i]));
+        dataChunk.min_max.addToMin_values(
+            ByteBuffer.wrap(nodeHolder.getDimensionColumnMinData()[i]));
         dataChunk.setEncoders(encodings);
         dataChunkBuffer.add(CarbonUtil.getByteArray(dataChunk));
       }
@@ -996,7 +975,7 @@ public class CarbonMetadataUtil {
         dataChunk.setPresence(presenceMeta);
         List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
         encoderMetaList.add(ByteBuffer.wrap(serializeEncodeMetaUsingByteBuffer(
-            createValueEncoderMeta(nodeHolder.getCompressionModel(), i))));
+            createValueEncoderMeta(nodeHolder.getStats(), i))));
         dataChunk.setEncoder_meta(encoderMetaList);
         dataChunk.min_max.addToMax_values(ByteBuffer.wrap(nodeHolder.getMeasureColumnMaxData()[i]));
         dataChunk.min_max.addToMin_values(ByteBuffer.wrap(nodeHolder.getMeasureColumnMinData()[i]));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 4874f78..1a00fd5 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -48,10 +48,9 @@ import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.MeasureRawColumnChunk;
 import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
-import org.apache.carbondata.core.datastore.compression.MeasureMetaDataModel;
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
@@ -822,36 +821,12 @@ public final class CarbonUtil {
   }
 
   /**
-   * Below method will be used to get the value compression model of the
-   * measure data chunk
-   *
-   * @return value compression model
+   * Below method will be used to get the stats of the measure data page
    */
-  public static WriterCompressModel getValueCompressionModel(
+  public static MeasurePageStatsVO getMeasurePageStats(
       List<ValueEncoderMeta> encodeMetaList) {
-    Object[] maxValue = new Object[encodeMetaList.size()];
-    Object[] minValue = new Object[encodeMetaList.size()];
-    Object[] uniqueValue = new Object[encodeMetaList.size()];
-    int[] decimal = new int[encodeMetaList.size()];
-    DataType[] type = new DataType[encodeMetaList.size()];
-    byte[] dataTypeSelected = new byte[encodeMetaList.size()];
-
-    /*
-     * to fill the meta data required for value compression model
-     */
-    for (int i = 0; i < dataTypeSelected.length; i++) {  // always 1
-      ValueEncoderMeta valueEncoderMeta = encodeMetaList.get(i);
-      maxValue[i] = valueEncoderMeta.getMaxValue();
-      minValue[i] = valueEncoderMeta.getMinValue();
-      uniqueValue[i] = valueEncoderMeta.getUniqueValue();
-      decimal[i] = valueEncoderMeta.getDecimal();
-      type[i] = valueEncoderMeta.getType();
-      dataTypeSelected[i] = valueEncoderMeta.getDataTypeSelected();
-    }
-    MeasureMetaDataModel measureMetadataModel =
-        new MeasureMetaDataModel(minValue, maxValue, decimal, dataTypeSelected.length, uniqueValue,
-            type, dataTypeSelected);
-    return ValueCompressionUtil.getWriterCompressModel(measureMetadataModel);
+    return MeasurePageStatsVO.build(
+        encodeMetaList.toArray(new ValueEncoderMeta[encodeMetaList.size()]));
   }
 
   /**
@@ -1045,8 +1020,8 @@ public final class CarbonUtil {
       if (null != childs && childs.size() > 0) {
         break;
       }
-      if (carbonDimension.isColumnar() && hasEncoding(carbonDimension.getEncoder(),
-          Encoding.DICTIONARY)) {
+      if (carbonDimension.isColumnar() &&
+          hasEncoding(carbonDimension.getEncoder(), Encoding.DICTIONARY)) {
         isDictionaryDimensions.add(true);
       } else if (!carbonDimension.isColumnar()) {
         if (processedColumnGroup.add(carbonDimension.columnGroupId())) {
@@ -1420,7 +1395,7 @@ public final class CarbonUtil {
    * @param encoderMeta
    * @return ValueEncoderMeta object
    */
-  public static ValueEncoderMeta deserializeEncoderMeta(byte[] encoderMeta) {
+  public static ValueEncoderMeta deserializeEncoderMetaV2(byte[] encoderMeta) {
     // TODO : should remove the unnecessary fields.
     ByteArrayInputStream aos = null;
     ObjectInputStream objStream = null;
@@ -1437,7 +1412,7 @@ public final class CarbonUtil {
     return meta;
   }
 
-  public static ValueEncoderMeta deserializeEncoderMetaNew(byte[] encodeMeta) {
+  public static ValueEncoderMeta deserializeEncoderMetaV3(byte[] encodeMeta) {
     ByteBuffer buffer = ByteBuffer.wrap(encodeMeta);
     char measureType = buffer.getChar();
     ValueEncoderMeta valueEncoderMeta = new ValueEncoderMeta();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
index a37a9a7..95037b2 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.core.util;
 
 import java.util.BitSet;
 
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
+import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
 
 public class NodeHolder {
   /**
@@ -83,26 +83,23 @@ public class NodeHolder {
   /**
    * column max data
    */
-  private byte[][] columnMaxData;
+  private byte[][] dimensionColumnMaxData;
 
   /**
    * column min data
    */
-  private byte[][] columnMinData;
+  private byte[][] dimensionColumnMinData;
 
   private byte[][] measureColumnMaxData;
 
   private byte[][] measureColumnMinData;
 
-  /**
-   * compression model for numbers data block.
-   */
-  private WriterCompressModel compressionModel;
+  private MeasurePageStatsVO stats;
 
   /**
-   * array of aggBlocks flag to identify the aggBlocks
+   * array of rleEncodingForDictDim flag to identify the rleEncodingForDictDim
    */
-  private boolean[] aggBlocks;
+  private boolean[] rleEncodingForDictDim;
 
   /**
    * true if given index is colgroup block
@@ -312,46 +309,38 @@ public class NodeHolder {
     this.dataIndexMapLength = dataIndexMapLength;
   }
 
-  public byte[][] getColumnMaxData() {
-    return this.columnMaxData;
-  }
-
-  public void setColumnMaxData(byte[][] columnMaxData) {
-    this.columnMaxData = columnMaxData;
-  }
-
-  public byte[][] getColumnMinData() {
-    return this.columnMinData;
+  public byte[][] getDimensionColumnMaxData() {
+    return this.dimensionColumnMaxData;
   }
 
-  public void setColumnMinData(byte[][] columnMinData) {
-    this.columnMinData = columnMinData;
+  public void setDimensionColumnMaxData(byte[][] columnMaxData) {
+    this.dimensionColumnMaxData = columnMaxData;
   }
 
-  public WriterCompressModel getCompressionModel() {
-    return compressionModel;
+  public byte[][] getDimensionColumnMinData() {
+    return this.dimensionColumnMinData;
   }
 
-  public void setCompressionModel(WriterCompressModel compressionModel) {
-    this.compressionModel = compressionModel;
+  public void setDimensionColumnMinData(byte[][] columnMinData) {
+    this.dimensionColumnMinData = columnMinData;
   }
 
   /**
-   * returns array of aggBlocks flag to identify the aag blocks
+   * returns array of rleEncodingForDictDim flag to identify the rleEncodingForDictDim
    *
    * @return
    */
-  public boolean[] getAggBlocks() {
-    return aggBlocks;
+  public boolean[] getRleEncodingForDictDim() {
+    return rleEncodingForDictDim;
   }
 
   /**
-   * set array of aggBlocks flag to identify the aggBlocks
+   * set array of rleEncodingForDictDim flag to identify the rleEncodingForDictDim
    *
-   * @param aggBlocks
+   * @param rleEncodingForDictDim
    */
-  public void setAggBlocks(boolean[] aggBlocks) {
-    this.aggBlocks = aggBlocks;
+  public void setRleEncodingForDictDim(boolean[] rleEncodingForDictDim) {
+    this.rleEncodingForDictDim = rleEncodingForDictDim;
   }
 
   /**
@@ -428,4 +417,12 @@ public class NodeHolder {
   public boolean isWriteAll() {
     return this.writeAll;
   }
+
+  public MeasurePageStatsVO getStats() {
+    return stats;
+  }
+
+  public void setMeasureStats(MeasurePageStatsVO stats) {
+    this.stats = stats;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java b/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
index 5020acb..f5c7640 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/ValueCompressionUtil.java
@@ -24,10 +24,8 @@ import java.util.Arrays;
 import org.apache.carbondata.core.compression.BigIntCompressor;
 import org.apache.carbondata.core.compression.DoubleCompressor;
 import org.apache.carbondata.core.compression.ValueCompressor;
-import org.apache.carbondata.core.datastore.compression.MeasureMetaDataModel;
 import org.apache.carbondata.core.datastore.compression.ReaderCompressModel;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
 import org.apache.carbondata.core.datastore.compression.decimal.CompressByteArray;
 import org.apache.carbondata.core.datastore.compression.decimal.CompressionMaxMinByte;
 import org.apache.carbondata.core.datastore.compression.decimal.CompressionMaxMinDefault;
@@ -195,7 +193,8 @@ public final class ValueCompressionUtil {
 
   private static CompressionFinder getLongCompressorFinder(Object maxValue, Object minValue,
       int mantissa, byte dataTypeSelected, DataType measureStoreType) {
-    DataType adaptiveDataType = getDataType((long) maxValue, mantissa, dataTypeSelected);
+    long value = Math.max(Math.abs((long)maxValue), Math.abs((long)minValue));
+    DataType adaptiveDataType = getDataType(value, mantissa, dataTypeSelected);
     int adaptiveSize = getSize(adaptiveDataType);
     DataType deltaDataType = null;
     // we cannot apply compression in case actual data type of the column is long
@@ -264,7 +263,7 @@ public final class ValueCompressionUtil {
    * @param compressionFinders : Compression types for measures
    * @return
    */
-  private static ValueCompressionHolder[] getValueCompressionHolder(
+  public static ValueCompressionHolder[] getValueCompressionHolder(
       CompressionFinder[] compressionFinders) {
     ValueCompressionHolder[] valueCompressionHolders =
         new ValueCompressionHolder[compressionFinders.length];
@@ -658,55 +657,6 @@ public final class ValueCompressionUtil {
   }
 
   /**
-   * Create Value compression model for write path
-   */
-  public static WriterCompressModel getWriterCompressModel(Object[] maxValue, Object[] minValue,
-      int[] mantissa, Object[] uniqueValue, DataType[] dataType, byte[] dataTypeSelected) {
-    MeasureMetaDataModel metaDataModel =
-        new MeasureMetaDataModel(minValue, maxValue, mantissa, maxValue.length, uniqueValue,
-            dataType, dataTypeSelected);
-    return getWriterCompressModel(metaDataModel);
-  }
-
-  /**
-   * Create Value compression model for write path
-   */
-  public static WriterCompressModel getWriterCompressModel(MeasureMetaDataModel measureMDMdl) {
-    int measureCount = measureMDMdl.getMeasureCount();
-    Object[] minValue = measureMDMdl.getMinValue();
-    Object[] maxValue = measureMDMdl.getMaxValue();
-    Object[] uniqueValue = measureMDMdl.getUniqueValue();
-    int[] mantissa = measureMDMdl.getMantissa();
-    DataType[] type = measureMDMdl.getType();
-    byte[] dataTypeSelected = measureMDMdl.getDataTypeSelected();
-    WriterCompressModel compressionModel = new WriterCompressModel();
-    DataType[] actualType = new DataType[measureCount];
-    DataType[] convertedType = new DataType[measureCount];
-    CompressionFinder[] compressionFinders = new CompressionFinder[measureCount];
-    for (int i = 0; i < measureCount; i++) {
-      CompressionFinder compresssionFinder =
-          ValueCompressionUtil.getCompressionFinder(maxValue[i],
-              minValue[i], mantissa[i], type[i], dataTypeSelected[i]);
-      compressionFinders[i] = compresssionFinder;
-      actualType[i] = compresssionFinder.getActualDataType();
-      convertedType[i] = compresssionFinder.getConvertedDataType();
-    }
-    compressionModel.setCompressionFinders(compressionFinders);
-    compressionModel.setMaxValue(maxValue);
-    compressionModel.setMantissa(mantissa);
-    compressionModel.setConvertedDataType(convertedType);
-    compressionModel.setActualDataType(actualType);
-    compressionModel.setMinValue(minValue);
-    compressionModel.setUniqueValue(uniqueValue);
-    compressionModel.setType(type);
-    compressionModel.setDataTypeSelected(dataTypeSelected);
-    ValueCompressionHolder[] values = ValueCompressionUtil
-        .getValueCompressionHolder(compressionFinders);
-    compressionModel.setValueCompressionHolder(values);
-    return compressionModel;
-  }
-
-  /**
    * Create Value compression model for read path
    */
   public static ReaderCompressModel getReaderCompressModel(ValueEncoderMeta meta) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/test/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/CompressedDimensionChunkFileBasedReaderTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/CompressedDimensionChunkFileBasedReaderTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/CompressedDimensionChunkFileBasedReaderTest.java
deleted file mode 100644
index ad10d43..0000000
--- a/core/src/test/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/CompressedDimensionChunkFileBasedReaderTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.chunk.reader.dimension;
-
-
-public class CompressedDimensionChunkFileBasedReaderTest {
-
-//  static CompressedDimensionChunkFileBasedReaderV1 compressedDimensionChunkFileBasedReader;
-//  static List<DataChunk> dataChunkList;
-//
-//  @BeforeClass public static void setup() {
-//    int eachColumnBlockSize[] = { 1, 2, 4, 5 };
-//    dataChunkList = new ArrayList<>();
-//
-//    DataChunk dataChunk = new DataChunk();
-//    dataChunkList.add(dataChunk);
-//    BlockletInfo info = new BlockletInfo();
-//    info.setDimensionColumnChunk(dataChunkList);
-//    compressedDimensionChunkFileBasedReader =
-//        new CompressedDimensionChunkFileBasedReaderV1(info, eachColumnBlockSize, "filePath");
-//  }
-//
-//  @Test public void readDimensionChunksTest() {
-//    FileHolder fileHolder = new MockUp<FileHolder>() {
-//      @Mock public byte[] readByteArray(String filePath, long offset, int length) {
-//        byte mockedValue[] = { 1, 5, 4, 8, 7 };
-//        return mockedValue;
-//      }
-//    }.getMockInstance();
-//
-//    new MockUp<CarbonUtil>() {
-//      @Mock public boolean hasEncoding(List<Encoding> encodings, Encoding encoding) {
-//        return true;
-//      }
-//
-//      @Mock public int[] getUnCompressColumnIndex(int totalLength, byte[] columnIndexData,
-//          NumberCompressor numberCompressor,int offset) {
-//        int mockedValue[] = { 1, 1 };
-//        return mockedValue;
-//      }
-//    };
-//
-//    new MockUp<SnappyCompressor>() {
-//      @Mock public byte[] unCompressByte(byte[] compInput) {
-//        byte mockedValue[] = { 1 };
-//        return mockedValue;
-//      }
-//    };
-//
-//    new MockUp<UnBlockIndexer>() {
-//      @Mock public byte[] uncompressData(byte[] data, int[] index, int keyLen) {
-//        byte mockedValue[] = { 1, 5, 4, 8, 7 };
-//        return mockedValue;
-//      }
-//    };
-//
-//    int[][] blockIndexes = {{0,0}};
-//    DimensionColumnDataChunk dimensionColumnDataChunk[] =
-//        compressedDimensionChunkFileBasedReader.readDimensionChunks(fileHolder, blockIndexes);
-//    byte expectedResult[] = { 1 };
-//    assertEquals(dimensionColumnDataChunk[0].getColumnValueSize(), 1);
-//    for (int i = 0; i < dimensionColumnDataChunk[0].getChunkData(0).length; i++) {
-//      assertEquals(dimensionColumnDataChunk[0].getChunkData(0)[i], expectedResult[i]);
-//    }
-//  }
-//
-//  @Test public void readDimensionChunksTestForIfStatement() {
-//    FileHolder fileHolder = new MockUp<FileHolder>() {
-//      @Mock public byte[] readByteArray(String filePath, long offset, int length) {
-//        byte mockedValue[] = { 1, 5, 4, 8, 7 };
-//        return mockedValue;
-//      }
-//    }.getMockInstance();
-//
-//    new MockUp<CarbonUtil>() {
-//      @Mock public boolean hasEncoding(List<Encoding> encodings, Encoding encoding) {
-//        return true;
-//      }
-//
-//      @Mock public int[] getUnCompressColumnIndex(int totalLength, byte[] columnIndexData,
-//          NumberCompressor numberCompressor, int offset) {
-//        int mockedValue[] = { 1, 1 };
-//        return mockedValue;
-//      }
-//    };
-//
-//    new MockUp<SnappyCompressor>() {
-//      @Mock public byte[] unCompressByte(byte[] compInput) {
-//        byte mockedValue[] = { 1 };
-//        return mockedValue;
-//      }
-//    };
-//
-//    new MockUp<UnBlockIndexer>() {
-//      @Mock public byte[] uncompressData(byte[] data, int[] index, int keyLen) {
-//        byte mockedValue[] = { 1, 5, 4, 8, 7 };
-//        return mockedValue;
-//      }
-//    };
-//
-//    new MockUp<DataChunk>() {
-//      @Mock public boolean isRowMajor() {
-//        return true;
-//      }
-//    };
-//    int[][] blockIndexes = {{0,0}};
-//    DimensionColumnDataChunk dimensionColumnDataChunk[] =
-//        compressedDimensionChunkFileBasedReader.readDimensionChunks(fileHolder, blockIndexes);
-//
-//    byte expectedResult[] = { 1 };
-//    assertEquals(dimensionColumnDataChunk[0].getColumnValueSize(), 1);
-//
-//    for (int i = 0; i < dimensionColumnDataChunk[0].getChunkData(0).length; i++) {
-//      assertEquals(dimensionColumnDataChunk[0].getChunkData(0)[i], expectedResult[i]);
-//    }
-//  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/test/java/org/apache/carbondata/core/datastore/chunk/reader/measure/CompressedMeasureChunkFileBasedReaderTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/chunk/reader/measure/CompressedMeasureChunkFileBasedReaderTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/chunk/reader/measure/CompressedMeasureChunkFileBasedReaderTest.java
deleted file mode 100644
index e4257f6..0000000
--- a/core/src/test/java/org/apache/carbondata/core/datastore/chunk/reader/measure/CompressedMeasureChunkFileBasedReaderTest.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.core.datastore.chunk.reader.measure;
-
-
-public class CompressedMeasureChunkFileBasedReaderTest {
-
-//  static CompressedMeasureChunkFileBasedReaderV1 compressedMeasureChunkFileBasedReader;
-//  static CarbonWriteDataHolder[] dataHolder = new CarbonWriteDataHolder[1];
-//
-//  static WriterCompressModel writerCompressModel;
-//  @BeforeClass public static void setup() {
-//    List<DataChunk> dataChunkList = new ArrayList<>();
-//    DataChunk dataChunk = new DataChunk();
-//    dataChunkList.add(dataChunk);
-//    dataChunk.setDataPageLength(10);
-//    writerCompressModel = new WriterCompressModel();
-//    Object maxValue[] = new Object[]{new Long[]{8L, 0L}};
-//    Object minValue[] = new Object[]{new Long[]{1L,0L}};
-//    byte[] dataTypeSelected = new byte[1];
-//    char[] aggType = new char[]{'b'};
-//    MeasureMetaDataModel measureMDMdl =
-//                new MeasureMetaDataModel(minValue, maxValue, new int[]{1}, maxValue.length, null,
-//                    aggType, dataTypeSelected);
-//    writerCompressModel = ValueCompressionUtil.getWriterCompressModel(measureMDMdl);
-//    
-//
-//    ValueEncoderMeta meta = new ValueEncoderMeta();
-//    meta.setMaxValue(new Long[]{8L,0L});
-//    meta.setMinValue(new Long[]{1L,0L});
-//    meta.setMantissa(1);
-//    meta.setType('b');
-//    List<ValueEncoderMeta> valueEncoderMetaList = new ArrayList<>();
-//    valueEncoderMetaList.add(meta);
-//    dataChunkList.get(0).setValueEncoderMeta(valueEncoderMetaList);
-//    BlockletInfo info = new BlockletInfo();
-//    info.setMeasureColumnChunk(dataChunkList);
-//    compressedMeasureChunkFileBasedReader =
-//        new CompressedMeasureChunkFileBasedReaderV1(info, "filePath");
-//  }
-//
-//  @Test public void readMeasureChunkTest() throws IOException {
-//    FileHolder fileHolder = new MockUp<FileHolder>() {
-//      @Mock public byte[] readByteArray(String filePath, long offset, int length) {
-//        byte mockedValue[] = { 1, 5, 4, 8, 7 };
-//        return mockedValue;
-//      }
-//    }.getMockInstance();
-//
-//    MeasureColumnDataChunk measureColumnDataChunks =
-//        compressedMeasureChunkFileBasedReader.readMeasureChunk(fileHolder, 0);
-//
-//    BigDecimal bigD = new BigDecimal("2.1");
-//    assertEquals(bigD,
-//        measureColumnDataChunks.getMeasureDataHolder().getReadableBigDecimalValueByIndex(0));
-//      
-//  }
-//
-//  @Test public void readMeasureChunksTest() throws IOException {
-//    FileHolder fileHolder = new MockUp<FileHolder>() {
-//      @Mock public byte[] readByteArray(String filePath, long offset, int length) {
-//        byte mockedValue[] = { 1, 5, 4, 8, 7 };
-//        return mockedValue;
-//      }
-//    }.getMockInstance();
-//
-//    int[][] blockIndexes = {{0,0}};
-//    MeasureColumnDataChunk measureColumnDataChunks[] =
-//        compressedMeasureChunkFileBasedReader.readMeasureChunks(fileHolder, blockIndexes);
-//
-//    BigDecimal bigD = new BigDecimal("2.1");
-//    assertEquals(bigD,
-//        measureColumnDataChunks[0].getMeasureDataHolder().getReadableBigDecimalValueByIndex(0));
-//
-//  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
index 0a5c9fa..56d14c2 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/CarbonMetadataTest.java
@@ -209,7 +209,7 @@ public class CarbonMetadataTest {
     };
 
     new MockUp<CarbonDimension>() {
-      @Mock public int numberOfChild() {
+      @Mock public int getNumberOfChild() {
         return 1;
       }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index ddcc8a4..56e83db 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -20,9 +20,10 @@ package org.apache.carbondata.core.util;
 import mockit.Mock;
 import mockit.MockUp;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
 import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
 import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.format.*;
@@ -53,6 +54,7 @@ public class CarbonMetadataUtilTest {
   static List<ColumnSchema> columnSchemaList;
   static Long[] objMaxArr;
   static Long[] objMinArr;
+  static int[] objDecimal;
 
   @BeforeClass public static void setUp() {
     Long lngObj = new Long("11221");
@@ -73,6 +75,8 @@ public class CarbonMetadataUtilTest {
     objMinArr[4] = new Long("151");
     objMinArr[5] = new Long("161");
 
+    objDecimal = new int[] { 0, 0, 0, 0, 0, 0 };
+
     columnSchemaList = new ArrayList<>();
     List<Encoding> encodingList = new ArrayList<>();
     encodingList.add(Encoding.BIT_PACKED);
@@ -193,13 +197,18 @@ public class CarbonMetadataUtilTest {
     integerList.add(new Integer("1"));
     integerList.add(new Integer("2"));
 
-    WriterCompressModel writerCompressModel = new WriterCompressModel();
-    writerCompressModel.setMaxValue(objMaxArr);
-    writerCompressModel.setMinValue(objMinArr);
-    writerCompressModel.setDataTypeSelected(byteArr);
-    writerCompressModel.setMantissa(intArr);
-    writerCompressModel.setType(dataType);
-    writerCompressModel.setUniqueValue(objMinArr);
+    ValueEncoderMeta[] metas = new ValueEncoderMeta[6];
+    for (int i = 0; i < metas.length; i++) {
+      metas[i] = new ValueEncoderMeta();
+      metas[i].setMinValue(objMinArr[i]);
+      metas[i].setMaxValue(objMaxArr[i]);
+      metas[i].setUniqueValue(objMinArr[i]);
+      metas[i].setDecimal(objDecimal[i]);
+      metas[i].setType(CarbonCommonConstants.BIG_INT_MEASURE);
+      metas[i].setDataTypeSelected(byteArr[i]);
+    }
+
+    MeasurePageStatsVO stats = MeasurePageStatsVO.build(metas);
 
     BlockletInfoColumnar blockletInfoColumnar = new BlockletInfoColumnar();
 
@@ -223,7 +232,7 @@ public class CarbonMetadataUtilTest {
     blockletInfoColumnar.setMeasureLength(intArr);
     blockletInfoColumnar.setMeasureOffset(longArr);
     blockletInfoColumnar.setMeasureNullValueIndex(bitSetArr);
-    blockletInfoColumnar.setCompressionModel(writerCompressModel);
+    blockletInfoColumnar.setStats(stats);
 
     BlockletInfoColumnar blockletInfoColumnar1 = new BlockletInfoColumnar();
     blockletInfoColumnar1.setColumnMaxData(maxByteArr);
@@ -239,7 +248,7 @@ public class CarbonMetadataUtilTest {
     blockletInfoColumnar1.setMeasureLength(intArr);
     blockletInfoColumnar1.setMeasureOffset(longArr);
     blockletInfoColumnar1.setMeasureNullValueIndex(bitSetArr);
-    blockletInfoColumnar1.setCompressionModel(writerCompressModel);
+    blockletInfoColumnar1.setStats(stats);
     blockletInfoColumnar1.setColGrpBlocks(boolArr);
 
     List<BlockletInfoColumnar> blockletInfoColumnarList = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index 9adf4d4..082be3c 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -31,7 +31,6 @@ import java.util.List;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionDataChunk;
 import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
 import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
@@ -661,9 +660,7 @@ public class CarbonUtilTest {
     valueEncoderMetas.add(valueEncoderMeta);
     dataChunk.setValueEncoderMeta(valueEncoderMetas);
     dataChunkList.add(dataChunk);
-    WriterCompressModel writerCompressModel =
-        CarbonUtil.getValueCompressionModel(dataChunkList.get(0).getValueEncoderMeta());
-    assertEquals(1, writerCompressModel.getMaxValue().length);
+    assertEquals(1, dataChunkList.get(0).getValueEncoderMeta().size());
   }
 
   @Test public void testToGetDictionaryChunkSize() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java
index 3032085..629d617 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/ValueCompressionUtilTest.java
@@ -21,8 +21,6 @@ import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
 
-import org.apache.carbondata.core.datastore.compression.MeasureMetaDataModel;
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
 import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
 import org.apache.carbondata.core.datastore.compression.decimal.*;
 import org.apache.carbondata.core.datastore.compression.nondecimal.*;
@@ -485,136 +483,4 @@ public class ValueCompressionUtilTest {
     }
   }
 
-  @Test public void testToGetValueCompressionModel() {
-    Object[] maxValues = { 10L, 20L, 30L };
-    Object[] minValues = { 1L, 2L, 3L };
-    int[] decimalLength = { 0, 0, 0 };
-    Object[] uniqueValues = { 5, new Long[]{2L,4L}, 2L};
-    DataType[] types = { DataType.LONG, DataType.LONG, DataType.LONG };
-    byte[] dataTypeSelected = { 1, 2, 4 };
-    MeasureMetaDataModel measureMetaDataModel =
-        new MeasureMetaDataModel(maxValues, minValues, decimalLength, 3, uniqueValues, types,
-            dataTypeSelected);
-    WriterCompressModel writerCompressModel =
-        ValueCompressionUtil.getWriterCompressModel(measureMetaDataModel);
-    assertEquals(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE,
-        writerCompressModel.getCompType(0));
-    assertEquals(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE,
-        writerCompressModel.getCompType(1));
-    assertEquals(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE,
-        writerCompressModel.getCompType(2));
-  }
-
-  @Test public void testToGetValueCompressionModelForDefaultAggregatorType() {
-    Object[] maxValues = { 10.0 };
-    Object[] minValues = { 1.0 };
-    int[] decimalLength = { 0 };
-    Object[] uniqueValues = { 5 };
-    DataType[] types = { DataType.DOUBLE };
-    byte[] dataTypeSelected = { 1 };
-    MeasureMetaDataModel measureMetaDataModel =
-        new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,
-            dataTypeSelected);
-    WriterCompressModel writerCompressModel =
-        ValueCompressionUtil.getWriterCompressModel(measureMetaDataModel);
-    assertEquals(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE,
-        writerCompressModel.getCompType(0));
-  }
-
-  @Test public void testToGetValueCompressionModelForShortAndByte() {
-    Object[] maxValues = { 32600.00 };
-    Object[] minValues = { 32500.00 };
-    int[] decimalLength = { 0 };
-    Object[] uniqueValues = { 5 };
-    DataType[] types = { DataType.DOUBLE };
-    byte[] dataTypeSelected = { 1 };
-    MeasureMetaDataModel measureMetaDataModel =
-        new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,
-            dataTypeSelected);
-    WriterCompressModel writerCompressModel =
-        ValueCompressionUtil.getWriterCompressModel(measureMetaDataModel);
-    assertEquals(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_DOUBLE,
-        writerCompressModel.getCompType(0));
-  }
-
-  @Test public void testToGetValueCompressionModelForIntAndShort() {
-    Object[] maxValues = { 1111111111.0 };
-    Object[] minValues = { 1111078433.0 };
-    int[] decimalLength = { 0 };
-    Object[] uniqueValues = { 5 };
-    DataType[] types = { DataType.DOUBLE };
-    byte[] dataTypeSelected = { 1 };
-    MeasureMetaDataModel measureMetaDataModel =
-        new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,
-            dataTypeSelected);
-    WriterCompressModel writerCompressModel =
-        ValueCompressionUtil.getWriterCompressModel(measureMetaDataModel);
-    assertEquals(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_DOUBLE,
-        writerCompressModel.getCompType(0));
-  }
-
-  @Test public void testToGetValueCompressionModelForByteAndInt() {
-    Object[] maxValues = { -32766.00 };
-    Object[] minValues = { 32744.0 };
-    int[] decimalLength = { 0 };
-    Object[] uniqueValues = { 5 };
-    DataType[] types = { DataType.DOUBLE};
-    byte[] dataTypeSelected = { 1 };
-    MeasureMetaDataModel measureMetaDataModel =
-        new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,
-            dataTypeSelected);
-    WriterCompressModel writerCompressModel =
-        ValueCompressionUtil.getWriterCompressModel(measureMetaDataModel);
-    assertEquals(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE,
-        writerCompressModel.getCompType(0));
-  }
-
-  @Test public void testToGetValueCompressionModelForByteAndIntAndDecimal1() {
-    Object[] maxValues = { -32766.00 };
-    Object[] minValues = { 32744.0 };
-    int[] decimalLength = { 1 };
-    Object[] uniqueValues = { 5 };
-    DataType[] types = { DataType.DOUBLE };
-    byte[] dataTypeSelected = { 1 };
-    MeasureMetaDataModel measureMetaDataModel =
-        new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,
-            dataTypeSelected);
-    WriterCompressModel writerCompressModel =
-        ValueCompressionUtil.getWriterCompressModel(measureMetaDataModel);
-    assertEquals(ValueCompressionUtil.COMPRESSION_TYPE.DELTA_DOUBLE,
-        writerCompressModel.getCompType(0));
-  }
-
-  @Test public void testToGetValueCompressionModelForByteAndIntAndDataTypeSelected0() {
-    Object[] maxValues = { -32766.00 };
-    Object[] minValues = { 32744.0 };
-    int[] decimalLength = { 1 };
-    Object[] uniqueValues = { 5 };
-    DataType[] types = { DataType.DOUBLE };
-    byte[] dataTypeSelected = { 0 };
-    MeasureMetaDataModel measureMetaDataModel =
-        new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,
-            dataTypeSelected);
-    WriterCompressModel writerCompressModel =
-        ValueCompressionUtil.getWriterCompressModel(measureMetaDataModel);
-    assertEquals(ValueCompressionUtil.COMPRESSION_TYPE.BIGINT,
-        writerCompressModel.getCompType(0));
-  }
-
-  @Test public void testToGetValueCompressionModelForFloatAndDataTypeSelected1() {
-    Object[] maxValues = { 32725566.00 };
-    Object[] minValues = { 32744.0 };
-    int[] decimalLength = { 1 };
-    Object[] uniqueValues = { 5 };
-    DataType[] types = { DataType.DOUBLE };
-    byte[] dataTypeSelected = { 1 };
-    MeasureMetaDataModel measureMetaDataModel =
-        new MeasureMetaDataModel(maxValues, minValues, decimalLength, 1, uniqueValues, types,
-            dataTypeSelected);
-    WriterCompressModel writerCompressModel =
-        ValueCompressionUtil.getWriterCompressModel(measureMetaDataModel);
-    assertEquals(ValueCompressionUtil.COMPRESSION_TYPE.ADAPTIVE,
-        writerCompressModel.getCompType(0));
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java b/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
index 480ed04..64651e5 100644
--- a/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
@@ -25,10 +25,11 @@ import java.util.List;
 import java.util.UUID;
 
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.compression.WriterCompressModel;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
@@ -180,14 +181,25 @@ public class CarbonFooterWriterTest extends TestCase{
     infoColumnar.setAggKeyBlock(new boolean[] { true, true, true, true });
     infoColumnar.setColGrpBlocks(new boolean[] { false, false, false, false });
     infoColumnar.setMeasureNullValueIndex(new BitSet[] {new BitSet(),new BitSet()});
-    WriterCompressModel compressionModel = new WriterCompressModel();
-    compressionModel.setMaxValue(new Object[] { 44d, 55d });
-    compressionModel.setMinValue(new Object[] { 0d, 0d });
-    compressionModel.setMantissa(new int[] { 0, 0 });
-    compressionModel.setType(new DataType[] { DataType.DOUBLE, DataType.DOUBLE });
-    compressionModel.setUniqueValue(new Object[] { 0d, 0d });
-    compressionModel.setDataTypeSelected(new byte[2]);
-    infoColumnar.setCompressionModel(compressionModel);
+
+    ValueEncoderMeta[] metas = new ValueEncoderMeta[2];
+    metas[0] = new ValueEncoderMeta();
+    metas[0].setMinValue(0);
+    metas[0].setMaxValue(44d);
+    metas[0].setUniqueValue(0d);
+    metas[0].setDecimal(0);
+    metas[0].setType(CarbonCommonConstants.DOUBLE_MEASURE);
+    metas[0].setDataTypeSelected((byte)0);
+    metas[1] = new ValueEncoderMeta();
+    metas[1].setMinValue(0);
+    metas[1].setMaxValue(55d);
+    metas[1].setUniqueValue(0d);
+    metas[1].setDecimal(0);
+    metas[1].setType(CarbonCommonConstants.DOUBLE_MEASURE);
+    metas[1].setDataTypeSelected((byte)0);
+
+    MeasurePageStatsVO stats = MeasurePageStatsVO.build(metas);
+    infoColumnar.setStats(stats);
     List<BlockletInfoColumnar> infoColumnars = new ArrayList<BlockletInfoColumnar>();
     infoColumnars.add(infoColumnar);
     return infoColumnars;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/docs/useful-tips-on-carbondata.md
----------------------------------------------------------------------
diff --git a/docs/useful-tips-on-carbondata.md b/docs/useful-tips-on-carbondata.md
index 40a3947..9bc5a46 100644
--- a/docs/useful-tips-on-carbondata.md
+++ b/docs/useful-tips-on-carbondata.md
@@ -209,5 +209,4 @@ scenarios. After the completion of POC, some of the configurations impacting the
 | carbon.detail.batch.size | spark/carbonlib/carbon.properties | Data loading | The buffer size to store records, returned from the block scan. | In limit scenario this parameter is very important. For example your query limit is 1000. But if we set this value to 3000 that means we get 3000 records from scan but spark will only take 1000 rows. So the 2000 remaining are useless. In one Finance test case after we set it to 100, in the limit 1000 scenario the performance increase about 2 times in comparison to if we set this value to 12000. |
 | carbon.use.local.dir | spark/carbonlib/carbon.properties | Data loading | Whether use YARN local directories for multi-table load disk load balance | If this is set it to true CarbonData will use YARN local directories for multi-table load disk load balance, that will improve the data load performance. |
 
-Note: If your CarbonData instance is provided only for query, you may specify the conf 'spark.speculation=true' which is conf
- in spark.
\ No newline at end of file
+Note: If your CarbonData instance is provided only for query, you may specify the conf 'spark.speculation=true' which is conf in spark.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 1c16ea4..5f7ff2e 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -179,6 +179,7 @@ class ExternalColumnDictionaryTestCase extends QueryTest with BeforeAndAfterAll
   }
 
   override def beforeAll {
+    cleanAllTables
     buildTestData
     buildTable
     buildRelation
@@ -276,9 +277,13 @@ class ExternalColumnDictionaryTestCase extends QueryTest with BeforeAndAfterAll
     }
   }
 
+  def cleanAllTables: Unit = {
+    sql("DROP TABLE IF EXISTS extComplextypes")
+    sql("DROP TABLE IF EXISTS verticalDelimitedTable")
+    sql("DROP TABLE IF EXISTS loadSqlTest")
+  }
+
   override def afterAll: Unit = {
-    sql("DROP TABLE extComplextypes")
-    sql("DROP TABLE verticalDelimitedTable")
-    sql("DROP TABLE loadSqlTest")
+    cleanAllTables
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
index be29c1e..cc33ab8 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
@@ -220,6 +220,7 @@ class AddColumnTestCases extends QueryTest with BeforeAndAfterAll {
       s" OPTIONS" +
       s"('BAD_RECORDS_LOGGER_ENABLE'='TRUE', " +
       s"'BAD_RECORDS_ACTION'='FORCE','FILEHEADER'='CUST_ID,CUST_NAME,a6')")
+    sql("select a6 from carbon_measure_is_null where a6 is null").show
     checkAnswer(sql("select * from carbon_measure_is_null"),
       sql("select * from carbon_measure_is_null where a6 is null"))
     checkAnswer(sql("select count(*) from carbon_measure_is_null where a6 is not null"), Row(0))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java b/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java
new file mode 100644
index 0000000..12fe27b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/core/datastore/GenericDataType.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.keygenerator.KeyGenerator;
+
+/**
+ * Generic DataType interface which will be used while data loading for complex types like Array &
+ * Struct
+ */
+public interface GenericDataType<T> {
+
+  /**
+   * @return name of the column
+   */
+  String getName();
+
+  /**
+   * @return - columns parent name
+   */
+  String getParentname();
+
+  /**
+   * @param children - To add children dimension for parent complex type
+   */
+  void addChildren(GenericDataType children);
+
+  /**
+   * @param primitiveChild - Returns all primitive type columns in complex type
+   */
+  void getAllPrimitiveChildren(List<GenericDataType> primitiveChild);
+
+  /**
+   * writes to byte stream
+   * @param dataOutputStream
+   * @throws IOException
+   */
+  void writeByteArray(T input, DataOutputStream dataOutputStream)
+      throws IOException, DictionaryGenerationException;
+
+  /**
+   * @return surrogateIndex for primitive column in complex type
+   */
+  int getSurrogateIndex();
+
+  /**
+   * @param surrIndex - surrogate index of primitive column in complex type
+   */
+  void setSurrogateIndex(int surrIndex);
+
+  /**
+   * converts integer surrogate to bit packed surrogate value
+   * @param byteArrayInput
+   * @param dataOutputStream
+   * @param generator
+   * @throws IOException
+   * @throws KeyGenException
+   */
+  void parseAndBitPack(ByteBuffer byteArrayInput, DataOutputStream dataOutputStream,
+      KeyGenerator[] generator) throws IOException, KeyGenException;
+
+  /**
+   * @return columns count of each complex type
+   */
+  int getColsCount();
+
+  /**
+   * @return column uuid string
+   */
+  String getColumnId();
+
+  /**
+   * set array index to be referred while creating metadata column
+   * @param outputArrayIndex
+   */
+  void setOutputArrayIndex(int outputArrayIndex);
+
+  /**
+   * @return array index count of metadata column
+   */
+  int getMaxOutputArrayIndex();
+
+  /**
+   * Split byte array into complex metadata column and primitive column
+   * @param columnsArray
+   * @param inputArray
+   */
+  void getColumnarDataForComplexType(List<ArrayList<byte[]>> columnsArray, ByteBuffer inputArray);
+
+  /**
+   * @return current read row count
+   */
+  int getDataCounter();
+
+  /**
+   * fill agg key block including complex types
+   * @param aggKeyBlockWithComplex
+   * @param aggKeyBlock
+   */
+  void fillAggKeyBlock(List<Boolean> aggKeyBlockWithComplex, boolean[] aggKeyBlock);
+
+  /**
+   * fill block key size including complex types
+   * @param blockKeySizeWithComplex
+   * @param primitiveBlockKeySize
+   */
+  void fillBlockKeySize(List<Integer> blockKeySizeWithComplex, int[] primitiveBlockKeySize);
+
+  /**
+   * fill cardinality value including complex types
+   * @param dimCardWithComplex
+   * @param maxSurrogateKeyArray
+   */
+  void fillCardinalityAfterDataLoad(List<Integer> dimCardWithComplex, int[] maxSurrogateKeyArray);
+
+  /**
+   * Fill the cardinality of the primitive datatypes
+   * @param dimCardWithComplex
+   */
+  void fillCardinality(List<Integer> dimCardWithComplex);
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/core/datastore/columnar/ColGroupBlockStorage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/core/datastore/columnar/ColGroupBlockStorage.java b/processing/src/main/java/org/apache/carbondata/core/datastore/columnar/ColGroupBlockStorage.java
new file mode 100644
index 0000000..fb8fe3d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/core/datastore/columnar/ColGroupBlockStorage.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.columnar;
+
+import java.util.concurrent.Callable;
+
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.processing.store.colgroup.ColGroupDataHolder;
+import org.apache.carbondata.processing.store.colgroup.ColGroupMinMax;
+
+/**
+ * it is holder of column group data and also min max for colgroup block data
+ */
+public class ColGroupBlockStorage implements IndexStorage, Callable<IndexStorage> {
+
+  private byte[][] data;
+
+  private ColGroupMinMax colGrpMinMax;
+
+  public ColGroupBlockStorage(SegmentProperties segmentProperties, int colGrpIndex, byte[][] data) {
+    colGrpMinMax = new ColGroupMinMax(segmentProperties, colGrpIndex);
+    this.data = data;
+    for (int i = 0; i < data.length; i++) {
+      colGrpMinMax.add(data[i]);
+    }
+  }
+
+  /**
+   * sorting is not required for colgroup storage and hence return true
+   */
+  @Override public boolean isAlreadySorted() {
+    return true;
+  }
+
+  /**
+   * for column group storage its not required
+   */
+  @Override public ColGroupDataHolder getDataAfterComp() {
+    //not required for column group storage
+    return null;
+  }
+
+  /**
+   * for column group storage its not required
+   */
+  @Override public ColGroupDataHolder getIndexMap() {
+    // not required for column group storage
+    return null;
+  }
+
+  /**
+   * for column group storage its not required
+   */
+  @Override public byte[][] getKeyBlock() {
+    return data;
+  }
+
+  /**
+   * for column group storage its not required
+   */
+  @Override public ColGroupDataHolder getDataIndexMap() {
+    //not required for column group
+    return null;
+  }
+
+  /**
+   * for column group storage its not required
+   */
+  @Override public int getTotalSize() {
+    return data.length;
+  }
+
+  @Override public byte[] getMin() {
+    return colGrpMinMax.getMin();
+  }
+
+  @Override public byte[] getMax() {
+    return colGrpMinMax.getMax();
+  }
+
+  /**
+   * return self
+   */
+  @Override public IndexStorage call() throws Exception {
+    return this;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc83b2ac/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
index 02ceb06..f5fdd4d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/ArrayDataType.java
@@ -23,6 +23,7 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;