Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/28 06:10:03 UTC

[3/7] carbondata git commit: [CARBONDATA-1098] Change page statistics to use exact type and use column page in writer

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
new file mode 100644
index 0000000..73ada4b
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/PrimitivePageStatsCollector.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.statistics;
+
+import java.math.BigDecimal;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.metadata.ColumnPageCodecMeta;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/** statistics collector for a primitive column page */
+public class PrimitivePageStatsCollector implements ColumnPageStatsCollector, SimpleStatsResult {
+  private DataType dataType;
+  private byte minByte, maxByte;
+  private short minShort, maxShort;
+  private int minInt, maxInt;
+  private long minLong, maxLong;
+  private double minDouble, maxDouble;
+
+  // scale of the double value
+  private int decimal;
+
+  // bit at position rowId is set to 1 if the value of that row is null
+  private BitSet nullBitSet;
+
+  // this is for encode flow
+  public static PrimitivePageStatsCollector newInstance(DataType dataType, int pageSize) {
+    return new PrimitivePageStatsCollector(dataType, pageSize);
+  }
+
+  // this is for decode flow; nullBits is not needed, so 0 is passed as pageSize
+  public static PrimitivePageStatsCollector newInstance(ColumnPageCodecMeta meta) {
+    PrimitivePageStatsCollector instance =
+        new PrimitivePageStatsCollector(meta.getSrcDataType(), 0);
+    // set min max from meta
+    switch (meta.getSrcDataType()) {
+      case BYTE:
+        instance.minByte = (byte) meta.getMinValue();
+        instance.maxByte = (byte) meta.getMaxValue();
+        break;
+      case SHORT:
+        instance.minShort = (short) meta.getMinValue();
+        instance.maxShort = (short) meta.getMaxValue();
+        break;
+      case INT:
+        instance.minInt = (int) meta.getMinValue();
+        instance.maxInt = (int) meta.getMaxValue();
+        break;
+      case LONG:
+        instance.minLong = (long) meta.getMinValue();
+        instance.maxLong = (long) meta.getMaxValue();
+        break;
+      case DOUBLE:
+        instance.minDouble = (double) meta.getMinValue();
+        instance.maxDouble = (double) meta.getMaxValue();
+        instance.decimal = meta.getDecimal();
+        break;
+    }
+    return instance;
+  }
+
+  public static PrimitivePageStatsCollector newInstance(ValueEncoderMeta meta) {
+    PrimitivePageStatsCollector instance =
+        new PrimitivePageStatsCollector(meta.getType(), 0);
+    // set min max from meta
+    switch (meta.getType()) {
+      case BYTE:
+        instance.minByte = (byte) meta.getMinValue();
+        instance.maxByte = (byte) meta.getMaxValue();
+        break;
+      case SHORT:
+        instance.minShort = (short) meta.getMinValue();
+        instance.maxShort = (short) meta.getMaxValue();
+        break;
+      case INT:
+        instance.minInt = (int) meta.getMinValue();
+        instance.maxInt = (int) meta.getMaxValue();
+        break;
+      case LONG:
+        instance.minLong = (long) meta.getMinValue();
+        instance.maxLong = (long) meta.getMaxValue();
+        break;
+      case DOUBLE:
+        instance.minDouble = (double) meta.getMinValue();
+        instance.maxDouble = (double) meta.getMaxValue();
+        instance.decimal = meta.getDecimal();
+        break;
+    }
+    return instance;
+  }
+
+  private PrimitivePageStatsCollector(DataType dataType, int pageSize) {
+    this.dataType = dataType;
+    this.nullBitSet = new BitSet(pageSize);
+    switch (dataType) {
+      case BYTE:
+        minByte = Byte.MAX_VALUE;
+        maxByte = Byte.MIN_VALUE;
+        break;
+      case SHORT:
+        minShort = Short.MAX_VALUE;
+        maxShort = Short.MIN_VALUE;
+        break;
+      case INT:
+        minInt = Integer.MAX_VALUE;
+        maxInt = Integer.MIN_VALUE;
+        break;
+      case LONG:
+        minLong = Long.MAX_VALUE;
+        maxLong = Long.MIN_VALUE;
+        break;
+      case DOUBLE:
+        minDouble = Double.MAX_VALUE;
+        // Double.MIN_VALUE is the smallest positive double, so the initial max must be
+        // -Double.MAX_VALUE for all-negative pages to be captured correctly
+        maxDouble = -Double.MAX_VALUE;
+        decimal = 0;
+        break;
+      case DECIMAL:
+        // no initial min/max needed for DECIMAL
+        break;
+    }
+  }
+
+  @Override
+  public void updateNull(int rowId) {
+    nullBitSet.set(rowId);
+    long value = 0;
+    switch (dataType) {
+      case BYTE:
+        update((byte) value);
+        break;
+      case SHORT:
+        update((short) value);
+        break;
+      case INT:
+        update((int) value);
+        break;
+      case LONG:
+        update(value);
+        break;
+      case DOUBLE:
+        update(0d);
+        break;
+    }
+  }
+
+  @Override
+  public void update(byte value) {
+    if (minByte > value) {
+      minByte = value;
+    }
+    if (maxByte < value) {
+      maxByte = value;
+    }
+  }
+
+  @Override
+  public void update(short value) {
+    if (minShort > value) {
+      minShort = value;
+    }
+    if (maxShort < value) {
+      maxShort = value;
+    }
+  }
+
+  @Override
+  public void update(int value) {
+    if (minInt > value) {
+      minInt = value;
+    }
+    if (maxInt < value) {
+      maxInt = value;
+    }
+  }
+
+  @Override
+  public void update(long value) {
+    if (minLong > value) {
+      minLong = value;
+    }
+    if (maxLong < value) {
+      maxLong = value;
+    }
+  }
+
+  @Override
+  public void update(double value) {
+    if (minDouble > value) {
+      minDouble = value;
+    }
+    if (maxDouble < value) {
+      maxDouble = value;
+    }
+    int scale = BigDecimal.valueOf(value).scale();
+    if (scale < 0) {
+      decimal = scale;
+    } else {
+      decimal = Math.max(decimal, scale);
+    }
+  }
+
+  @Override
+  public void update(byte[] value) {
+    // no-op: variable length (byte[]) values are collected by VarLengthPageStatsCollector
+  }
+
+  @Override
+  public Object getPageStats() {
+    return this;
+  }
+
+  @Override
+  public String toString() {
+    switch (dataType) {
+      case BYTE:
+        return String.format("min: %s, max: %s, decimal: %s ", minByte, maxByte, decimal);
+      case SHORT:
+        return String.format("min: %s, max: %s, decimal: %s ", minShort, maxShort, decimal);
+      case INT:
+        return String.format("min: %s, max: %s, decimal: %s ", minInt, maxInt, decimal);
+      case LONG:
+        return String.format("min: %s, max: %s, decimal: %s ", minLong, maxLong, decimal);
+      case DOUBLE:
+        return String.format("min: %s, max: %s, decimal: %s ", minDouble, maxDouble, decimal);
+    }
+    return super.toString();
+  }
+
+  @Override
+  public Object getMin() {
+    switch (dataType) {
+      case BYTE:
+        return minByte;
+      case SHORT:
+        return minShort;
+      case INT:
+        return minInt;
+      case LONG:
+        return minLong;
+      case DOUBLE:
+        return minDouble;
+    }
+    return null;
+  }
+
+  @Override
+  public Object getMax() {
+    switch (dataType) {
+      case BYTE:
+        return maxByte;
+      case SHORT:
+        return maxShort;
+      case INT:
+        return maxInt;
+      case LONG:
+        return maxLong;
+      case DOUBLE:
+        return maxDouble;
+    }
+    return null;
+  }
+
+  @Override
+  public BitSet getNullBits() {
+    return nullBitSet;
+  }
+
+  @Override
+  public int getDecimalPoint() {
+    return decimal;
+  }
+
+  @Override
+  public DataType getDataType() {
+    return dataType;
+  }
+
+}
\ No newline at end of file

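A minimal sketch of how the encode flow could drive the collector above, assuming the PrimitivePageStatsCollector API introduced in this patch is on the classpath; the page values and the null row are illustrative only:

import java.util.BitSet;

import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
import org.apache.carbondata.core.metadata.datatype.DataType;

public class StatsCollectorSketch {
  public static void main(String[] args) {
    int pageSize = 5;
    // encode flow: one collector per column page
    PrimitivePageStatsCollector collector =
        PrimitivePageStatsCollector.newInstance(DataType.INT, pageSize);
    int[] page = {7, -3, 42, 0, 11};
    for (int rowId = 0; rowId < pageSize; rowId++) {
      if (rowId == 3) {
        collector.updateNull(rowId); // sets the null bit and folds in 0
      } else {
        collector.update(page[rowId]);
      }
    }
    System.out.println(collector);             // min: -3, max: 42, decimal: 0
    BitSet nullBits = collector.getNullBits(); // bit 3 is set
    System.out.println(nullBits);              // {3}
  }
}
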
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java
new file mode 100644
index 0000000..1db86ff
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/SimpleStatsResult.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.statistics;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+public interface SimpleStatsResult {
+
+  Object getMin();
+
+  Object getMax();
+
+  BitSet getNullBits();
+
+  int getDecimalPoint();
+
+  DataType getDataType();
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/TablePageStatistics.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/TablePageStatistics.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/TablePageStatistics.java
new file mode 100644
index 0000000..07de9c0
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/TablePageStatistics.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.statistics;
+
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedDimensionPage;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedMeasurePage;
+import org.apache.carbondata.core.metadata.ColumnPageCodecMeta;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+// Statistics of dimension and measure column in a TablePage
+public class TablePageStatistics {
+
+  // number of dimension columns after complex columns are expanded
+  private int numDimensionsExpanded;
+
+  // min of each dimension column
+  private byte[][] dimensionMinValue;
+
+  // max of each dimension column
+  private byte[][] dimensionMaxValue;
+
+  // min of each measure column
+  private byte[][] measureMinValue;
+
+  // max of each measure column
+  private byte[][] measureMaxValue;
+
+  // null bit set for each measure column
+  private BitSet[] nullBitSet;
+
+  public TablePageStatistics(EncodedDimensionPage[] dimensions,
+      EncodedMeasurePage[] measures) {
+    this.numDimensionsExpanded = dimensions.length;
+    int numMeasures = measures.length;
+    this.dimensionMinValue = new byte[numDimensionsExpanded][];
+    this.dimensionMaxValue = new byte[numDimensionsExpanded][];
+    this.measureMinValue = new byte[numMeasures][];
+    this.measureMaxValue = new byte[numMeasures][];
+    this.nullBitSet = new BitSet[numMeasures];
+    updateDimensionMinMax(dimensions);
+    updateMeasureMinMax(measures);
+  }
+
+  private void updateDimensionMinMax(EncodedDimensionPage[] dimensions) {
+    for (int i = 0; i < dimensions.length; i++) {
+      IndexStorage keyStorageArray = dimensions[i].getIndexStorage();
+      switch (dimensions[i].getDimensionType()) {
+        case GLOBAL_DICTIONARY:
+        case DIRECT_DICTIONARY:
+        case COLUMN_GROUP:
+        case COMPLEX:
+          dimensionMinValue[i] = keyStorageArray.getMin();
+          dimensionMaxValue[i] = keyStorageArray.getMax();
+          break;
+        case PLAIN_VALUE:
+          dimensionMinValue[i] = updateMinMaxForNoDictionary(keyStorageArray.getMin());
+          dimensionMaxValue[i] = updateMinMaxForNoDictionary(keyStorageArray.getMax());
+          break;
+      }
+    }
+  }
+
+  private void updateMeasureMinMax(EncodedMeasurePage[] measures) {
+    for (int i = 0; i < measures.length; i++) {
+      ValueEncoderMeta meta = measures[i].getMetaData();
+      if (meta instanceof ColumnPageCodecMeta) {
+        ColumnPageCodecMeta metadata = (ColumnPageCodecMeta) meta;
+        measureMaxValue[i] = metadata.getMaxAsBytes();
+        measureMinValue[i] = metadata.getMinAsBytes();
+      } else {
+        measureMaxValue[i] = CarbonUtil.getMaxValueAsBytes(meta);
+        measureMinValue[i] = CarbonUtil.getMinValueAsBytes(meta);
+      }
+      nullBitSet[i] = measures[i].getNullBitSet();
+    }
+  }
+
+  /**
+   * Strip the 2-byte length prefix from a no-dictionary min or max value
+   *
+   * @param valueWithLength value prefixed with its length stored as a short
+   * @return min/max value without the length prefix
+   */
+  public static byte[] updateMinMaxForNoDictionary(byte[] valueWithLength) {
+    ByteBuffer buffer = ByteBuffer.wrap(valueWithLength);
+    byte[] actualValue = new byte[buffer.getShort()];
+    buffer.get(actualValue);
+    return actualValue;
+  }
+
+  public byte[][] getDimensionMinValue() {
+    return dimensionMinValue;
+  }
+
+  public byte[][] getDimensionMaxValue() {
+    return dimensionMaxValue;
+  }
+
+  public byte[][] getMeasureMinValue() {
+    return measureMinValue;
+  }
+
+  public byte[][] getMeasureMaxValue() {
+    return measureMaxValue;
+  }
+
+  public BitSet[] getNullBitSet() {
+    return nullBitSet;
+  }
+}

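To make the length handling in updateMinMaxForNoDictionary concrete, the following self-contained sketch (plain JDK, no CarbonData classes) builds a value with the 2-byte length prefix the writer stores and unwraps it the same way the method above does; the sample bytes are illustrative:

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class LengthPrefixSketch {
  public static void main(String[] args) {
    byte[] actual = "abc".getBytes(StandardCharsets.UTF_8);
    // value as stored: a short length followed by the actual bytes
    ByteBuffer withLength = ByteBuffer.allocate(2 + actual.length);
    withLength.putShort((short) actual.length);
    withLength.put(actual);

    // same unwrapping as TablePageStatistics.updateMinMaxForNoDictionary
    ByteBuffer buffer = ByteBuffer.wrap(withLength.array());
    byte[] unwrapped = new byte[buffer.getShort()];
    buffer.get(unwrapped);
    System.out.println(new String(unwrapped, StandardCharsets.UTF_8)); // prints abc
  }
}
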
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/VarLengthPageStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/VarLengthPageStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/VarLengthPageStatsCollector.java
new file mode 100644
index 0000000..e985f90
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/VarLengthPageStatsCollector.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.statistics;
+
+import java.util.BitSet;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+
+public class VarLengthPageStatsCollector implements ColumnPageStatsCollector {
+
+  private byte[] min, max;
+
+  public static VarLengthPageStatsCollector newInstance() {
+    return new VarLengthPageStatsCollector();
+  }
+
+  private VarLengthPageStatsCollector() {
+  }
+
+  @Override
+  public void updateNull(int rowId) {
+    // no-op: only byte[] values are collected for var-length columns
+  }
+
+  @Override
+  public void update(byte value) {
+
+  }
+
+  @Override
+  public void update(short value) {
+
+  }
+
+  @Override
+  public void update(int value) {
+
+  }
+
+  @Override
+  public void update(long value) {
+
+  }
+
+  @Override
+  public void update(double value) {
+
+  }
+
+  @Override
+  public void update(byte[] value) {
+    if (min == null && max == null) {
+      min = value;
+      max = value;
+    } else {
+      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(min, value) > 0) {
+        min = value;
+      }
+      if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(max, value) < 0) {
+        max = value;
+      }
+    }
+  }
+
+  @Override
+  public Object getPageStats() {
+    // for binary (var-length) type, only min/max are collected; null bits and decimal are not tracked
+    return new SimpleStatsResult() {
+
+      @Override public Object getMin() {
+        return min;
+      }
+
+      @Override public Object getMax() {
+        return max;
+      }
+
+      @Override public BitSet getNullBits() {
+        return null;
+      }
+
+      @Override public int getDecimalPoint() {
+        return 0;
+      }
+
+      @Override public DataType getDataType() {
+        return null;
+      }
+    };
+  }
+}

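The update(byte[]) logic above is an unsigned lexicographic min/max; a stand-alone equivalent with a hand-rolled comparator in place of ByteUtil.UnsafeComparer (the sample page is illustrative):

import java.util.Arrays;

public class VarLengthMinMaxSketch {
  // unsigned lexicographic compare, the same ordering contract as ByteUtil.UnsafeComparer
  static int compare(byte[] a, byte[] b) {
    int n = Math.min(a.length, b.length);
    for (int i = 0; i < n; i++) {
      int diff = (a[i] & 0xFF) - (b[i] & 0xFF);
      if (diff != 0) {
        return diff;
      }
    }
    return a.length - b.length;
  }

  public static void main(String[] args) {
    byte[] min = null;
    byte[] max = null;
    byte[][] page = { {2, 9}, {1}, {2, 9, 0}, {7} };
    for (byte[] value : page) {
      if (min == null && max == null) { // first value initializes both
        min = value;
        max = value;
      } else {
        if (compare(min, value) > 0) {
          min = value;
        }
        if (compare(max, value) < 0) {
          max = value;
        }
      }
    }
    System.out.println(Arrays.toString(min)); // [1]
    System.out.println(Arrays.toString(max)); // [7]
  }
}
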
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/metadata/BlockletInfoColumnar.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/BlockletInfoColumnar.java b/core/src/main/java/org/apache/carbondata/core/metadata/BlockletInfoColumnar.java
index 47df6a5..da8a33d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/BlockletInfoColumnar.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/BlockletInfoColumnar.java
@@ -19,10 +19,13 @@ package org.apache.carbondata.core.metadata;
 
 import java.util.BitSet;
 
-import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 
+// Used for V1 and V2 formats only
 public class BlockletInfoColumnar {
 
+  private EncodedTablePage encodedTablePage;
+
   /**
    * measureOffset.
    */
@@ -85,8 +88,6 @@ public class BlockletInfoColumnar {
 
   private boolean[] aggKeyBlock;
 
-  private MeasurePageStatsVO stats;
-
   /**
    * column min array
    */
@@ -98,11 +99,6 @@ public class BlockletInfoColumnar {
   private byte[][] columnMinData;
 
   /**
-   * true if given index is colgroup block
-   */
-  private boolean[] colGrpBlock;
-
-  /**
    * bit set which will holds the measure
    * indexes which are null
    */
@@ -317,20 +313,6 @@ public class BlockletInfoColumnar {
   }
 
   /**
-   * @return
-   */
-  public boolean[] getColGrpBlocks() {
-    return this.colGrpBlock;
-  }
-
-  /**
-   * @param colGrpBlock
-   */
-  public void setColGrpBlocks(boolean[] colGrpBlock) {
-    this.colGrpBlock = colGrpBlock;
-  }
-
-  /**
    * @return the measureNullValueIndex
    */
   public BitSet[] getMeasureNullValueIndex() {
@@ -344,11 +326,11 @@ public class BlockletInfoColumnar {
     this.measureNullValueIndex = measureNullValueIndex;
   }
 
-  public MeasurePageStatsVO getStats() {
-    return stats;
+  public void setEncodedTablePage(EncodedTablePage encodedData) {
+    this.encodedTablePage = encodedData;
   }
 
-  public void setStats(MeasurePageStatsVO stats) {
-    this.stats = stats;
+  public EncodedTablePage getEncodedTablePage() {
+    return encodedTablePage;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/metadata/CodecMetaFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CodecMetaFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/CodecMetaFactory.java
new file mode 100644
index 0000000..ac83333
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CodecMetaFactory.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import static org.apache.carbondata.core.metadata.datatype.DataType.*;
+
+public class CodecMetaFactory {
+
+  private static final ColumnarFormatVersion version =
+      CarbonProperties.getInstance().getFormatVersion();
+
+  public static ValueEncoderMeta createMeta() {
+    switch (version) {
+      case V1:
+      case V2:
+        return new ValueEncoderMeta();
+      case V3:
+        return ColumnPageCodecMeta.newInstance();
+      default:
+        throw new UnsupportedOperationException("unsupported version: " + version);
+    }
+  }
+
+  public static ValueEncoderMeta createMeta(SimpleStatsResult stats, DataType targetDataType) {
+    switch (version) {
+      case V1:
+      case V2:
+        ValueEncoderMeta meta = new ValueEncoderMeta();
+        switch (stats.getDataType()) {
+          case SHORT:
+            meta.setMaxValue((long)(short) stats.getMax());
+            meta.setMinValue((long)(short) stats.getMin());
+            break;
+          case INT:
+            meta.setMaxValue((long)(int) stats.getMax());
+            meta.setMinValue((long)(int) stats.getMin());
+            break;
+          default:
+            meta.setMaxValue(stats.getMax());
+            meta.setMinValue(stats.getMin());
+            break;
+        }
+        meta.setDecimal(stats.getDecimalPoint());
+        meta.setType(converType(stats.getDataType()));
+        return meta;
+      case V3:
+        return ColumnPageCodecMeta.newInstance(stats, targetDataType);
+      default:
+        throw new UnsupportedOperationException("unsupported version: " + version);
+    }
+  }
+
+  public static char converType(DataType type) {
+    switch (type) {
+      case BYTE:
+      case SHORT:
+      case INT:
+      case LONG:
+        return CarbonCommonConstants.BIG_INT_MEASURE;
+      case DOUBLE:
+        return CarbonCommonConstants.DOUBLE_MEASURE;
+      case DECIMAL:
+        return CarbonCommonConstants.BIG_DECIMAL_MEASURE;
+      default:
+        throw new RuntimeException("Unexpected type: " + type);
+    }
+  }
+
+}

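For reference, converType above boils down to a three-way mapping; this stand-alone sketch inlines the char constants ('d', 'n', 'b') defined in ColumnPageCodecMeta below, which are assumed to match CarbonCommonConstants:

public class ConverTypeSketch {
  // same mapping as CodecMetaFactory.converType, with constants inlined
  static char converType(String type) {
    switch (type) {
      case "BYTE":
      case "SHORT":
      case "INT":
      case "LONG":
        return 'd'; // BIG_INT_MEASURE
      case "DOUBLE":
        return 'n'; // DOUBLE_MEASURE
      case "DECIMAL":
        return 'b'; // BIG_DECIMAL_MEASURE
      default:
        throw new RuntimeException("Unexpected type: " + type);
    }
  }

  public static void main(String[] args) {
    System.out.println(converType("INT"));    // d
    System.out.println(converType("DOUBLE")); // n
  }
}
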
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/metadata/ColumnPageCodecMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/ColumnPageCodecMeta.java b/core/src/main/java/org/apache/carbondata/core/metadata/ColumnPageCodecMeta.java
new file mode 100644
index 0000000..20a7568
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/ColumnPageCodecMeta.java
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata;
+
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.BitSet;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+
+/**
+ * It holds metadata for one column page
+ */
+public class ColumnPageCodecMeta extends ValueEncoderMeta implements Serializable {
+
+  private BitSet nullBitSet;
+
+  private DataType srcDataType;
+
+  private DataType targetDataType;
+
+  public static final char BYTE_VALUE_MEASURE = 'c';
+  public static final char SHORT_VALUE_MEASURE = 'j';
+  public static final char INT_VALUE_MEASURE = 'k';
+  public static final char BIG_INT_MEASURE = 'd';
+  public static final char DOUBLE_MEASURE = 'n';
+  public static final char BIG_DECIMAL_MEASURE = 'b';
+
+  static ColumnPageCodecMeta newInstance() {
+    return new ColumnPageCodecMeta();
+  }
+
+  static ColumnPageCodecMeta newInstance(
+      SimpleStatsResult stats, DataType targetDataType) {
+    ColumnPageCodecMeta meta = new ColumnPageCodecMeta();
+    meta.srcDataType = stats.getDataType();
+    meta.targetDataType = targetDataType;
+    meta.nullBitSet = stats.getNullBits();
+    meta.setType(CodecMetaFactory.converType(stats.getDataType()));
+    meta.setMaxValue(stats.getMax());
+    meta.setMinValue(stats.getMin());
+    meta.setDecimal(stats.getDecimalPoint());
+    return meta;
+  }
+
+  public DataType getTargetDataType() {
+    return targetDataType;
+  }
+
+  public void setSrcDataType(char type) {
+    switch (type) {
+      case BYTE_VALUE_MEASURE:
+        srcDataType = DataType.BYTE;
+        break;
+      case SHORT_VALUE_MEASURE:
+        srcDataType = DataType.SHORT;
+        break;
+      case INT_VALUE_MEASURE:
+        srcDataType = DataType.INT;
+        break;
+      case BIG_INT_MEASURE:
+        srcDataType = DataType.LONG;
+        break;
+      case DOUBLE_MEASURE:
+        srcDataType = DataType.DOUBLE;
+        break;
+      case BIG_DECIMAL_MEASURE:
+        srcDataType = DataType.DECIMAL;
+        break;
+      default:
+        throw new RuntimeException("Unexpected type: " + type);
+    }
+  }
+
+  private char getSrcDataTypeInChar() {
+    switch (srcDataType) {
+      case BYTE:
+        return BYTE_VALUE_MEASURE;
+      case SHORT:
+        return SHORT_VALUE_MEASURE;
+      case INT:
+        return INT_VALUE_MEASURE;
+      case LONG:
+        return BIG_INT_MEASURE;
+      case DOUBLE:
+        return DOUBLE_MEASURE;
+      case DECIMAL:
+        return BIG_DECIMAL_MEASURE;
+      default:
+        throw new RuntimeException("Unexpected type: " + srcDataType);
+    }
+  }
+
+  public BitSet getNullBitSet() {
+    return nullBitSet;
+  }
+
+  public void setNullBitSet(BitSet nullBitSet) {
+    this.nullBitSet = nullBitSet;
+  }
+
+  public DataType getSrcDataType() {
+    return srcDataType;
+  }
+
+  public byte[] serialize() {
+    ByteBuffer buffer = null;
+    switch (srcDataType) {
+      case BYTE:
+        buffer = ByteBuffer.allocate(
+            (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+                + 3);
+        buffer.putChar(getSrcDataTypeInChar());
+        buffer.put((byte) getMaxValue());
+        buffer.put((byte) getMinValue());
+        buffer.putLong(0L); // obsolete "unique value" field, written for backward compatibility
+        break;
+      case SHORT:
+        buffer = ByteBuffer.allocate(
+            (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+                + 3);
+        buffer.putChar(getSrcDataTypeInChar());
+        buffer.putShort((short) getMaxValue());
+        buffer.putShort((short) getMinValue());
+        buffer.putLong(0L); // obsolete "unique value" field, written for backward compatibility
+        break;
+      case INT:
+        buffer = ByteBuffer.allocate(
+            (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+                + 3);
+        buffer.putChar(getSrcDataTypeInChar());
+        buffer.putInt((int) getMaxValue());
+        buffer.putInt((int) getMinValue());
+        buffer.putLong(0L); // obsolete "unique value" field, written for backward compatibility
+        break;
+      case LONG:
+        buffer = ByteBuffer.allocate(
+            (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+                + 3);
+        buffer.putChar(getSrcDataTypeInChar());
+        buffer.putLong((Long) getMaxValue());
+        buffer.putLong((Long) getMinValue());
+        buffer.putLong(0L); // obsolete "unique value" field, written for backward compatibility
+        break;
+      case DOUBLE:
+        buffer = ByteBuffer.allocate(
+            (CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+                + 3);
+        buffer.putChar(getSrcDataTypeInChar());
+        buffer.putDouble((Double) getMaxValue());
+        buffer.putDouble((Double) getMinValue());
+        buffer.putDouble(0d); // obsolete "unique value" field, written for backward compatibility
+        break;
+      case DECIMAL:
+        buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE + 3);
+        buffer.putChar(getSrcDataTypeInChar());
+        break;
+    }
+    buffer.putInt(getDecimal());
+    buffer.put(getDataTypeSelected());
+    buffer.flip();
+    return buffer.array();
+  }
+
+  public void deserialize(byte[] encodeMeta) {
+    ByteBuffer buffer = ByteBuffer.wrap(encodeMeta);
+    char srcDataType = buffer.getChar();
+    this.setSrcDataType(srcDataType);
+    switch (srcDataType) {
+      case DOUBLE_MEASURE:
+        this.setMaxValue(buffer.getDouble());
+        this.setMinValue(buffer.getDouble());
+        buffer.getDouble(); // skip the obsolete "unique value" field, kept for backward compatibility
+        break;
+      case BIG_DECIMAL_MEASURE:
+        this.setMaxValue(0.0);
+        this.setMinValue(0.0);
+        break;
+      case BYTE_VALUE_MEASURE:
+        this.setMaxValue(buffer.get());
+        this.setMinValue(buffer.get());
+        buffer.getLong(); // skip the obsolete "unique value" field, kept for backward compatibility
+        break;
+      case SHORT_VALUE_MEASURE:
+        this.setMaxValue(buffer.getShort());
+        this.setMinValue(buffer.getShort());
+        buffer.getLong(); // skip the obsolete "unique value" field, kept for backward compatibility
+        break;
+      case INT_VALUE_MEASURE:
+        this.setMaxValue(buffer.getInt());
+        this.setMinValue(buffer.getInt());
+        buffer.getLong(); // skip the obsolete "unique value" field, kept for backward compatibility
+        break;
+      case BIG_INT_MEASURE:
+        this.setMaxValue(buffer.getLong());
+        this.setMinValue(buffer.getLong());
+        buffer.getLong(); // skip the obsolete "unique value" field, kept for backward compatibility
+        break;
+      default:
+        throw new IllegalArgumentException("invalid measure type");
+    }
+    this.setDecimal(buffer.getInt());
+    buffer.get(); // skip the obsolete dataTypeSelected byte
+  }
+
+  public byte[] getMaxAsBytes() {
+    return getValueAsBytes(getMaxValue());
+  }
+
+  public byte[] getMinAsBytes() {
+    return getValueAsBytes(getMinValue());
+  }
+
+  /**
+   * convert value to byte array
+   */
+  private byte[] getValueAsBytes(Object value) {
+    ByteBuffer b;
+    switch (srcDataType) {
+      case BYTE:
+        b = ByteBuffer.allocate(8);
+        b.putLong((byte) value);
+        b.flip();
+        return b.array();
+      case SHORT:
+        b = ByteBuffer.allocate(8);
+        b.putLong((short) value);
+        b.flip();
+        return b.array();
+      case INT:
+        b = ByteBuffer.allocate(8);
+        b.putLong((int) value);
+        b.flip();
+        return b.array();
+      case LONG:
+        b = ByteBuffer.allocate(8);
+        b.putLong((long) value);
+        b.flip();
+        return b.array();
+      case DOUBLE:
+        b = ByteBuffer.allocate(8);
+        b.putDouble((double) value);
+        b.flip();
+        return b.array();
+      case DECIMAL:
+      case BYTE_ARRAY:
+        return new byte[8];
+      default:
+        throw new IllegalArgumentException("Invalid data type: " + srcDataType);
+    }
+  }
+
+}

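The byte layout that serialize() and deserialize() above agree on for a LONG column can be reproduced with plain ByteBuffer code; this self-contained sketch writes and reads the same (8 * 3) + 4 + 3 = 31 byte layout, with illustrative min/max values:

import java.nio.ByteBuffer;

public class CodecMetaLayoutSketch {
  public static void main(String[] args) {
    // write: mirrors ColumnPageCodecMeta.serialize() for srcDataType LONG
    ByteBuffer out = ByteBuffer.allocate((8 * 3) + 4 + 3);
    out.putChar('d');  // BIG_INT_MEASURE type tag
    out.putLong(100L); // max
    out.putLong(-5L);  // min
    out.putLong(0L);   // obsolete "unique value", kept for backward compatibility
    out.putInt(0);     // decimal
    out.put((byte) 0); // obsolete dataTypeSelected
    out.flip();
    byte[] bytes = out.array();

    // read: mirrors ColumnPageCodecMeta.deserialize()
    ByteBuffer in = ByteBuffer.wrap(bytes);
    char type = in.getChar(); // 'd'
    long max = in.getLong();  // 100
    long min = in.getLong();  // -5
    in.getLong();             // skip the obsolete field
    int decimal = in.getInt();
    in.get();                 // skip the obsolete dataTypeSelected byte
    System.out.println(type + " max=" + max + " min=" + min + " decimal=" + decimal);
  }
}
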
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java b/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
index 741b999..971359d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/ValueEncoderMeta.java
@@ -109,4 +109,4 @@ public class ValueEncoderMeta implements Serializable {
   public void setDataTypeSelected(byte dataTypeSelected) {
     this.dataTypeSelected = dataTypeSelected;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
index ad17240..4dadcc2 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/AbstractScannedResultCollector.java
@@ -27,7 +27,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.collector.ScannedResultCollector;
 import org.apache.carbondata.core.scan.executor.infos.BlockExecutionInfo;
 import org.apache.carbondata.core.scan.executor.infos.DimensionInfo;
-import org.apache.carbondata.core.scan.executor.infos.KeyStructureInfo;
 import org.apache.carbondata.core.scan.executor.infos.MeasureInfo;
 import org.apache.carbondata.core.scan.model.QueryMeasure;
 import org.apache.carbondata.core.scan.result.AbstractScannedResult;
@@ -42,11 +41,6 @@ public abstract class AbstractScannedResultCollector implements ScannedResultCol
       LogServiceFactory.getLogService(AbstractScannedResultCollector.class.getName());
 
   /**
-   * restructuring info
-   */
-  private KeyStructureInfo restructureInfos;
-
-  /**
    * table block execution infos
    */
   protected BlockExecutionInfo tableBlockExecutionInfos;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
index 555580a..60546ed 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonMetadataUtil.java
@@ -16,10 +16,8 @@
  */
 package org.apache.carbondata.core.util;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -29,10 +27,10 @@ import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.core.datastore.page.statistics.TablePageStatistics;
 import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
@@ -76,11 +74,10 @@ public class CarbonMetadataUtil {
    * It converts list of BlockletInfoColumnar to FileFooter thrift objects
    *
    * @param infoList
-   * @param numCols
    * @param cardinalities
    * @return FileFooter
    */
-  public static FileFooter convertFileFooter(List<BlockletInfoColumnar> infoList, int numCols,
+  public static FileFooter convertFileFooter(List<BlockletInfoColumnar> infoList,
       int[] cardinalities, List<ColumnSchema> columnSchemaList, SegmentProperties segmentProperties)
       throws IOException {
     FileFooter footer = getFileFooter(infoList, cardinalities, columnSchemaList);
@@ -244,15 +241,19 @@ public class CarbonMetadataUtil {
     return blockletIndex;
   }
 
-  public static BlockletIndex getBlockletIndex(List<NodeHolder> nodeHolderList,
+  public static BlockletIndex getBlockletIndex(List<EncodedTablePage> encodedTablePageList,
       List<CarbonMeasure> carbonMeasureList) {
     BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
    // Calculating min/max for each column.
-    byte[][] minCol = nodeHolderList.get(0).getDimensionColumnMinData().clone();
-    byte[][] maxCol = nodeHolderList.get(0).getDimensionColumnMaxData().clone();
-    for (NodeHolder nodeHolder : nodeHolderList) {
-      byte[][] columnMaxData = nodeHolder.getDimensionColumnMaxData();
-      byte[][] columnMinData = nodeHolder.getDimensionColumnMinData();
+    TablePageStatistics stats = new TablePageStatistics(encodedTablePageList.get(0).getDimensions(),
+        encodedTablePageList.get(0).getMeasures());
+    byte[][] minCol = stats.getDimensionMinValue().clone();
+    byte[][] maxCol = stats.getDimensionMaxValue().clone();
+    for (EncodedTablePage encodedTablePage : encodedTablePageList) {
+      stats = new TablePageStatistics(encodedTablePage.getDimensions(),
+          encodedTablePage.getMeasures());
+      byte[][] columnMaxData = stats.getDimensionMaxValue();
+      byte[][] columnMinData = stats.getDimensionMinValue();
       for (int i = 0; i < maxCol.length; i++) {
         if (ByteUtil.UnsafeComparer.INSTANCE.compareTo(columnMaxData[i], maxCol[i]) > 0) {
           maxCol[i] = columnMaxData[i];
@@ -270,14 +271,18 @@ public class CarbonMetadataUtil {
       blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(min));
     }
 
-    byte[][] measureMaxValue = nodeHolderList.get(0).getMeasureColumnMaxData().clone();
-    byte[][] measureMinValue = nodeHolderList.get(0).getMeasureColumnMinData().clone();
+    stats = new TablePageStatistics(encodedTablePageList.get(0).getDimensions(),
+        encodedTablePageList.get(0).getMeasures());
+    byte[][] measureMaxValue = stats.getMeasureMaxValue().clone();
+    byte[][] measureMinValue = stats.getMeasureMinValue().clone();
     byte[] minVal = null;
     byte[] maxVal = null;
-    for (int i = 1; i < nodeHolderList.size(); i++) {
+    for (int i = 1; i < encodedTablePageList.size(); i++) {
       for (int j = 0; j < measureMinValue.length; j++) {
-        minVal = nodeHolderList.get(i).getMeasureColumnMinData()[j];
-        maxVal = nodeHolderList.get(i).getMeasureColumnMaxData()[j];
+        stats = new TablePageStatistics(
+            encodedTablePageList.get(i).getDimensions(), encodedTablePageList.get(i).getMeasures());
+        minVal = stats.getMeasureMinValue()[j];
+        maxVal = stats.getMeasureMaxValue()[j];
         if (compareMeasureData(measureMaxValue[j], maxVal, carbonMeasureList.get(j).getDataType())
             < 0) {
           measureMaxValue[j] = maxVal.clone();
@@ -296,8 +301,11 @@ public class CarbonMetadataUtil {
       blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(min));
     }
     BlockletBTreeIndex blockletBTreeIndex = new BlockletBTreeIndex();
-    blockletBTreeIndex.setStart_key(nodeHolderList.get(0).getStartKey());
-    blockletBTreeIndex.setEnd_key(nodeHolderList.get(nodeHolderList.size() - 1).getEndKey());
+    byte[] startKey = encodedTablePageList.get(0).getPageKey().serializeStartKey();
+    blockletBTreeIndex.setStart_key(startKey);
+    byte[] endKey = encodedTablePageList.get(
+        encodedTablePageList.size() - 1).getPageKey().serializeEndKey();
+    blockletBTreeIndex.setEnd_key(endKey);
     BlockletIndex blockletIndex = new BlockletIndex();
     blockletIndex.setMin_max_index(blockletMinMaxIndex);
     blockletIndex.setB_tree_index(blockletBTreeIndex);
@@ -333,10 +341,9 @@ public class CarbonMetadataUtil {
     int aggregateIndex = 0;
     boolean[] isSortedKeyColumn = blockletInfoColumnar.getIsSortedKeyColumn();
     boolean[] aggKeyBlock = blockletInfoColumnar.getAggKeyBlock();
-    boolean[] colGrpblock = blockletInfoColumnar.getColGrpBlocks();
     for (int i = 0; i < blockletInfoColumnar.getKeyLengths().length; i++) {
       DataChunk dataChunk = new DataChunk();
-      dataChunk.setChunk_meta(getChunkCompressionMeta());
+      dataChunk.setChunk_meta(getSnappyChunkCompressionMeta());
       List<Encoding> encodings = new ArrayList<Encoding>();
       if (containsEncoding(i, Encoding.DICTIONARY, columnSchema, segmentProperties)) {
         encodings.add(Encoding.DICTIONARY);
@@ -344,7 +351,6 @@ public class CarbonMetadataUtil {
       if (containsEncoding(i, Encoding.DIRECT_DICTIONARY, columnSchema, segmentProperties)) {
         encodings.add(Encoding.DIRECT_DICTIONARY);
       }
-      dataChunk.setRowMajor(colGrpblock[i]);
       // TODO : Once schema PR is merged and information needs to be passed
       // here.
       dataChunk.setColumn_ids(new ArrayList<Integer>());
@@ -377,7 +383,7 @@ public class CarbonMetadataUtil {
 
     for (int i = 0; i < blockletInfoColumnar.getMeasureLength().length; i++) {
       DataChunk dataChunk = new DataChunk();
-      dataChunk.setChunk_meta(getChunkCompressionMeta());
+      dataChunk.setChunk_meta(getSnappyChunkCompressionMeta());
       dataChunk.setRowMajor(false);
       // TODO : Once schema PR is merged and information needs to be passed
       // here.
@@ -400,8 +406,10 @@ public class CarbonMetadataUtil {
       // dataChunk.setPresence(new PresenceMeta());
       // TODO : Need to write ValueCompression meta here.
       List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
-      encoderMetaList.add(ByteBuffer.wrap(serializeEncoderMeta(
-          createValueEncoderMeta(blockletInfoColumnar.getStats(), i))));
+      encoderMetaList.add(
+          ByteBuffer.wrap(
+              serializeEncoderMeta(
+                      blockletInfoColumnar.getEncodedTablePage().getMeasure(i).getMetaData())));
       dataChunk.setEncoder_meta(encoderMetaList);
       colDataChunks.add(dataChunk);
     }
@@ -464,35 +472,10 @@ public class CarbonMetadataUtil {
     return aos.toByteArray();
   }
 
-  private static ValueEncoderMeta createValueEncoderMeta(MeasurePageStatsVO stats,
-      int index) {
-    ValueEncoderMeta encoderMeta = new ValueEncoderMeta();
-    encoderMeta.setMaxValue(stats.getMax(index));
-    encoderMeta.setMinValue(stats.getMin(index));
-    encoderMeta.setDataTypeSelected(stats.getDataTypeSelected(index));
-    encoderMeta.setType(getTypeInChar(stats.getDataType(index)));
-    return encoderMeta;
-  }
-
-  private static char getTypeInChar(DataType type) {
-    switch (type) {
-      case SHORT:
-      case INT:
-      case LONG:
-        return CarbonCommonConstants.BIG_INT_MEASURE;
-      case DOUBLE:
-        return CarbonCommonConstants.DOUBLE_MEASURE;
-      case DECIMAL:
-        return CarbonCommonConstants.BIG_DECIMAL_MEASURE;
-      default:
-        throw new RuntimeException("unsupported type: " + type);
-    }
-  }
-
   /**
    * Right now it is set to default values. We may use this in future
    */
-  private static ChunkCompressionMeta getChunkCompressionMeta() {
+  public static ChunkCompressionMeta getSnappyChunkCompressionMeta() {
     ChunkCompressionMeta chunkCompressionMeta = new ChunkCompressionMeta();
     chunkCompressionMeta.setCompression_codec(CompressionCodec.SNAPPY);
     chunkCompressionMeta.setTotal_compressed_size(0);
@@ -500,111 +483,16 @@ public class CarbonMetadataUtil {
     return chunkCompressionMeta;
   }
 
+
   /**
-   * It converts FileFooter thrift object to list of BlockletInfoColumnar
-   * objects
-   *
-   * @param footer
-   * @return
+   * Right now it is set to default values. We may use this in future
    */
-  public static List<BlockletInfoColumnar> convertBlockletInfo(FileFooter footer)
-      throws IOException {
-    List<BlockletInfoColumnar> listOfNodeInfo =
-        new ArrayList<BlockletInfoColumnar>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
-    for (BlockletInfo blockletInfo : footer.getBlocklet_info_list()) {
-      BlockletInfoColumnar blockletInfoColumnar = new BlockletInfoColumnar();
-      blockletInfoColumnar.setNumberOfKeys(blockletInfo.getNum_rows());
-      List<DataChunk> columnChunks = blockletInfo.getColumn_data_chunks();
-      List<DataChunk> dictChunks = new ArrayList<DataChunk>();
-      List<DataChunk> nonDictColChunks = new ArrayList<DataChunk>();
-      for (DataChunk dataChunk : columnChunks) {
-        if (dataChunk.getEncoders().get(0).equals(Encoding.DICTIONARY)) {
-          dictChunks.add(dataChunk);
-        } else {
-          nonDictColChunks.add(dataChunk);
-        }
-      }
-      int[] keyLengths = new int[dictChunks.size()];
-      long[] keyOffSets = new long[dictChunks.size()];
-      long[] keyBlockIndexOffsets = new long[dictChunks.size()];
-      int[] keyBlockIndexLens = new int[dictChunks.size()];
-      long[] indexMapOffsets = new long[dictChunks.size()];
-      int[] indexMapLens = new int[dictChunks.size()];
-      boolean[] sortState = new boolean[dictChunks.size()];
-      int i = 0;
-      for (DataChunk dataChunk : dictChunks) {
-        keyLengths[i] = dataChunk.getData_page_length();
-        keyOffSets[i] = dataChunk.getData_page_offset();
-        keyBlockIndexOffsets[i] = dataChunk.getRowid_page_offset();
-        keyBlockIndexLens[i] = dataChunk.getRowid_page_length();
-        indexMapOffsets[i] = dataChunk.getRle_page_offset();
-        indexMapLens[i] = dataChunk.getRle_page_length();
-        sortState[i] = dataChunk.getSort_state().equals(SortState.SORT_EXPLICIT);
-        i++;
-      }
-      blockletInfoColumnar.setKeyLengths(keyLengths);
-      blockletInfoColumnar.setKeyOffSets(keyOffSets);
-      blockletInfoColumnar.setKeyBlockIndexOffSets(keyBlockIndexOffsets);
-      blockletInfoColumnar.setKeyBlockIndexLength(keyBlockIndexLens);
-      blockletInfoColumnar.setDataIndexMapOffsets(indexMapOffsets);
-      blockletInfoColumnar.setDataIndexMapLength(indexMapLens);
-      blockletInfoColumnar.setIsSortedKeyColumn(sortState);
-
-      int[] msrLens = new int[nonDictColChunks.size()];
-      long[] msrOffsets = new long[nonDictColChunks.size()];
-      ValueEncoderMeta[] encoderMetas = new ValueEncoderMeta[nonDictColChunks.size()];
-      i = 0;
-      for (DataChunk msrChunk : nonDictColChunks) {
-        msrLens[i] = msrChunk.getData_page_length();
-        msrOffsets[i] = msrChunk.getData_page_offset();
-        encoderMetas[i] = deserializeValueEncoderMeta(msrChunk.getEncoder_meta().get(0));
-        i++;
-      }
-      blockletInfoColumnar.setMeasureLength(msrLens);
-      blockletInfoColumnar.setMeasureOffset(msrOffsets);
-      blockletInfoColumnar.setStats(getMeasurePageStats(encoderMetas));
-      listOfNodeInfo.add(blockletInfoColumnar);
-    }
-
-    setBlockletIndex(footer, listOfNodeInfo);
-    return listOfNodeInfo;
-  }
-
-  private static ValueEncoderMeta deserializeValueEncoderMeta(ByteBuffer byteBuffer)
-      throws IOException {
-    ByteArrayInputStream bis = new ByteArrayInputStream(byteBuffer.array());
-    ObjectInputStream objStream = new ObjectInputStream(bis);
-    ValueEncoderMeta encoderMeta = null;
-    try {
-      encoderMeta = (ValueEncoderMeta) objStream.readObject();
-    } catch (ClassNotFoundException e) {
-      LOGGER.error("Error while reading ValueEncoderMeta");
-    }
-    return encoderMeta;
-
-  }
-
-  private static MeasurePageStatsVO getMeasurePageStats(ValueEncoderMeta[] encoderMetas) {
-    return MeasurePageStatsVO.build(encoderMetas);
-  }
-
-  private static void setBlockletIndex(FileFooter footer,
-      List<BlockletInfoColumnar> listOfNodeInfo) {
-    List<BlockletIndex> blockletIndexList = footer.getBlocklet_index_list();
-    for (int i = 0; i < blockletIndexList.size(); i++) {
-      BlockletBTreeIndex bTreeIndexList = blockletIndexList.get(i).getB_tree_index();
-      BlockletMinMaxIndex minMaxIndexList = blockletIndexList.get(i).getMin_max_index();
-
-      listOfNodeInfo.get(i).setStartKey(bTreeIndexList.getStart_key());
-      listOfNodeInfo.get(i).setEndKey(bTreeIndexList.getEnd_key());
-      byte[][] min = new byte[minMaxIndexList.getMin_values().size()][];
-      byte[][] max = new byte[minMaxIndexList.getMax_values().size()][];
-      for (int j = 0; j < minMaxIndexList.getMax_valuesSize(); j++) {
-        min[j] = minMaxIndexList.getMin_values().get(j).array();
-        max[j] = minMaxIndexList.getMax_values().get(j).array();
-      }
-      listOfNodeInfo.get(i).setColumnMaxData(max);
-    }
+  private static ChunkCompressionMeta getChunkCompressionMeta() {
+    ChunkCompressionMeta chunkCompressionMeta = new ChunkCompressionMeta();
+    chunkCompressionMeta.setCompression_codec(CompressionCodec.SNAPPY);
+    chunkCompressionMeta.setTotal_compressed_size(0);
+    chunkCompressionMeta.setTotal_uncompressed_size(0);
+    return chunkCompressionMeta;
   }
 
   /**
@@ -673,7 +561,6 @@ public class CarbonMetadataUtil {
     int aggregateIndex = 0;
     boolean[] isSortedKeyColumn = blockletInfoColumnar.getIsSortedKeyColumn();
     boolean[] aggKeyBlock = blockletInfoColumnar.getAggKeyBlock();
-    boolean[] colGrpblock = blockletInfoColumnar.getColGrpBlocks();
     for (int i = 0; i < blockletInfoColumnar.getKeyLengths().length; i++) {
       DataChunk2 dataChunk = new DataChunk2();
       dataChunk.setChunk_meta(getChunkCompressionMeta());
@@ -684,7 +571,6 @@ public class CarbonMetadataUtil {
       if (containsEncoding(i, Encoding.DIRECT_DICTIONARY, columnSchema, segmentProperties)) {
         encodings.add(Encoding.DIRECT_DICTIONARY);
       }
-      dataChunk.setRowMajor(colGrpblock[i]);
       // TODO : Once schema PR is merged and information needs to be passed
       // here.
       dataChunk.setData_page_length(blockletInfoColumnar.getKeyLengths()[i]);
@@ -732,8 +618,10 @@ public class CarbonMetadataUtil {
       // dataChunk.setPresence(new PresenceMeta());
       // TODO : Need to write ValueCompression meta here.
       List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
-      encoderMetaList.add(ByteBuffer.wrap(serializeEncoderMeta(
-          createValueEncoderMeta(blockletInfoColumnar.getStats(), i))));
+      encoderMetaList.add(
+          ByteBuffer.wrap(
+              serializeEncoderMeta(
+                      blockletInfoColumnar.getEncodedTablePage().getMeasure(i).getMetaData())));
       dataChunk.setEncoder_meta(encoderMetaList);
       colDataChunks.add(dataChunk);
     }
@@ -741,106 +629,18 @@ public class CarbonMetadataUtil {
   }
 
   /**
-   * Below method will be used to get the data chunk object for all the columns
-   *
-   * @param nodeHolderList       blocklet info
-   * @param columnSchema        list of columns
-   * @param segmentProperties    segment properties
-   * @return list of data chunks
-   * @throws IOException
+   * Return a DataChunk3 that contains the input DataChunk2 list
    */
-  private static List<DataChunk2> getDatachunk2(List<NodeHolder> nodeHolderList,
-      List<ColumnSchema> columnSchema, SegmentProperties segmentProperties, int index,
-      boolean isDimensionColumn) throws IOException {
-    List<DataChunk2> colDataChunks = new ArrayList<DataChunk2>();
-    DataChunk2 dataChunk = null;
-    NodeHolder nodeHolder = null;
-    for (int i = 0; i < nodeHolderList.size(); i++) {
-      nodeHolder = nodeHolderList.get(i);
-      dataChunk = new DataChunk2();
-      dataChunk.min_max = new BlockletMinMaxIndex();
-      dataChunk.setChunk_meta(getChunkCompressionMeta());
-      dataChunk.setNumberOfRowsInpage(nodeHolder.getEntryCount());
-      List<Encoding> encodings = new ArrayList<Encoding>();
-      if (isDimensionColumn) {
-        dataChunk.setData_page_length(nodeHolder.getKeyLengths()[index]);
-        if (containsEncoding(index, Encoding.DICTIONARY, columnSchema, segmentProperties)) {
-          encodings.add(Encoding.DICTIONARY);
-        }
-        if (containsEncoding(index, Encoding.DIRECT_DICTIONARY, columnSchema, segmentProperties)) {
-          encodings.add(Encoding.DIRECT_DICTIONARY);
-        }
-        dataChunk.setRowMajor(nodeHolder.getColGrpBlocks()[index]);
-        // TODO : Once schema PR is merged and information needs to be passed
-        // here.
-        if (nodeHolder.getRleEncodingForDictDim()[index]) {
-          dataChunk.setRle_page_length(nodeHolder.getDataIndexMapLength()[index]);
-          encodings.add(Encoding.RLE);
-        }
-        dataChunk.setSort_state(nodeHolder.getIsSortedKeyBlock()[index] ?
-            SortState.SORT_EXPLICIT :
-            SortState.SORT_NATIVE);
-
-        if (!nodeHolder.getIsSortedKeyBlock()[index]) {
-          dataChunk.setRowid_page_length(nodeHolder.getKeyBlockIndexLength()[index]);
-          encodings.add(Encoding.INVERTED_INDEX);
-        }
-        dataChunk.min_max.addToMax_values(
-            ByteBuffer.wrap(nodeHolder.getDimensionColumnMaxData()[index]));
-        dataChunk.min_max.addToMin_values(
-            ByteBuffer.wrap(nodeHolder.getDimensionColumnMinData()[index]));
-      } else {
-        dataChunk.setData_page_length(nodeHolder.getDataArray()[index].length);
-        // TODO : Right now the encodings are happening at runtime. change as
-        // per this encoders.
-        dataChunk.setEncoders(encodings);
-
-        dataChunk.setRowMajor(false);
-        // TODO : Right now the encodings are happening at runtime. change as
-        // per this encoders.
-        encodings.add(Encoding.DELTA);
-        dataChunk.setEncoders(encodings);
-        // TODO writing dummy presence meta need to set actual presence
-        // meta
-        PresenceMeta presenceMeta = new PresenceMeta();
-        presenceMeta.setPresent_bit_streamIsSet(true);
-        presenceMeta.setPresent_bit_stream(CompressorFactory.getInstance().getCompressor()
-            .compressByte(nodeHolder.getMeasureNullValueIndex()[index].toByteArray()));
-        dataChunk.setPresence(presenceMeta);
-        List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
-        encoderMetaList.add(ByteBuffer.wrap(serializeEncodeMetaUsingByteBuffer(
-            createValueEncoderMeta(nodeHolder.getStats(), index))));
-        ByteBuffer decimalMeta = writeInfoIfDecimal(index, segmentProperties);
-        if (decimalMeta != null) {
-          encoderMetaList.add(decimalMeta);
-        }
-        dataChunk.setEncoder_meta(encoderMetaList);
-        dataChunk.min_max
-            .addToMax_values(ByteBuffer.wrap(nodeHolder.getMeasureColumnMaxData()[index]));
-        dataChunk.min_max
-            .addToMin_values(ByteBuffer.wrap(nodeHolder.getMeasureColumnMinData()[index]));
-      }
-      dataChunk.setEncoders(encodings);
-      colDataChunks.add(dataChunk);
-    }
-    return colDataChunks;
-  }
-
-  public static DataChunk3 getDataChunk3(List<NodeHolder> nodeHolderList,
-      List<ColumnSchema> columnSchema, SegmentProperties segmentProperties, int index,
-      boolean isDimensionColumn) throws IOException {
-    List<DataChunk2> dataChunksList =
-        getDatachunk2(nodeHolderList, columnSchema, segmentProperties, index, isDimensionColumn);
+  public static DataChunk3 getDataChunk3(List<DataChunk2> dataChunksList) {
     int offset = 0;
     DataChunk3 dataChunk = new DataChunk3();
     List<Integer> pageOffsets = new ArrayList<>();
     List<Integer> pageLengths = new ArrayList<>();
     int length = 0;
-    for (int i = 0; i < dataChunksList.size(); i++) {
+    for (DataChunk2 dataChunk2 : dataChunksList) {
       pageOffsets.add(offset);
-      length =
-          dataChunksList.get(i).getData_page_length() + dataChunksList.get(i).getRle_page_length()
-              + dataChunksList.get(i).getRowid_page_length();
+      length = dataChunk2.getData_page_length() + dataChunk2.getRle_page_length() +
+          dataChunk2.getRowid_page_length();
       pageLengths.add(length);
       offset += length;
     }
@@ -850,36 +650,30 @@ public class CarbonMetadataUtil {
     return dataChunk;
   }
 
-  public static byte[] serializeEncodeMetaUsingByteBuffer(ValueEncoderMeta valueEncoderMeta) {
-    ByteBuffer buffer = null;
-    switch (valueEncoderMeta.getType()) {
-      case LONG:
-        buffer = ByteBuffer.allocate(
-            (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
-                + 3);
-        buffer.putChar(valueEncoderMeta.getTypeInChar());
-        buffer.putLong((Long) valueEncoderMeta.getMaxValue());
-        buffer.putLong((Long) valueEncoderMeta.getMinValue());
-        buffer.putLong(0L);  // unique value, not used
-        break;
-      case DOUBLE:
-        buffer = ByteBuffer.allocate(
-            (CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
-                + 3);
-        buffer.putChar(valueEncoderMeta.getTypeInChar());
-        buffer.putDouble((Double) valueEncoderMeta.getMaxValue());
-        buffer.putDouble((Double) valueEncoderMeta.getMinValue());
-        buffer.putDouble(0d); // unique value, not used
-        break;
-      case DECIMAL:
-        buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE + 3);
-        buffer.putChar(valueEncoderMeta.getTypeInChar());
-        break;
+  /**
+   * return DataChunk3 for the dimension column (specified by `columnIndex`)
+   * in `encodedTablePageList`
+   */
+  public static DataChunk3 getDimensionDataChunk3(List<EncodedTablePage> encodedTablePageList,
+      int columnIndex) throws IOException {
+    List<DataChunk2> dataChunksList = new ArrayList<>(encodedTablePageList.size());
+    for (EncodedTablePage encodedTablePage : encodedTablePageList) {
+      dataChunksList.add(encodedTablePage.getDimension(columnIndex).getDataChunk2());
     }
-    buffer.putInt(0); // decimal point, not used
-    buffer.put(valueEncoderMeta.getDataTypeSelected());
-    buffer.flip();
-    return buffer.array();
+    return CarbonMetadataUtil.getDataChunk3(dataChunksList);
+  }
+
+  /**
+   * return DataChunk3 for the measure column (specified by `columnIndex`)
+   * in `encodedTablePageList`
+   */
+  public static DataChunk3 getMeasureDataChunk3(List<EncodedTablePage> encodedTablePageList,
+      int columnIndex) throws IOException {
+    List<DataChunk2> dataChunksList = new ArrayList<>(encodedTablePageList.size());
+    for (EncodedTablePage encodedTablePage : encodedTablePageList) {
+      dataChunksList.add(encodedTablePage.getMeasure(columnIndex).getDataChunk2());
+    }
+    return CarbonMetadataUtil.getDataChunk3(dataChunksList);
   }
 
   public static int compareMeasureData(byte[] first, byte[] second, DataType dataType) {
@@ -931,83 +725,4 @@ public class CarbonMetadataUtil {
     return fileHeader;
   }
 
-  /**
-   * Below method will be used to get the data chunk2 serialize object list
-   *
-   * @param nodeHolder        node holder
-   * @param columnSchema     table columns
-   * @param segmentProperties segment properties
-   * @param isDimensionColumn to get the list of dimension column or measure column
-   * @return list of data chunk2
-   * @throws IOException
-   */
-  public static List<byte[]> getDataChunk2(NodeHolder nodeHolder, List<ColumnSchema> columnSchema,
-      SegmentProperties segmentProperties, boolean isDimensionColumn) throws IOException {
-    List<byte[]> dataChunkBuffer = new ArrayList<>();
-    if (isDimensionColumn) {
-      for (int i = 0; i < nodeHolder.getKeyArray().length; i++) {
-        DataChunk2 dataChunk = new DataChunk2();
-        dataChunk.min_max = new BlockletMinMaxIndex();
-        dataChunk.setChunk_meta(getChunkCompressionMeta());
-        dataChunk.setNumberOfRowsInpage(nodeHolder.getEntryCount());
-        List<Encoding> encodings = new ArrayList<Encoding>();
-        dataChunk.setData_page_length(nodeHolder.getKeyLengths()[i]);
-        if (containsEncoding(i, Encoding.DICTIONARY, columnSchema, segmentProperties)) {
-          encodings.add(Encoding.DICTIONARY);
-        }
-        if (containsEncoding(i, Encoding.DIRECT_DICTIONARY, columnSchema, segmentProperties)) {
-          encodings.add(Encoding.DIRECT_DICTIONARY);
-        }
-        dataChunk.setRowMajor(nodeHolder.getColGrpBlocks()[i]);
-        if (nodeHolder.getRleEncodingForDictDim()[i]) {
-          dataChunk.setRle_page_length(nodeHolder.getDataIndexMapLength()[i]);
-          encodings.add(Encoding.RLE);
-        }
-        dataChunk.setSort_state(
-            nodeHolder.getIsSortedKeyBlock()[i] ? SortState.SORT_EXPLICIT : SortState.SORT_NATIVE);
-        if (!nodeHolder.getIsSortedKeyBlock()[i]) {
-          dataChunk.setRowid_page_length(nodeHolder.getKeyBlockIndexLength()[i]);
-          encodings.add(Encoding.INVERTED_INDEX);
-        }
-        dataChunk.min_max.addToMax_values(
-            ByteBuffer.wrap(nodeHolder.getDimensionColumnMaxData()[i]));
-        dataChunk.min_max.addToMin_values(
-            ByteBuffer.wrap(nodeHolder.getDimensionColumnMinData()[i]));
-        dataChunk.setEncoders(encodings);
-        dataChunkBuffer.add(CarbonUtil.getByteArray(dataChunk));
-      }
-    } else {
-      for (int i = 0; i < nodeHolder.getDataArray().length; i++) {
-        DataChunk2 dataChunk = new DataChunk2();
-        dataChunk.min_max = new BlockletMinMaxIndex();
-        dataChunk.setChunk_meta(getChunkCompressionMeta());
-        dataChunk.setNumberOfRowsInpage(nodeHolder.getEntryCount());
-        dataChunk.setData_page_length(nodeHolder.getDataArray()[i].length);
-        List<Encoding> encodings = new ArrayList<Encoding>();
-        // TODO : Right now the encodings are happening at runtime. change as
-        // per this encoders.
-        dataChunk.setEncoders(encodings);
-        dataChunk.setRowMajor(false);
-        // TODO : Right now the encodings are happening at runtime. change as
-        // per this encoders.
-        encodings.add(Encoding.DELTA);
-        dataChunk.setEncoders(encodings);
-        // TODO writing dummy presence meta need to set actual presence
-        // meta
-        PresenceMeta presenceMeta = new PresenceMeta();
-        presenceMeta.setPresent_bit_streamIsSet(true);
-        presenceMeta.setPresent_bit_stream(CompressorFactory.getInstance().getCompressor()
-            .compressByte(nodeHolder.getMeasureNullValueIndex()[i].toByteArray()));
-        dataChunk.setPresence(presenceMeta);
-        List<ByteBuffer> encoderMetaList = new ArrayList<ByteBuffer>();
-        encoderMetaList.add(ByteBuffer.wrap(serializeEncodeMetaUsingByteBuffer(
-            createValueEncoderMeta(nodeHolder.getStats(), i))));
-        dataChunk.setEncoder_meta(encoderMetaList);
-        dataChunk.min_max.addToMax_values(ByteBuffer.wrap(nodeHolder.getMeasureColumnMaxData()[i]));
-        dataChunk.min_max.addToMin_values(ByteBuffer.wrap(nodeHolder.getMeasureColumnMinData()[i]));
-        dataChunkBuffer.add(CarbonUtil.getByteArray(dataChunk));
-      }
-    }
-    return dataChunkBuffer;
-  }
 }
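
The simplified getDataChunk3 above reduces to a prefix sum over the per-page lengths (data page + RLE page + row-id page). Below is a minimal, self-contained sketch of that accumulation, with a hypothetical PageLen class standing in for the thrift-generated DataChunk2:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PageOffsetSketch {
  // hypothetical stand-in for the thrift-generated DataChunk2
  static class PageLen {
    final int dataPageLength;
    final int rlePageLength;
    final int rowIdPageLength;
    PageLen(int data, int rle, int rowId) {
      this.dataPageLength = data;
      this.rlePageLength = rle;
      this.rowIdPageLength = rowId;
    }
  }

  // mirrors the offset/length loop in getDataChunk3: page i starts where page i-1 ended
  static void computeOffsets(List<PageLen> pages, List<Integer> offsets, List<Integer> lengths) {
    int offset = 0;
    for (PageLen page : pages) {
      offsets.add(offset);
      int length = page.dataPageLength + page.rlePageLength + page.rowIdPageLength;
      lengths.add(length);
      offset += length;
    }
  }

  public static void main(String[] args) {
    List<Integer> offsets = new ArrayList<>();
    List<Integer> lengths = new ArrayList<>();
    computeOffsets(
        Arrays.asList(new PageLen(100, 10, 0), new PageLen(80, 0, 20)),
        offsets, lengths);
    System.out.println(offsets + " " + lengths); // prints [0, 110] [110, 100]
  }
}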

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 6dd211a..eff8f0d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -53,8 +53,6 @@ import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
-import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
@@ -826,15 +824,6 @@ public final class CarbonUtil {
   }
 
   /**
-   * Below method will be used to get the stats of the measure data page
-   */
-  public static MeasurePageStatsVO getMeasurePageStats(
-      List<ValueEncoderMeta> encodeMetaList) {
-    return MeasurePageStatsVO.build(
-        encodeMetaList.toArray(new ValueEncoderMeta[encodeMetaList.size()]));
-  }
-
-  /**
    * Below method will be used to check whether particular encoding is present
    * in the dimension or not
    *
@@ -1350,20 +1339,6 @@ public final class CarbonUtil {
     return thriftByteArray;
   }
 
-  /**
-   * Below method will be used to convert the bytearray to data chunk object
-   *
-   * @param dataChunkBytes datachunk thrift object in bytes
-   * @return data chunk thrift object
-   */
-  public static DataChunk2 readDataChunk(byte[] dataChunkBytes, int offset, int length)
-      throws IOException {
-    return (DataChunk2) read(dataChunkBytes, new ThriftReader.TBaseCreator() {
-      @Override public TBase create() {
-        return new DataChunk2();
-      }
-    }, offset, length);
-  }
 
   public static DataChunk3 readDataChunk3(ByteBuffer dataChunkBuffer, int offset, int length)
       throws IOException {
@@ -1462,6 +1437,38 @@ public final class CarbonUtil {
     return valueEncoderMeta;
   }
 
+  public static byte[] serializeEncodeMetaUsingByteBuffer(ValueEncoderMeta valueEncoderMeta) {
+    ByteBuffer buffer = null;
+    switch (valueEncoderMeta.getType()) {
+      case LONG:
+        buffer = ByteBuffer.allocate(
+            (CarbonCommonConstants.LONG_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+                + 3);
+        buffer.putChar(valueEncoderMeta.getTypeInChar());
+        buffer.putLong((Long) valueEncoderMeta.getMaxValue());
+        buffer.putLong((Long) valueEncoderMeta.getMinValue());
+        buffer.putLong(0L); // unique value, not used
+        break;
+      case DOUBLE:
+        buffer = ByteBuffer.allocate(
+            (CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE * 3) + CarbonCommonConstants.INT_SIZE_IN_BYTE
+                + 3);
+        buffer.putChar(valueEncoderMeta.getTypeInChar());
+        buffer.putDouble((Double) valueEncoderMeta.getMaxValue());
+        buffer.putDouble((Double) valueEncoderMeta.getMinValue());
+        buffer.putDouble(0d); // unique value, not used
+        break;
+      case DECIMAL:
+        buffer = ByteBuffer.allocate(CarbonCommonConstants.INT_SIZE_IN_BYTE + 3);
+        buffer.putChar(valueEncoderMeta.getTypeInChar());
+        break;
+    }
+    buffer.putInt(0); // decimal point, not used
+    buffer.put(valueEncoderMeta.getDataTypeSelected());
+    buffer.flip();
+    return buffer.array();
+  }
+
   /**
    * Below method will be used to convert indexes in range
    * Indexes=[0,1,2,3,4,5,6,7,8,9]
@@ -1872,5 +1879,47 @@ public final class CarbonUtil {
       CarbonUtil.deleteFoldersAndFiles(dbPath);
     }
   }
+
+  public static byte[] getMaxValueAsBytes(ValueEncoderMeta meta) {
+    ByteBuffer b;
+    switch (meta.getType()) {
+      case LONG:
+        b = ByteBuffer.allocate(8);
+        b.putLong((long) meta.getMaxValue());
+        b.flip();
+        return b.array();
+      case DOUBLE:
+        b = ByteBuffer.allocate(8);
+        b.putDouble((double) meta.getMaxValue());
+        b.flip();
+        return b.array();
+      case DECIMAL:
+      case BYTE_ARRAY:
+        return new byte[8];
+      default:
+        throw new IllegalArgumentException("Invalid data type: " + meta.getType());
+    }
+  }
+
+  public static byte[] getMinValueAsBytes(ValueEncoderMeta meta) {
+    ByteBuffer b;
+    switch (meta.getType()) {
+      case LONG:
+        b = ByteBuffer.allocate(8);
+        b.putLong((long) meta.getMinValue());
+        b.flip();
+        return b.array();
+      case DOUBLE:
+        b = ByteBuffer.allocate(8);
+        b.putDouble((double) meta.getMinValue());
+        b.flip();
+        return b.array();
+      case DECIMAL:
+      case BYTE_ARRAY:
+        return new byte[8];
+      default:
+        throw new IllegalArgumentException("Invalid data type: " + meta.getType());
+    }
+  }
 }
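
For reference, the LONG branch of the relocated serializeEncodeMetaUsingByteBuffer produces a fixed 31-byte record: a 2-byte type char, three 8-byte longs (max, min, and an unused unique slot), a 4-byte unused decimal int, and the dataTypeSelected byte, which matches the allocation of (LONG_SIZE_IN_BYTE * 3) + INT_SIZE_IN_BYTE + 3. A standalone sketch of just that branch; the 'l' type marker here is an assumed placeholder, not necessarily what getTypeInChar() returns:

import java.nio.ByteBuffer;

public class EncoderMetaLayoutSketch {
  // mirrors the LONG branch of CarbonUtil.serializeEncodeMetaUsingByteBuffer:
  // type char + max + min + unused unique long + unused decimal int + dataTypeSelected byte
  static byte[] serializeLongMeta(long max, long min, byte dataTypeSelected) {
    ByteBuffer buffer = ByteBuffer.allocate(2 + 8 * 3 + 4 + 1); // 31 bytes
    buffer.putChar('l');          // assumed placeholder for the LONG type marker
    buffer.putLong(max);
    buffer.putLong(min);
    buffer.putLong(0L);           // unique value, not used
    buffer.putInt(0);             // decimal point, not used
    buffer.put(dataTypeSelected);
    buffer.flip();
    return buffer.array();
  }

  public static void main(String[] args) {
    System.out.println(serializeLongMeta(100L, -5L, (byte) 0).length); // prints 31
  }
}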
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index 37ae5bb..39b8b3c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -136,25 +136,6 @@ public final class DataTypeUtil {
   }
 
   /**
-   * This method will return the type of measure based on its data type
-   *
-   * @param dataType
-   * @return
-   */
-  public static char getAggType(DataType dataType) {
-    switch (dataType) {
-      case DECIMAL:
-        return CarbonCommonConstants.BIG_DECIMAL_MEASURE;
-      case SHORT:
-      case INT:
-      case LONG:
-        return CarbonCommonConstants.BIG_INT_MEASURE;
-      default:
-        return CarbonCommonConstants.DOUBLE_MEASURE;
-    }
-  }
-
-  /**
    * This method will convert a big decimal value to bytes
    *
    * @param num

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
index 95037b2..4afa9b6 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/NodeHolder.java
@@ -19,9 +19,12 @@ package org.apache.carbondata.core.util;
 
 import java.util.BitSet;
 
-import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
 
 public class NodeHolder {
+  private EncodedTablePage encodedData;
+
   /**
    * keyArray
    */
@@ -94,7 +97,7 @@ public class NodeHolder {
 
   private byte[][] measureColumnMinData;
 
-  private MeasurePageStatsVO stats;
+  private SimpleStatsResult stats;
 
   /**
    * array of rleEncodingForDictDim flag to identify the rleEncodingForDictDim
@@ -418,11 +421,37 @@ public class NodeHolder {
     return this.writeAll;
   }
 
-  public MeasurePageStatsVO getStats() {
+  public SimpleStatsResult getStats() {
     return stats;
   }
 
-  public void setMeasureStats(MeasurePageStatsVO stats) {
+  public void setMeasureStats(SimpleStatsResult stats) {
     this.stats = stats;
   }
+
+  public static byte[][] getKeyArray(EncodedTablePage encodedTablePage) {
+    int numDimensions = encodedTablePage.getNumDimensions();
+    byte[][] keyArray = new byte[numDimensions][];
+    for (int i = 0; i < numDimensions; i++) {
+      keyArray[i] = encodedTablePage.getDimension(i).getEncodedData();
+    }
+    return keyArray;
+  }
+
+  public static byte[][] getDataArray(EncodedTablePage encodedTablePage) {
+    int numMeasures = encodedTablePage.getNumMeasures();
+    byte[][] dataArray = new byte[numMeasures][];
+    for (int i = 0; i < numMeasures; i++) {
+      dataArray[i] = encodedTablePage.getMeasure(i).getEncodedData();
+    }
+    return dataArray;
+  }
+
+  public void setEncodedData(EncodedTablePage encodedData) {
+    this.encodedData = encodedData;
+  }
+
+  public EncodedTablePage getEncodedData() {
+    return encodedData;
+  }
 }
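
The new static getKeyArray and getDataArray helpers are the same projection applied to dimensions and measures respectively: copy column i's encoded bytes into slot i of an array. A generic sketch of that pattern, with an IntFunction standing in for the EncodedTablePage accessor (EncodedTablePage itself is introduced elsewhere in this commit):

import java.util.function.IntFunction;

public class ProjectionSketch {
  // mirrors NodeHolder.getKeyArray/getDataArray: one encoded byte[] per column
  static byte[][] project(int columnCount, IntFunction<byte[]> encodedBytesOf) {
    byte[][] result = new byte[columnCount][];
    for (int i = 0; i < columnCount; i++) {
      result[i] = encodedBytesOf.apply(i);
    }
    return result;
  }

  public static void main(String[] args) {
    byte[][] pages = {{1, 2}, {3}};
    byte[][] keys = project(pages.length, i -> pages[i]);
    System.out.println(keys.length + " columns; first column has " + keys[0].length + " bytes");
  }
}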

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java b/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
new file mode 100644
index 0000000..d6ecfbc
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util;
+
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+
+/**
+ * This is the utility class for No Dictionary changes.
+ */
+public class NonDictionaryUtil {
+
+  /**
+   * This method will form one single byte [] for all the high card dims.
+   * For example, if you need to pack 2 columns c1 and c2, they are stored in the following way:
+   *  <total_len(short)><offsetLen(short)><offsetLen+c1_len(short)><c1(byte[])><c2(byte[])>
+   * @param byteBufferArr
+   * @return
+   */
+  public static byte[] packByteBufferIntoSingleByteArray(byte[][] byteBufferArr) {
+    // an empty or null array means there is no no-dictionary data to pack.
+    if (null == byteBufferArr || byteBufferArr.length == 0) {
+      return null;
+    }
+    int noOfCol = byteBufferArr.length;
+    short toDetermineLengthOfByteArr = 2;
+    short offsetLen = (short) (noOfCol * 2 + toDetermineLengthOfByteArr);
+    int totalBytes = calculateTotalBytes(byteBufferArr) + offsetLen;
+
+    ByteBuffer buffer = ByteBuffer.allocate(totalBytes);
+
+    // write the length of the byte [] as the first short
+    buffer.putShort((short) (totalBytes - toDetermineLengthOfByteArr));
+    // writing the offset of the first element.
+    buffer.putShort(offsetLen);
+
+    // prepare index for byte []
+    for (int index = 0; index < byteBufferArr.length - 1; index++) {
+      int noOfBytes = byteBufferArr[index].length;
+
+      buffer.putShort((short) (offsetLen + noOfBytes));
+      offsetLen += noOfBytes;
+    }
+
+    // put actual data.
+    for (int index = 0; index < byteBufferArr.length; index++) {
+      buffer.put(byteBufferArr[index]);
+    }
+    buffer.rewind();
+    return buffer.array();
+
+  }
+
+  /**
+   * To calculate the total bytes in byte Buffer[].
+   *
+   * @param byteBufferArr
+   * @return
+   */
+  private static int calculateTotalBytes(byte[][] byteBufferArr) {
+    int total = 0;
+    for (int index = 0; index < byteBufferArr.length; index++) {
+      total += byteBufferArr[index].length;
+    }
+    return total;
+  }
+
+  /**
+   * Method to get the required Dimension from obj []
+   *
+   * @param index
+   * @param row
+   * @return
+   */
+  public static Integer getDimension(int index, Object[] row) {
+
+    Integer[] dimensions = (Integer[]) row[WriteStepRowUtil.DICTIONARY_DIMENSION];
+
+    return dimensions[index];
+
+  }
+
+  /**
+   * Method to get the required measure from obj []
+   *
+   * @param index
+   * @param row
+   * @return
+   */
+  public static Object getMeasure(int index, Object[] row) {
+    Object[] measures = (Object[]) row[WriteStepRowUtil.MEASURE];
+    return measures[index];
+  }
+
+  public static byte[] getByteArrayForNoDictionaryCols(Object[] row) {
+
+    return (byte[]) row[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
+  }
+
+  public static void prepareOutObj(Object[] out, int[] dimArray, byte[][] byteBufferArr,
+      Object[] measureArray) {
+
+    out[WriteStepRowUtil.DICTIONARY_DIMENSION] = dimArray;
+    out[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX] = byteBufferArr;
+    out[WriteStepRowUtil.MEASURE] = measureArray;
+
+  }
+
+  /**
+   * This method will extract the single dimension from the complete high card dims byte[].
+   * The format of the byte[] will be: <TotalLength><CompleteStartOffsets><Data>
+   *
+   * @param highCardArr
+   * @param index
+   * @param highCardinalityCount
+   * @param outBuffer
+   */
+  public static void extractSingleHighCardDims(byte[] highCardArr, int index,
+      int highCardinalityCount, ByteBuffer outBuffer) {
+    ByteBuffer buff = null;
+    short secIndex = 0;
+    short firstIndex = 0;
+    int length;
+    // if the requested index is the last one then we need to calculate length
+    // based on byte[] length.
+    if (index == highCardinalityCount - 1) {
+      // need to read 2 bytes(1 short) to determine starting offset and
+      // length can be calculated by array length.
+      buff = ByteBuffer.wrap(highCardArr, (index * 2) + 2, 2);
+    } else {
+      // need to read 4 bytes(2 short) to determine starting offset and
+      // length.
+      buff = ByteBuffer.wrap(highCardArr, (index * 2) + 2, 4);
+    }
+
+    firstIndex = buff.getShort();
+    // if it is the last dimension in high card then this will be the last
+    // offset, so calculate the length from the total length
+    if (index == highCardinalityCount - 1) {
+      secIndex = (short) highCardArr.length;
+    } else {
+      secIndex = buff.getShort();
+    }
+
+    length = secIndex - firstIndex;
+
+    outBuffer.position(firstIndex);
+    outBuffer.limit(outBuffer.position() + length);
+
+  }
+}
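
To make the offset arithmetic above concrete, here is a minimal round trip for the two-column case. pack and extract are simplified stand-ins for packByteBufferIntoSingleByteArray and extractSingleHighCardDims (fixed at two columns, returning a copy instead of positioning an output buffer):

import java.nio.ByteBuffer;
import java.util.Arrays;

public class HighCardPackSketch {
  // packs two columns in the documented layout:
  // <total_len(short)><offset(c1)(short)><offset(c1)+c1_len(short)><c1(byte[])><c2(byte[])>
  static byte[] pack(byte[] c1, byte[] c2) {
    short offsetLen = (short) (2 * 2 + 2); // two offset shorts plus the leading length short
    ByteBuffer buffer = ByteBuffer.allocate(offsetLen + c1.length + c2.length);
    buffer.putShort((short) (buffer.capacity() - 2)); // total length, excluding this short
    buffer.putShort(offsetLen);                       // where c1 starts
    buffer.putShort((short) (offsetLen + c1.length)); // where c2 starts
    buffer.put(c1).put(c2);
    return buffer.array();
  }

  // mirrors extractSingleHighCardDims: the offset of column `index` sits at byte (index * 2) + 2
  static byte[] extract(byte[] packed, int index, int columnCount) {
    ByteBuffer buff = ByteBuffer.wrap(packed, index * 2 + 2, index == columnCount - 1 ? 2 : 4);
    int start = buff.getShort();
    int end = (index == columnCount - 1) ? packed.length : buff.getShort();
    return Arrays.copyOfRange(packed, start, end);
  }

  public static void main(String[] args) {
    byte[] packed = pack(new byte[]{1, 2, 3}, new byte[]{4, 5});
    System.out.println(Arrays.toString(extract(packed, 0, 2))); // [1, 2, 3]
    System.out.println(Arrays.toString(extract(packed, 1, 2))); // [4, 5]
  }
}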

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
index 56e83db..b953d45 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonMetadataUtilTest.java
@@ -17,34 +17,41 @@
 
 package org.apache.carbondata.core.util;
 
-import mockit.Mock;
-import mockit.MockUp;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
-import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedMeasurePage;
 import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
+import org.apache.carbondata.core.metadata.CodecMetaFactory;
+import org.apache.carbondata.core.metadata.ColumnPageCodecMeta;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
-import org.apache.carbondata.format.*;
+import org.apache.carbondata.core.metadata.index.BlockIndexInfo;
+import org.apache.carbondata.format.BlockIndex;
+import org.apache.carbondata.format.BlockletInfo;
 import org.apache.carbondata.format.BlockletMinMaxIndex;
 import org.apache.carbondata.format.ColumnSchema;
+import org.apache.carbondata.format.DataChunk;
 import org.apache.carbondata.format.DataType;
+import org.apache.carbondata.format.Encoding;
+import org.apache.carbondata.format.FileFooter;
+import org.apache.carbondata.format.IndexHeader;
+import org.apache.carbondata.format.SegmentInfo;
 
+import mockit.Mock;
+import mockit.MockUp;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.BitSet;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static junit.framework.TestCase.*;
-import static org.apache.carbondata.core.util.CarbonMetadataUtil.getIndexHeader;
+import static junit.framework.TestCase.assertEquals;
 import static org.apache.carbondata.core.util.CarbonMetadataUtil.convertFileFooter;
 import static org.apache.carbondata.core.util.CarbonMetadataUtil.getBlockIndexInfo;
+import static org.apache.carbondata.core.util.CarbonMetadataUtil.getIndexHeader;
 
 public class CarbonMetadataUtilTest {
   static List<ByteBuffer> byteBufferList;
@@ -57,8 +64,6 @@ public class CarbonMetadataUtilTest {
   static int[] objDecimal;
 
   @BeforeClass public static void setUp() {
-    Long lngObj = new Long("11221");
-    byte byt = 1;
     objMaxArr = new Long[6];
     objMaxArr[0] = new Long("111111");
     objMaxArr[1] = new Long("121111");
@@ -113,13 +118,11 @@ public class CarbonMetadataUtilTest {
     blockletInfoList.add(blockletInfo);
     blockletInfoList.add(blockletInfo);
 
-    ValueEncoderMeta valueEncoderMeta = new ValueEncoderMeta();
-    valueEncoderMeta.setDecimal(5);
-    valueEncoderMeta.setMinValue(objMinArr);
-    valueEncoderMeta.setMaxValue(objMaxArr);
-    valueEncoderMeta.setUniqueValue(lngObj);
-    valueEncoderMeta.setType('a');
-    valueEncoderMeta.setDataTypeSelected(byt);
+    ValueEncoderMeta meta = CodecMetaFactory.createMeta();
+    meta.setDecimal(5);
+    meta.setMinValue(objMinArr);
+    meta.setMaxValue(objMaxArr);
+    meta.setType(ColumnPageCodecMeta.DOUBLE_MEASURE);
 
     List<Encoding> encoders = new ArrayList<>();
     encoders.add(Encoding.INVERTED_INDEX);
@@ -199,19 +202,52 @@ public class CarbonMetadataUtilTest {
 
     ValueEncoderMeta[] metas = new ValueEncoderMeta[6];
     for (int i = 0; i < metas.length; i++) {
-      metas[i] = new ValueEncoderMeta();
+      metas[i] = CodecMetaFactory.createMeta();
       metas[i].setMinValue(objMinArr[i]);
       metas[i].setMaxValue(objMaxArr[i]);
-      metas[i].setUniqueValue(objMinArr[i]);
       metas[i].setDecimal(objDecimal[i]);
-      metas[i].setType(CarbonCommonConstants.BIG_INT_MEASURE);
-      metas[i].setDataTypeSelected(byteArr[i]);
+      metas[i].setType(ColumnPageCodecMeta.BIG_INT_MEASURE);
     }
 
-    MeasurePageStatsVO stats = MeasurePageStatsVO.build(metas);
-
     BlockletInfoColumnar blockletInfoColumnar = new BlockletInfoColumnar();
 
+    final ValueEncoderMeta meta = CodecMetaFactory.createMeta();
+
+    new MockUp<ColumnPageCodecMeta>() {
+      @SuppressWarnings("unused") @Mock
+      public byte[] serialize() {
+        return new byte[]{1,2};
+      }
+      @SuppressWarnings("unused") @Mock
+      public byte[] getMaxAsBytes() {
+        return new byte[]{1,2};
+      }
+      @SuppressWarnings("unused") @Mock
+      public byte[] getMinAsBytes() {
+        return new byte[]{1,2};
+      }
+      @SuppressWarnings("unused") @Mock
+      public org.apache.carbondata.core.metadata.datatype.DataType getSrcDataType() {
+        return org.apache.carbondata.core.metadata.datatype.DataType.DOUBLE;
+      }
+    };
+
+    new MockUp<EncodedMeasurePage>() {
+      @SuppressWarnings("unused") @Mock
+      public ValueEncoderMeta getMetaData() {
+        return meta;
+      }
+    };
+
+    final EncodedMeasurePage measure = new EncodedMeasurePage(6, new byte[]{0,1}, meta,
+        new BitSet());
+    new MockUp<EncodedTablePage>() {
+      @SuppressWarnings("unused") @Mock
+      public EncodedMeasurePage getMeasure(int measureIndex) {
+        return measure;
+      }
+    };
+
     BitSet[] bitSetArr = new BitSet[6];
     bitSetArr[0] = new BitSet();
     bitSetArr[1] = new BitSet();
@@ -222,7 +258,6 @@ public class CarbonMetadataUtilTest {
     blockletInfoColumnar.setColumnMaxData(maxByteArr);
     blockletInfoColumnar.setColumnMinData(maxByteArr);
     blockletInfoColumnar.setKeyLengths(intArr);
-    blockletInfoColumnar.setColGrpBlocks(boolArr);
     blockletInfoColumnar.setKeyOffSets(longArr);
     blockletInfoColumnar.setDataIndexMapOffsets(longArr);
     blockletInfoColumnar.setAggKeyBlock(boolArr);
@@ -232,7 +267,8 @@ public class CarbonMetadataUtilTest {
     blockletInfoColumnar.setMeasureLength(intArr);
     blockletInfoColumnar.setMeasureOffset(longArr);
     blockletInfoColumnar.setMeasureNullValueIndex(bitSetArr);
-    blockletInfoColumnar.setStats(stats);
+    EncodedTablePage encodedTablePage = EncodedTablePage.newEmptyInstance();
+    blockletInfoColumnar.setEncodedTablePage(encodedTablePage);
 
     BlockletInfoColumnar blockletInfoColumnar1 = new BlockletInfoColumnar();
     blockletInfoColumnar1.setColumnMaxData(maxByteArr);
@@ -243,13 +279,11 @@ public class CarbonMetadataUtilTest {
     blockletInfoColumnar1.setAggKeyBlock(boolArr);
     blockletInfoColumnar1.setDataIndexMapLength(intArr);
     blockletInfoColumnar1.setIsSortedKeyColumn(boolArr);
-    blockletInfoColumnar1.setColGrpBlocks(boolArr);
     blockletInfoColumnar1.setKeyOffSets(longArr);
     blockletInfoColumnar1.setMeasureLength(intArr);
     blockletInfoColumnar1.setMeasureOffset(longArr);
     blockletInfoColumnar1.setMeasureNullValueIndex(bitSetArr);
-    blockletInfoColumnar1.setStats(stats);
-    blockletInfoColumnar1.setColGrpBlocks(boolArr);
+    blockletInfoColumnar1.setEncodedTablePage(encodedTablePage);
 
     List<BlockletInfoColumnar> blockletInfoColumnarList = new ArrayList<>();
     blockletInfoColumnarList.add(blockletInfoColumnar);
@@ -285,7 +319,7 @@ public class CarbonMetadataUtilTest {
     BlockletMinMaxIndex blockletMinMaxIndex = new BlockletMinMaxIndex();
     blockletMinMaxIndex.addToMax_values(ByteBuffer.wrap(byteMaxArr));
     blockletMinMaxIndex.addToMin_values(ByteBuffer.wrap(byteMinArr));
-    FileFooter result = convertFileFooter(blockletInfoColumnarList, 4, cardinality, columnSchemas,
+    FileFooter result = convertFileFooter(blockletInfoColumnarList, cardinality, columnSchemas,
         segmentProperties);
     assertEquals(result.getTable_columns(), columnSchemas);