You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/28 06:10:02 UTC

[2/7] carbondata git commit: [CARBONDATA-1098] Change page statistics use exact type and use column page in writer

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
index 755308e..f5d7dc7 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/DataTypeUtilTest.java
@@ -50,14 +50,6 @@ public class DataTypeUtilTest {
 
   }
 
-  @Test public void testGetAggType() {
-    assertTrue(getAggType(DataType.DECIMAL) == 'b');
-    assertTrue(getAggType(DataType.INT) == 'd');
-    assertTrue(getAggType(DataType.LONG) == 'd');
-    assertTrue(getAggType(DataType.NULL) == 'n');
-
-  }
-
   @Test public void testBigDecimalToByte() {
     byte[] result = bigDecimalToByte(BigDecimal.ONE);
     assertTrue(result == result);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java b/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
index 64651e5..5fc6df9 100644
--- a/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/writer/CarbonFooterWriterTest.java
@@ -24,21 +24,26 @@ import java.util.BitSet;
 import java.util.List;
 import java.util.UUID;
 
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
-import org.apache.carbondata.core.metadata.ValueEncoderMeta;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedMeasurePage;
 import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
+import org.apache.carbondata.core.metadata.CodecMetaFactory;
+import org.apache.carbondata.core.metadata.ColumnPageCodecMeta;
+import org.apache.carbondata.core.metadata.ValueEncoderMeta;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.reader.CarbonFooterReader;
 import org.apache.carbondata.core.util.CarbonMetadataUtil;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.format.ColumnSchema;
 
 import junit.framework.TestCase;
-import org.apache.carbondata.format.ColumnSchema;
+import mockit.Mock;
+import mockit.MockUp;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -87,9 +92,7 @@ public class CarbonFooterWriterTest extends TestCase{
     int[] colCardinality = CarbonUtil.getFormattedCardinality(cardinalities, wrapperColumnSchema);
     SegmentProperties segmentProperties = new SegmentProperties(wrapperColumnSchema, colCardinality);
 		writer.writeFooter(CarbonMetadataUtil.convertFileFooter(
-				infoColumnars,
-				6,
-				cardinalities,columnSchema, segmentProperties
+				infoColumnars, cardinalities,columnSchema, segmentProperties
 				), 0);
 
     CarbonFooterReader metaDataReader = new CarbonFooterReader(filePath, 0);
@@ -125,41 +128,7 @@ public class CarbonFooterWriterTest extends TestCase{
     return dimColumn;
  }
 
-  /**
-   * test writing fact metadata.
-   */
-  @Test public void testReadFactMetadata() throws IOException {
-    deleteFile();
-    createFile();
-    CarbonFooterWriter writer = new CarbonFooterWriter(filePath);
-    List<BlockletInfoColumnar> infoColumnars = getBlockletInfoColumnars();
-    int[] cardinalities = new int[] { 2, 4, 5, 7, 9, 10};
-    List<ColumnSchema> columnSchema = Arrays.asList(new ColumnSchema[]{getDimensionColumn("IMEI1"),
-						getDimensionColumn("IMEI2"),
-						getDimensionColumn("IMEI3"),
-						getDimensionColumn("IMEI4"),
-						getDimensionColumn("IMEI5"),
-						getDimensionColumn("IMEI6")});
-    List<org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema> wrapperColumnSchema = Arrays.asList(new org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema[]{getWrapperDimensionColumn("IMEI1"),
-    	getWrapperDimensionColumn("IMEI2"),
-    	getWrapperDimensionColumn("IMEI3"),
-    	getWrapperDimensionColumn("IMEI4"),
-    	getWrapperDimensionColumn("IMEI5"),
-    	getWrapperDimensionColumn("IMEI6")});
-    int[] colCardinality = CarbonUtil.getFormattedCardinality(cardinalities, wrapperColumnSchema);
-    SegmentProperties segmentProperties = new SegmentProperties(wrapperColumnSchema, cardinalities);
-    writer.writeFooter(CarbonMetadataUtil
-        .convertFileFooter(infoColumnars, 6, colCardinality,
-        		columnSchema,segmentProperties), 0);
-
-    CarbonFooterReader metaDataReader = new CarbonFooterReader(filePath, 0);
-    List<BlockletInfoColumnar> nodeInfoColumnars =
-        CarbonMetadataUtil.convertBlockletInfo(metaDataReader.readFooter());
-
-    assertTrue(nodeInfoColumnars.size() == infoColumnars.size());
-  }
-
-  private List<BlockletInfoColumnar> getBlockletInfoColumnars() {
+  private List<BlockletInfoColumnar> getBlockletInfoColumnars() throws IOException {
     BlockletInfoColumnar infoColumnar = new BlockletInfoColumnar();
     infoColumnar.setStartKey(new byte[] { 1, 2, 3 });
     infoColumnar.setEndKey(new byte[] { 8, 9, 10 });
@@ -179,27 +148,47 @@ public class CarbonFooterWriterTest extends TestCase{
     infoColumnar.setMeasureLength(new int[] { 6, 7 });
     infoColumnar.setMeasureOffset(new long[] { 33, 99 });
     infoColumnar.setAggKeyBlock(new boolean[] { true, true, true, true });
-    infoColumnar.setColGrpBlocks(new boolean[] { false, false, false, false });
     infoColumnar.setMeasureNullValueIndex(new BitSet[] {new BitSet(),new BitSet()});
+    infoColumnar.setEncodedTablePage(EncodedTablePage.newEmptyInstance());
+
+    final ValueEncoderMeta meta = CodecMetaFactory.createMeta();
+
+    new MockUp<ColumnPageCodecMeta>() {
+      @SuppressWarnings("unused") @Mock
+      public byte[] serialize() {
+        return new byte[]{1,2};
+      }
+      @SuppressWarnings("unused") @Mock
+      public byte[] getMaxAsBytes() {
+        return new byte[]{1,2};
+      }
+      @SuppressWarnings("unused") @Mock
+      public byte[] getMinAsBytes() {
+        return new byte[]{1,2};
+      }
+      @SuppressWarnings("unused") @Mock
+      public DataType getSrcDataType() {
+        return DataType.DOUBLE;
+      }
+
+    };
+
+    new MockUp<EncodedMeasurePage>() {
+      @SuppressWarnings("unused") @Mock
+      public ValueEncoderMeta getMetaData() {
+        return meta;
+      }
+    };
+
+    final EncodedMeasurePage measure = new EncodedMeasurePage(2, new byte[]{0,1}, meta,
+        new BitSet());
+    new MockUp<EncodedTablePage>() {
+      @SuppressWarnings("unused") @Mock
+      public EncodedMeasurePage getMeasure(int measureIndex) {
+        return measure;
+      }
+    };
 
-    ValueEncoderMeta[] metas = new ValueEncoderMeta[2];
-    metas[0] = new ValueEncoderMeta();
-    metas[0].setMinValue(0);
-    metas[0].setMaxValue(44d);
-    metas[0].setUniqueValue(0d);
-    metas[0].setDecimal(0);
-    metas[0].setType(CarbonCommonConstants.DOUBLE_MEASURE);
-    metas[0].setDataTypeSelected((byte)0);
-    metas[1] = new ValueEncoderMeta();
-    metas[1].setMinValue(0);
-    metas[1].setMaxValue(55d);
-    metas[1].setUniqueValue(0d);
-    metas[1].setDecimal(0);
-    metas[1].setType(CarbonCommonConstants.DOUBLE_MEASURE);
-    metas[1].setDataTypeSelected((byte)0);
-
-    MeasurePageStatsVO stats = MeasurePageStatsVO.build(metas);
-    infoColumnar.setStats(stats);
     List<BlockletInfoColumnar> infoColumnars = new ArrayList<BlockletInfoColumnar>();
     infoColumnars.add(infoColumnar);
     return infoColumnars;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/format/src/main/thrift/carbondata.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/carbondata.thrift b/format/src/main/thrift/carbondata.thrift
index b4cbc4e..8acd0b1 100644
--- a/format/src/main/thrift/carbondata.thrift
+++ b/format/src/main/thrift/carbondata.thrift
@@ -119,7 +119,7 @@ struct DataChunk{
  * in Row Major format.
  *
  * For V3, one data chunk is one page data of 32K rows.
- * For V2 & V1, one data chunk is one blocklet data.
+ * For V2, one data chunk is one blocklet data.
  */
 struct DataChunk2{
     1: required ChunkCompressionMeta chunk_meta; // The metadata of a chunk

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
index 2980ad3..3ca8cf1 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
@@ -116,8 +116,8 @@ class CarbonHiveSerDe extends AbstractSerDe {
   @Override public Writable serialize(Object obj, ObjectInspector objectInspector)
       throws SerDeException {
     if (!objInspector.getCategory().equals(ObjectInspector.Category.STRUCT)) {
-      throw new SerDeException("Cannot serialize " + objInspector.getCategory()
-          + ". Can only serialize a struct");
+      throw new SerDeException("Cannot serialize " + objInspector.getCategory()
+          + ". Can only serialize a struct");
     }
     serializedSize += ((StructObjectInspector) objInspector).getAllStructFieldRefs().size();
     status = LAST_OPERATION.SERIALIZE;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java b/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java
index be17823..757a342 100644
--- a/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java
+++ b/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java
@@ -87,7 +87,7 @@
 //    assertEquals("deserialization gives the wrong object", t, row);
 //
 //    // Serialize
-//    final ArrayWritable serializedArr = (ArrayWritable) serDe.serialize(row, oi);
+//    final ArrayWritable serializedArr = (ArrayWritable) serDe.serialize(row, oi);
 //    assertEquals("size correct after serialization", serDe.getSerDeStats().getRawDataSize(),
 //        serializedArr.get().length);
 //    assertTrue("serialized object should be equal to starting object",

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/integration/spark-common-test/src/test/resources/complexTypeDecimal.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/complexTypeDecimal.csv b/integration/spark-common-test/src/test/resources/complexTypeDecimal.csv
index 8c90d27..db4bca9 100644
--- a/integration/spark-common-test/src/test/resources/complexTypeDecimal.csv
+++ b/integration/spark-common-test/src/test/resources/complexTypeDecimal.csv
@@ -1,9 +1,9 @@
 ID,date,country,name,phonetype,serialname,salary,complex
-1.2,2015/7/23,china,aaa1,phone197,ASD69643,15000,3.113$3.33
-2,2015/7/24,china,aaa2,phone756,ASD42892,15001,3.123$7.33
-4.3,2015/7/26,china,aaa4,phone2435,ASD66902,15003,3.123$56.33
-5,2015/7/27,china,aaa5,phone2441,ASD90633,15004,3.133$5.33
-6.5,2015/7/28,china,aaa6,phone294,ASD59961,15005,3.133$54.33
-8,2015/7/30,china,aaa8,phone1848,ASD57308,15007,32.13$56.33
-9.1,2015/7/18,china,aaa9,phone706,ASD86717,15008,3.213$44.33
-10,2015/7/19,usa,aaa10,phone685,ASD30505,15009,32.13$33.33
\ No newline at end of file
+1.2,2015/07/23,china,aaa1,phone197,ASD69643,15000,3.113$3.33
+2,2015/07/24,china,aaa2,phone756,ASD42892,15001,3.123$7.33
+4.3,2015/07/26,china,aaa4,phone2435,ASD66902,15003,3.123$56.33
+5,2015/07/27,china,aaa5,phone2441,ASD90633,15004,3.133$5.33
+6.5,2015/07/28,china,aaa6,phone294,ASD59961,15005,3.133$54.33
+8,2015/07/30,china,aaa8,phone1848,ASD57308,15007,32.13$56.33
+9.1,2015/07/18,china,aaa9,phone706,ASD86717,15008,3.213$44.33
+10,2015/07/19,usa,aaa10,phone685,ASD30505,15009,32.13$33.33
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/integration/spark-common-test/src/test/resources/complexTypeDecimalNestedHive.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/complexTypeDecimalNestedHive.csv b/integration/spark-common-test/src/test/resources/complexTypeDecimalNestedHive.csv
index ae67723..e8c023b 100644
--- a/integration/spark-common-test/src/test/resources/complexTypeDecimalNestedHive.csv
+++ b/integration/spark-common-test/src/test/resources/complexTypeDecimalNestedHive.csv
@@ -1,4 +1,4 @@
-1.2,2015/7/23,china,aaa1,phone197,ASD69643,15000,3.113:imei$3.33:imsi
+1.2,2015-7-23 00:00:00,china,aaa1,phone197,ASD69643,15000,3.113:imei$3.33:imsi
 2,2015/7/24,china,aaa2,phone756,ASD42892,15001,3.123:imei$7.33:imsi
 4.3,2015/7/26,china,aaa4,phone2435,ASD66902,15003,3.123:imei$56.33:imsi
 5,2015/7/27,china,aaa5,phone2441,ASD90633,15004,3.133:imei$5.33:imsi

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/core/datastore/columnar/ColGroupBlockStorage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/core/datastore/columnar/ColGroupBlockStorage.java b/processing/src/main/java/org/apache/carbondata/core/datastore/columnar/ColGroupBlockStorage.java
deleted file mode 100644
index 70ac3d9..0000000
--- a/processing/src/main/java/org/apache/carbondata/core/datastore/columnar/ColGroupBlockStorage.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.datastore.columnar;
-
-import java.util.concurrent.Callable;
-
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-import org.apache.carbondata.processing.store.colgroup.ColGroupDataHolder;
-import org.apache.carbondata.processing.store.colgroup.ColGroupMinMax;
-
-/**
- * it is holder of column group dataPage and also min max for colgroup block dataPage
- */
-public class ColGroupBlockStorage implements IndexStorage, Callable<IndexStorage> {
-
-  private byte[][] dataPage;
-
-  private ColGroupMinMax colGrpMinMax;
-
-  public ColGroupBlockStorage(SegmentProperties segmentProperties, int colGrpIndex,
-      byte[][] dataPage) {
-    colGrpMinMax = new ColGroupMinMax(segmentProperties, colGrpIndex);
-    this.dataPage = dataPage;
-    for (int i = 0; i < dataPage.length; i++) {
-      colGrpMinMax.add(dataPage[i]);
-    }
-  }
-
-  /**
-   * sorting is not required for colgroup storage and hence return true
-   */
-  @Override public boolean isAlreadySorted() {
-    return true;
-  }
-
-  /**
-   * for column group storage its not required
-   */
-  public ColGroupDataHolder getRowIdPage() {
-    //not required for column group storage
-    return null;
-  }
-
-  /**
-   * for column group storage its not required
-   */
-  public ColGroupDataHolder getRowIdRlePage() {
-    // not required for column group storage
-    return null;
-  }
-
-  /**
-   * for column group storage its not required
-   */
-  public byte[][] getDataPage() {
-    return dataPage;
-  }
-
-  /**
-   * for column group storage its not required
-   */
-  public ColGroupDataHolder getDataRlePage() {
-    //not required for column group
-    return null;
-  }
-
-  /**
-   * for column group storage its not required
-   */
-  @Override public int getTotalSize() {
-    return dataPage.length;
-  }
-
-  @Override public byte[] getMin() {
-    return colGrpMinMax.getMin();
-  }
-
-  @Override public byte[] getMax() {
-    return colGrpMinMax.getMax();
-  }
-
-  /**
-   * return self
-   */
-  @Override public IndexStorage call() throws Exception {
-    return this;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
index 50fb4c5..1bcbe54 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/SortStepRowUtil.java
@@ -17,8 +17,12 @@
 
 package org.apache.carbondata.processing.newflow.sort;
 
+import java.math.BigDecimal;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
 import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
 
 public class SortStepRowUtil {
   public static Object[] convertRow(Object[] data, SortParameters parameters,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
index 912cd3a..aad874b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
@@ -177,7 +177,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
         readCounter++;
       }
     } catch (Exception e) {
-      throw new CarbonDataLoadingException("unable to generate the mdkey", e);
+      throw new CarbonDataLoadingException(e);
     }
     rowCounter.getAndAdd(batch.getSize());
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
index 6528d44..653da7b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
@@ -31,8 +31,8 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
 import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
 
 public class IntermediateFileMerger implements Callable<Void> {
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
index bc6640d..11c42a9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparator.java
@@ -22,7 +22,7 @@ import java.util.Comparator;
 
 import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
 
 public class RowComparator implements Comparator<Object[]> {
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
index 8d914ea..be29bf8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/RowComparatorForNormalDims.java
@@ -18,7 +18,7 @@ package org.apache.carbondata.processing.sortandgroupby.sortdata;
 
 import java.util.Comparator;
 
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
 
 /**
  * This class is used as comparator for comparing dims which are non high cardinality dims.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
index 1bcbaa8..b17c69a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/SortTempFileChunkHolder.java
@@ -35,9 +35,8 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
 import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
 
 public class SortTempFileChunkHolder implements Comparable<SortTempFileChunkHolder> {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java
index d629cdc..51b3964 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sortandgroupby/sortdata/UnCompressedTempSortFileWriter.java
@@ -23,8 +23,8 @@ import java.io.IOException;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
 import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
 
 public class UnCompressedTempSortFileWriter extends AbstractTempSortFileWriter {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 3ed5888..6ed5d31 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -38,7 +38,7 @@ import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.page.encoding.EncodedData;
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
@@ -51,7 +51,6 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.NodeHolder;
 import org.apache.carbondata.processing.newflow.sort.SortScopeOptions;
 import org.apache.carbondata.processing.store.file.FileManager;
 import org.apache.carbondata.processing.store.file.IFileManagerComposite;
@@ -148,8 +147,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private ColumnarFormatVersion version;
 
-  private TablePageEncoder encoder;
-
   private SortScopeOptions.SortScope sortScope;
 
   /**
@@ -201,7 +198,6 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       }
     }
     this.version = CarbonProperties.getInstance().getFormatVersion();
-    this.encoder = new TablePageEncoder(model);
     String noInvertedIdxCol = "";
     for (CarbonDimension cd : model.getSegmentProperties().getDimensions()) {
       if (!cd.isUseInvertedIndex()) {
@@ -343,35 +339,26 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
   }
 
   /**
-   * generate the NodeHolder from the input rows (one page in case of V3 format)
+   * generate the EncodedTablePage from the input rows (one page in case of V3 format)
    */
-  private NodeHolder processDataRows(List<CarbonRow> dataRows)
+  private EncodedTablePage processDataRows(List<CarbonRow> dataRows)
       throws CarbonDataWriterException, KeyGenException, MemoryException, IOException {
     if (dataRows.size() == 0) {
-      return new NodeHolder();
+      return EncodedTablePage.newEmptyInstance();
     }
     TablePage tablePage = new TablePage(model, dataRows.size());
-    TablePageKey keys = new TablePageKey(model, dataRows.size());
     int rowId = 0;
 
     // convert row to columnar data
     for (CarbonRow row : dataRows) {
-      tablePage.addRow(rowId, row);
-      keys.update(rowId, row);
-      rowId++;
+      tablePage.addRow(rowId++, row);
     }
 
-    // apply and compress dimensions and measure
-    EncodedData encodedData = encoder.encode(tablePage);
-
-    TablePageStatistics tablePageStatistics = new TablePageStatistics(
-        model.getTableSpec(), tablePage, encodedData, tablePage.getMeasureStats());
-
-    NodeHolder nodeHolder = dataWriter.buildDataNodeHolder(encodedData, tablePageStatistics, keys);
+    EncodedTablePage encoded = tablePage.encode();
     tablePage.freeMemory();
 
     LOGGER.info("Number Of records processed: " + dataRows.size());
-    return nodeHolder;
+    return encoded;
   }
 
   /**
@@ -470,7 +457,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       }
       consumerExecutorService.shutdownNow();
       processWriteTaskSubmitList(consumerExecutorServiceTaskList);
-      this.dataWriter.writeBlockletInfoToFile();
+      this.dataWriter.writeFooterToFile();
       LOGGER.info("All blocklets have been finished writing");
       // close all the open stream for both the files
       this.dataWriter.closeWriter();
@@ -666,7 +653,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     /**
      * array of blocklet data holder objects
      */
-    private NodeHolder[] nodeHolders;
+    private EncodedTablePage[] encodedTablePages;
     /**
      * flag to check whether the producer has completed processing for holder
      * object which is required to be picked form an index
@@ -678,7 +665,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     private int currentIndex;
 
     private BlockletDataHolder() {
-      nodeHolders = new NodeHolder[numberOfCores];
+      encodedTablePages = new EncodedTablePage[numberOfCores];
       available = new AtomicBoolean(false);
     }
 
@@ -686,32 +673,32 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
      * @return a node holder object
      * @throws InterruptedException if consumer thread is interrupted
      */
-    public synchronized NodeHolder get() throws InterruptedException {
-      NodeHolder nodeHolder = nodeHolders[currentIndex];
+    public synchronized EncodedTablePage get() throws InterruptedException {
+      EncodedTablePage encodedTablePage = encodedTablePages[currentIndex];
       // if node holder is null means producer thread processing the data which has to
       // be inserted at this current index has not completed yet
-      if (null == nodeHolder && !processingComplete) {
+      if (null == encodedTablePage && !processingComplete) {
         available.set(false);
       }
       while (!available.get()) {
         wait();
       }
-      nodeHolder = nodeHolders[currentIndex];
-      nodeHolders[currentIndex] = null;
+      encodedTablePage = encodedTablePages[currentIndex];
+      encodedTablePages[currentIndex] = null;
       currentIndex++;
       // reset current index when it reaches length of node holder array
-      if (currentIndex >= nodeHolders.length) {
+      if (currentIndex >= encodedTablePages.length) {
         currentIndex = 0;
       }
-      return nodeHolder;
+      return encodedTablePage;
     }
 
     /**
-     * @param nodeHolder
+     * @param encodedTablePage
      * @param index
      */
-    public synchronized void put(NodeHolder nodeHolder, int index) {
-      nodeHolders[index] = nodeHolder;
+    public synchronized void put(EncodedTablePage encodedTablePage, int index) {
+      encodedTablePages[index] = encodedTablePage;
       // notify the consumer thread when index at which object is to be inserted
       // becomes equal to current index from where data has to be picked for writing
       if (index == currentIndex) {
@@ -729,14 +716,14 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     private BlockletDataHolder blockletDataHolder;
     private List<CarbonRow> dataRows;
     private int sequenceNumber;
-    private boolean isWriteAll;
+    private boolean isLastPage;
 
     private Producer(BlockletDataHolder blockletDataHolder, List<CarbonRow> dataRows,
-        int sequenceNumber, boolean isWriteAll) {
+        int sequenceNumber, boolean isLastPage) {
       this.blockletDataHolder = blockletDataHolder;
       this.dataRows = dataRows;
       this.sequenceNumber = sequenceNumber;
-      this.isWriteAll = isWriteAll;
+      this.isLastPage = isLastPage;
     }
 
     /**
@@ -747,11 +734,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
      */
     @Override public Void call() throws Exception {
       try {
-        NodeHolder nodeHolder = processDataRows(dataRows);
-        nodeHolder.setWriteAll(isWriteAll);
+        EncodedTablePage encodedTablePage = processDataRows(dataRows);
+        encodedTablePage.setIsLastPage(isLastPage);
         // insert the object in array according to sequence number
         int indexInNodeHolderArray = (sequenceNumber - 1) % numberOfCores;
-        blockletDataHolder.put(nodeHolder, indexInNodeHolderArray);
+        blockletDataHolder.put(encodedTablePage, indexInNodeHolderArray);
         return null;
       } catch (Throwable throwable) {
         LOGGER.error(throwable, "Error in producer");
@@ -781,11 +768,11 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
      */
     @Override public Void call() throws Exception {
       while (!processingComplete || blockletProcessingCount.get() > 0) {
-        NodeHolder nodeHolder = null;
+        EncodedTablePage encodedTablePage = null;
         try {
-          nodeHolder = blockletDataHolder.get();
-          if (null != nodeHolder) {
-            dataWriter.writeBlockletData(nodeHolder);
+          encodedTablePage = blockletDataHolder.get();
+          if (null != encodedTablePage) {
+            dataWriter.writeTablePage(encodedTablePage);
           }
           blockletProcessingCount.decrementAndGet();
         } catch (Throwable throwable) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 7930763..c5a9bec 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -22,23 +22,45 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 
+import org.apache.carbondata.core.datastore.DimensionType;
 import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForInt;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
-import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingStrategy;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedDimensionPage;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedMeasurePage;
+import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategy;
+import org.apache.carbondata.core.datastore.page.key.TablePageKey;
+import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.SimpleStatsResult;
+import org.apache.carbondata.core.datastore.page.statistics.VarLengthPageStatsCollector;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
 import org.apache.spark.sql.types.Decimal;
 
-
 /**
  * Represent a page data for all columns, we store its data in columnar layout, so that
  * all processing apply to TablePage can be done in vectorized fashion.
@@ -55,24 +77,30 @@ public class TablePage {
   private ComplexColumnPage[] complexDimensionPage;
   private ColumnPage[] measurePage;
 
-  private MeasurePageStatsVO measurePageStatistics;
-
   // the num of rows in this page, it must be less than short value (65536)
   private int pageSize;
 
   private CarbonFactDataHandlerModel model;
 
+  private TablePageKey key;
+
+  private ColumnarFormatVersion version = CarbonProperties.getInstance().getFormatVersion();
+
   TablePage(CarbonFactDataHandlerModel model, int pageSize) throws MemoryException {
     this.model = model;
     this.pageSize = pageSize;
     int numDictDimension = model.getMDKeyGenerator().getDimCount();
     dictDimensionPage = new ColumnPage[numDictDimension];
     for (int i = 0; i < dictDimensionPage.length; i++) {
-      dictDimensionPage[i] = ColumnPage.newVarLengthPage(DataType.BYTE_ARRAY, pageSize);
+      ColumnPage page = ColumnPage.newVarLengthPage(DataType.BYTE_ARRAY, pageSize);
+      page.setStatsCollector(VarLengthPageStatsCollector.newInstance());
+      dictDimensionPage[i] = page;
     }
     noDictDimensionPage = new ColumnPage[model.getNoDictionaryCount()];
     for (int i = 0; i < noDictDimensionPage.length; i++) {
-      noDictDimensionPage[i] = ColumnPage.newVarLengthPage(DataType.BYTE_ARRAY, pageSize);
+      ColumnPage page = ColumnPage.newVarLengthPage(DataType.BYTE_ARRAY, pageSize);
+      page.setStatsCollector(VarLengthPageStatsCollector.newInstance());
+      noDictDimensionPage[i] = page;
     }
     complexDimensionPage = new ComplexColumnPage[model.getComplexColumnCount()];
     for (int i = 0; i < complexDimensionPage.length; i++) {
@@ -84,22 +112,33 @@ public class TablePage {
     DataType[] dataTypes = model.getMeasureDataType();
     TableSpec.MeasureSpec measureSpec = model.getTableSpec().getMeasureSpec();
     for (int i = 0; i < measurePage.length; i++) {
-      measurePage[i] = ColumnPage
+      ColumnPage page = ColumnPage
           .newPage(dataTypes[i], pageSize, measureSpec.getScale(i), measureSpec.getPrecision(i));
+      page.setStatsCollector(PrimitivePageStatsCollector.newInstance(dataTypes[i], pageSize));
+      measurePage[i] = page;
     }
+    boolean hasNoDictionary = noDictDimensionPage.length > 0;
+    this.key = new TablePageKey(pageSize, model.getMDKeyGenerator(), model.getSegmentProperties(),
+        hasNoDictionary);
   }
 
   /**
-   * Add one row to the internal store, it will be converted into columnar layout
+   * Add one row to the internal store
    *
    * @param rowId Id of the input row
    * @param row   row object
    */
   public void addRow(int rowId, CarbonRow row) throws KeyGenException {
-    // convert each column category
+    // convert each column category, update key and stats
+    byte[] mdk = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
+    convertToColumnarAndAddToPages(rowId, row, mdk);
+    key.update(rowId, row, mdk);
+  }
 
+  // convert the input row object to columnar data and add to column pages
+  private void convertToColumnarAndAddToPages(int rowId, CarbonRow row, byte[] mdk)
+      throws KeyGenException {
     // 1. convert dictionary columns
-    byte[] mdk = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
     byte[][] keys = model.getSegmentProperties().getFixedLengthKeySplitter().splitKey(mdk);
     for (int i = 0; i < dictDimensionPage.length; i++) {
       dictDimensionPage[i].putData(rowId, keys[i]);
@@ -137,11 +176,6 @@ public class TablePage {
       }
       measurePage[i].putData(rowId, value);
     }
-
-    // update statistics if it is last row
-    if (rowId + 1 == pageSize) {
-      this.measurePageStatistics = new MeasurePageStatsVO(measurePage);
-    }
   }
 
   /**
@@ -160,10 +194,10 @@ public class TablePage {
     // initialize the page if first row
     if (rowId == 0) {
       int depthInComplexColumn = complexDataType.getColsCount();
-      getComplexDimensionPage()[index] = new ComplexColumnPage(pageSize, depthInComplexColumn);
+      complexDimensionPage[index] = new ComplexColumnPage(pageSize, depthInComplexColumn);
     }
 
-    int depthInComplexColumn = getComplexDimensionPage()[index].getDepth();
+    int depthInComplexColumn = complexDimensionPage[index].getDepth();
     // this is the result columnar data which will be added to page,
     // size of this list is the depth of complex column, we will fill it by input data
     List<ArrayList<byte[]>> encodedComplexColumnar = new ArrayList<>();
@@ -187,8 +221,7 @@ public class TablePage {
     }
 
     for (int depth = 0; depth < depthInComplexColumn; depth++) {
-      getComplexDimensionPage()[index]
-          .putComplexData(rowId, depth, encodedComplexColumnar.get(depth));
+      complexDimensionPage[index].putComplexData(rowId, depth, encodedComplexColumnar.get(depth));
     }
   }
 
@@ -217,26 +250,161 @@ public class TablePage {
     return output;
   }
 
-  ColumnPage[] getDictDimensionPage() {
-    return dictDimensionPage;
+  EncodedTablePage encode() throws KeyGenException, MemoryException, IOException {
+    // encode dimensions and measure
+    EncodedDimensionPage[] dimensions = encodeAndCompressDimensions();
+    EncodedMeasurePage[] measures = encodeAndCompressMeasures();
+    return EncodedTablePage.newInstance(pageSize, dimensions, measures, key);
   }
 
-  ColumnPage[] getNoDictDimensionPage() {
-    return noDictDimensionPage;
+  private EncodingStrategy encodingStrategy = new DefaultEncodingStrategy();
+
+  // encode each measure page with a codec chosen from its statistics, return the encoded pages
+  private EncodedMeasurePage[] encodeAndCompressMeasures()
+      throws MemoryException, IOException {
+    EncodedMeasurePage[] encodedMeasures = new EncodedMeasurePage[measurePage.length];
+    for (int i = 0; i < measurePage.length; i++) {
+      SimpleStatsResult stats = (SimpleStatsResult)(measurePage[i].getStatistics());
+      ColumnPageCodec encoder = encodingStrategy.createCodec(stats);
+      encodedMeasures[i] = (EncodedMeasurePage) encoder.encode(measurePage[i]);
+    }
+    return encodedMeasures;
+  }
+
+  private IndexStorage encodeAndCompressDictDimension(byte[][] data, boolean isSort,
+      boolean isUseInvertedIndex, boolean isRleApplicable) throws KeyGenException {
+    if (isUseInvertedIndex) {
+      if (version == ColumnarFormatVersion.V3) {
+        return new BlockIndexerStorageForShort(data, isRleApplicable, false, isSort);
+      } else {
+        return new BlockIndexerStorageForInt(data, isRleApplicable, false, isSort);
+      }
+    } else {
+      if (version == ColumnarFormatVersion.V3) {
+        return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+      } else {
+        return new BlockIndexerStorageForNoInvertedIndexForInt(data);
+      }
+    }
   }
 
-  ComplexColumnPage[] getComplexDimensionPage() {
-    return complexDimensionPage;
+  private IndexStorage encodeAndCompressDirectDictDimension(byte[][] data, boolean isSort,
+      boolean isUseInvertedIndex, boolean isRleApplicable) throws KeyGenException {
+    if (isUseInvertedIndex) {
+      if (version == ColumnarFormatVersion.V3) {
+        return new BlockIndexerStorageForShort(data, isRleApplicable, false, isSort);
+      } else {
+        return new BlockIndexerStorageForInt(data, isRleApplicable, false, isSort);
+      }
+    } else {
+      if (version == ColumnarFormatVersion.V3) {
+        return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+      } else {
+        return new BlockIndexerStorageForNoInvertedIndexForInt(data);
+      }
+    }
   }
 
-  ColumnPage[] getMeasurePage() {
-    return measurePage;
+  private IndexStorage encodeAndCompressComplexDimension(byte[][] data) {
+    if (version == ColumnarFormatVersion.V3) {
+      return new BlockIndexerStorageForShort(data, false, false, false);
+    } else {
+      return new BlockIndexerStorageForInt(data, false, false, false);
+    }
   }
 
-  MeasurePageStatsVO getMeasureStats() {
-    return measurePageStatistics;
+  private IndexStorage encodeAndCompressNoDictDimension(byte[][] data, boolean isSort,
+      boolean isUseInvertedIndex, boolean isRleApplicable) {
+    if (isUseInvertedIndex) {
+      if (version == ColumnarFormatVersion.V3) {
+        return new BlockIndexerStorageForShort(data, isRleApplicable, true, isSort);
+      } else {
+        return new BlockIndexerStorageForInt(data, isRleApplicable, true, isSort);
+      }
+    } else {
+      if (version == ColumnarFormatVersion.V3) {
+        return new BlockIndexerStorageForNoInvertedIndexForShort(data, true);
+      } else {
+        return new BlockIndexerStorageForNoInvertedIndexForInt(data);
+      }
+    }
   }
 
+  // encode and compress each dimension page, return the encoded dimension pages
+  private EncodedDimensionPage[] encodeAndCompressDimensions()
+      throws KeyGenException {
+    TableSpec.DimensionSpec dimensionSpec = model.getTableSpec().getDimensionSpec();
+    int dictionaryColumnCount = -1;
+    int noDictionaryColumnCount = -1;
+    int indexStorageOffset = 0;
+    IndexStorage[] indexStorages = new IndexStorage[dimensionSpec.getNumExpandedDimensions()];
+    Compressor compressor = CompressorFactory.getInstance().getCompressor();
+    EncodedDimensionPage[] compressedColumns = new EncodedDimensionPage[indexStorages.length];
+    boolean[] isUseInvertedIndex = model.getIsUseInvertedIndex();
+    for (int i = 0; i < dimensionSpec.getNumSimpleDimensions(); i++) {
+      ColumnPage page;
+      byte[] flattened;
+      boolean isSortColumn = model.isSortColumn(i);
+      switch (dimensionSpec.getType(i)) {
+        case GLOBAL_DICTIONARY:
+          // dictionary dimension
+          page = dictDimensionPage[++dictionaryColumnCount];
+          indexStorages[indexStorageOffset] = encodeAndCompressDictDimension(
+              page.getByteArrayPage(),
+              isSortColumn,
+              isUseInvertedIndex[i] & isSortColumn,
+              CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i)));
+          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage());
+          break;
+        case DIRECT_DICTIONARY:
+          // timestamp and date column
+          page = dictDimensionPage[++dictionaryColumnCount];
+          indexStorages[indexStorageOffset] = encodeAndCompressDirectDictDimension(
+              page.getByteArrayPage(),
+              isSortColumn,
+              isUseInvertedIndex[i] & isSortColumn,
+              CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i)));
+          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage());
+          break;
+        case PLAIN_VALUE:
+          // high cardinality dimension, encoded as plain string
+          page = noDictDimensionPage[++noDictionaryColumnCount];
+          indexStorages[indexStorageOffset] = encodeAndCompressNoDictDimension(
+              page.getByteArrayPage(),
+              isSortColumn,
+              isUseInvertedIndex[i] & isSortColumn,
+              CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i)));
+          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage());
+          break;
+        case COMPLEX:
+          // we need to add complex column at last, so skipping it here
+          continue;
+        default:
+          throw new RuntimeException("unsupported dimension type: " + dimensionSpec.getType(i));
+      }
+      byte[] compressedData = compressor.compressByte(flattened);
+      compressedColumns[indexStorageOffset] = new EncodedDimensionPage(
+          pageSize, compressedData, indexStorages[indexStorageOffset], dimensionSpec.getType(i));
+      SimpleStatsResult stats = (SimpleStatsResult) page.getStatistics();
+      compressedColumns[indexStorageOffset].setNullBitSet(stats.getNullBits());
+      indexStorageOffset++;
+    }
+
+    // handle complex type column
+    for (int i = 0; i < dimensionSpec.getNumComplexDimensions(); i++) {
+      Iterator<byte[][]> iterator = complexDimensionPage[i].iterator();
+      while (iterator.hasNext()) {
+        byte[][] data = iterator.next();
+        indexStorages[indexStorageOffset] = encodeAndCompressComplexDimension(data);
+        byte[] flattened = ByteUtil.flatten(data);
+        byte[] compressedData = compressor.compressByte(flattened);
+        compressedColumns[indexStorageOffset] = new EncodedDimensionPage(
+            pageSize, compressedData, indexStorages[indexStorageOffset], DimensionType.COMPLEX);
+        indexStorageOffset++;
+      }
+    }
+    return compressedColumns;
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
deleted file mode 100644
index 8547845..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForInt;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
-import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingStrategy;
-import org.apache.carbondata.core.datastore.page.encoding.EncodedData;
-import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategy;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.memory.MemoryException;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
-import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-class TablePageEncoder {
-
-  private ColumnarFormatVersion version;
-
-  private boolean[] isUseInvertedIndex;
-
-  private CarbonFactDataHandlerModel model;
-
-  private static final EncodingStrategy encodingStrategy = new DefaultEncodingStrategy();
-
-  TablePageEncoder(CarbonFactDataHandlerModel model) {
-    this.version = CarbonProperties.getInstance().getFormatVersion();
-    this.model = model;
-    this.isUseInvertedIndex = model.getIsUseInvertedIndex();
-  }
-
-  // function to apply all columns in one table page
-  EncodedData encode(TablePage tablePage)
-      throws KeyGenException, MemoryException, IOException {
-    EncodedData encodedData = new EncodedData();
-    encodeAndCompressDimensions(tablePage, encodedData);
-    encodeAndCompressMeasures(tablePage, encodedData);
-    return encodedData;
-  }
-
-  // apply measure and set encodedData in `encodedData`
-  private void encodeAndCompressMeasures(TablePage tablePage, EncodedData encodedData)
-      throws MemoryException, IOException {
-    ColumnPage[] measurePage = tablePage.getMeasurePage();
-    byte[][] encodedMeasures = new byte[measurePage.length][];
-    for (int i = 0; i < measurePage.length; i++) {
-      ColumnPageCodec encoder = encodingStrategy.createCodec(measurePage[i].getStatistics());
-      encodedMeasures[i] = encoder.encode(measurePage[i]);
-    }
-    encodedData.measures = encodedMeasures;
-  }
-
-  private IndexStorage encodeAndCompressDictDimension(byte[][] data, boolean isSort,
-      boolean isUseInvertedIndex, boolean isRleApplicable) throws KeyGenException {
-    if (isUseInvertedIndex) {
-      if (version == ColumnarFormatVersion.V3) {
-        return new BlockIndexerStorageForShort(data, isRleApplicable, false, isSort);
-      } else {
-        return new BlockIndexerStorageForInt(data, isRleApplicable, false, isSort);
-      }
-    } else {
-      if (version == ColumnarFormatVersion.V3) {
-        return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
-      } else {
-        return new BlockIndexerStorageForNoInvertedIndexForInt(data);
-      }
-    }
-  }
-
-  private IndexStorage encodeAndCompressDirectDictDimension(byte[][] data, boolean isSort,
-      boolean isUseInvertedIndex, boolean isRleApplicable) throws KeyGenException {
-    if (isUseInvertedIndex) {
-      if (version == ColumnarFormatVersion.V3) {
-        return new BlockIndexerStorageForShort(data, isRleApplicable, false, isSort);
-      } else {
-        return new BlockIndexerStorageForInt(data, isRleApplicable, false, isSort);
-      }
-    } else {
-      if (version == ColumnarFormatVersion.V3) {
-        return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
-      } else {
-        return new BlockIndexerStorageForNoInvertedIndexForInt(data);
-      }
-    }
-  }
-
-  private IndexStorage encodeAndCompressComplexDimension(byte[][] data) {
-    if (version == ColumnarFormatVersion.V3) {
-      return new BlockIndexerStorageForShort(data, false, false, false);
-    } else {
-      return new BlockIndexerStorageForInt(data, false, false, false);
-    }
-  }
-
-  private IndexStorage encodeAndCompressNoDictDimension(byte[][] data, boolean isSort,
-      boolean isUseInvertedIndex, boolean isRleApplicable) {
-    if (isUseInvertedIndex) {
-      if (version == ColumnarFormatVersion.V3) {
-        return new BlockIndexerStorageForShort(data, isRleApplicable, true, isSort);
-      } else {
-        return new BlockIndexerStorageForInt(data, isRleApplicable, true, isSort);
-      }
-    } else {
-      if (version == ColumnarFormatVersion.V3) {
-        return new BlockIndexerStorageForNoInvertedIndexForShort(data, true);
-      } else {
-        return new BlockIndexerStorageForNoInvertedIndexForInt(data);
-      }
-    }
-  }
-
-  // apply and compress each dimension, set encoded data in `encodedData`
-  private void encodeAndCompressDimensions(TablePage tablePage, EncodedData encodedData)
-      throws KeyGenException {
-    TableSpec.DimensionSpec dimensionSpec = model.getTableSpec().getDimensionSpec();
-    int dictionaryColumnCount = -1;
-    int noDictionaryColumnCount = -1;
-    int indexStorageOffset = 0;
-    IndexStorage[] indexStorages = new IndexStorage[dimensionSpec.getNumExpandedDimensions()];
-    Compressor compressor = CompressorFactory.getInstance().getCompressor();
-    byte[][] compressedColumns = new byte[indexStorages.length][];
-    for (int i = 0; i < dimensionSpec.getNumSimpleDimensions(); i++) {
-      byte[] flattened;
-      boolean isSortColumn = model.isSortColumn(i);
-      switch (dimensionSpec.getType(i)) {
-        case GLOBAL_DICTIONARY:
-          // dictionary dimension
-          indexStorages[indexStorageOffset] = encodeAndCompressDictDimension(
-              tablePage.getDictDimensionPage()[++dictionaryColumnCount].getByteArrayPage(),
-              isSortColumn, isUseInvertedIndex[i] & isSortColumn,
-              CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i)));
-          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage());
-          break;
-        case DIRECT_DICTIONARY:
-          // timestamp and date column
-          indexStorages[indexStorageOffset] = encodeAndCompressDirectDictDimension(
-              tablePage.getDictDimensionPage()[++dictionaryColumnCount].getByteArrayPage(),
-              isSortColumn, isUseInvertedIndex[i] & isSortColumn,
-              CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i)));
-          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage());
-          break;
-        case PLAIN_VALUE:
-          // high cardinality dimension, encoded as plain string
-          indexStorages[indexStorageOffset] = encodeAndCompressNoDictDimension(
-              tablePage.getNoDictDimensionPage()[++noDictionaryColumnCount].getByteArrayPage(),
-              isSortColumn, isUseInvertedIndex[i] & isSortColumn,
-              CarbonDataProcessorUtil.isRleApplicableForColumn(dimensionSpec.getType(i)));
-          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage());
-          break;
-        case COMPLEX:
-          // we need to add complex column at last, so skipping it here
-          continue;
-        default:
-          throw new RuntimeException("unsupported dimension type: " + dimensionSpec.getType(i));
-      }
-      compressedColumns[indexStorageOffset] = compressor.compressByte(flattened);
-      indexStorageOffset++;
-    }
-
-    // handle complex type column
-    for (int i = 0; i < dimensionSpec.getNumComplexDimensions(); i++) {
-      Iterator<byte[][]> iterator = tablePage.getComplexDimensionPage()[i].iterator();
-      while (iterator.hasNext()) {
-        byte[][] data = iterator.next();
-        indexStorages[indexStorageOffset] = encodeAndCompressComplexDimension(data);
-        byte[] flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getDataPage());
-        compressedColumns[indexStorageOffset] = compressor.compressByte(flattened);
-        indexStorageOffset++;
-      }
-    }
-
-    encodedData.indexStorages = indexStorages;
-    encodedData.dimensions = compressedColumns;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java
deleted file mode 100644
index a66575e..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageKey.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store;
-
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.processing.util.NonDictionaryUtil;
-
-public class TablePageKey {
-  private int pageSize;
-
-  private byte[][] currentNoDictionaryKey;
-
-  // MDK start key
-  private byte[] startKey;
-
-  // MDK end key
-  private byte[] endKey;
-
-  // startkey for no dictionary columns
-  private byte[][] noDictStartKey;
-
-  // endkey for no diciotn
-  private byte[][] noDictEndKey;
-
-  // startkey for no dictionary columns after packing into one column
-  private byte[] packedNoDictStartKey;
-
-  // endkey for no dictionary columns after packing into one column
-  private byte[] packedNoDictEndKey;
-
-  private CarbonFactDataHandlerModel model;
-
-  TablePageKey(CarbonFactDataHandlerModel model, int pageSize) {
-    this.model = model;
-    this.pageSize = pageSize;
-  }
-
-  /** update all keys based on the input row */
-  void update(int rowId, CarbonRow row) throws KeyGenException {
-    if (model.getNoDictionaryCount() > 0 || model.getComplexIndexMap().size() > 0) {
-      currentNoDictionaryKey = WriteStepRowUtil.getNoDictAndComplexDimension(row);
-    }
-    if (rowId == 0) {
-      startKey = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
-      noDictStartKey = currentNoDictionaryKey;
-    }
-    noDictEndKey = currentNoDictionaryKey;
-    if (rowId == pageSize - 1) {
-      endKey = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
-      finalizeKeys();
-    }
-  }
-
-  /** update all keys if SORT_COLUMNS option is used when creating table */
-  private void finalizeKeys() {
-    // If SORT_COLUMNS is used, may need to update start/end keys since the they may
-    // contains dictionary columns that are not in SORT_COLUMNS, which need to be removed from
-    // start/end key
-    int numberOfDictSortColumns = model.getSegmentProperties().getNumberOfDictSortColumns();
-    if (numberOfDictSortColumns > 0) {
-      // if SORT_COLUMNS contain dictionary columns
-      int[] keySize = model.getSegmentProperties().getFixedLengthKeySplitter().getBlockKeySize();
-      if (keySize.length > numberOfDictSortColumns) {
-        // if there are some dictionary columns that are not in SORT_COLUMNS, it will come to here
-        int newMdkLength = 0;
-        for (int i = 0; i < numberOfDictSortColumns; i++) {
-          newMdkLength += keySize[i];
-        }
-        byte[] newStartKeyOfSortKey = new byte[newMdkLength];
-        byte[] newEndKeyOfSortKey = new byte[newMdkLength];
-        System.arraycopy(startKey, 0, newStartKeyOfSortKey, 0, newMdkLength);
-        System.arraycopy(endKey, 0, newEndKeyOfSortKey, 0, newMdkLength);
-        startKey = newStartKeyOfSortKey;
-        endKey = newEndKeyOfSortKey;
-      }
-    } else {
-      startKey = new byte[0];
-      endKey = new byte[0];
-    }
-
-    // Do the same update for noDictionary start/end Key
-    int numberOfNoDictSortColumns = model.getSegmentProperties().getNumberOfNoDictSortColumns();
-    if (numberOfNoDictSortColumns > 0) {
-      // if sort_columns contain no-dictionary columns
-      if (noDictStartKey.length > numberOfNoDictSortColumns) {
-        byte[][] newNoDictionaryStartKey = new byte[numberOfNoDictSortColumns][];
-        byte[][] newNoDictionaryEndKey = new byte[numberOfNoDictSortColumns][];
-        System.arraycopy(
-            noDictStartKey, 0, newNoDictionaryStartKey, 0, numberOfNoDictSortColumns);
-        System.arraycopy(
-            noDictEndKey, 0, newNoDictionaryEndKey, 0, numberOfNoDictSortColumns);
-        noDictStartKey = newNoDictionaryStartKey;
-        noDictEndKey = newNoDictionaryEndKey;
-      }
-      packedNoDictStartKey =
-          NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictStartKey);
-      packedNoDictEndKey =
-          NonDictionaryUtil.packByteBufferIntoSingleByteArray(noDictEndKey);
-    }
-  }
-
-  public byte[] getStartKey() {
-    return startKey;
-  }
-
-  public byte[] getEndKey() {
-    return endKey;
-  }
-
-  public byte[] getNoDictStartKey() {
-    return packedNoDictStartKey;
-  }
-
-  public byte[] getNoDictEndKey() {
-    return packedNoDictEndKey;
-  }
-
-  public int getPageSize() {
-    return pageSize;
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
deleted file mode 100644
index 13eaac9..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
+++ /dev/null
@@ -1,142 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store;
-
-import java.nio.ByteBuffer;
-import java.util.BitSet;
-
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.encoding.EncodedData;
-import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
-import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
-
-// Statistics of dimension and measure column in a TablePage
-public class TablePageStatistics {
-
-  // number of dimension after complex column expanded
-  private int numDimensionsExpanded;
-
-  // min of each dimension column
-  private byte[][] dimensionMinValue;
-
-  // max of each dimension column
-  private byte[][] dimensionMaxValue;
-
-  // min of each measure column
-  private byte[][] measureMinValue;
-
-  // max os each measure column
-  private byte[][] measureMaxValue;
-
-  // null bit set for each measure column
-  private BitSet[] nullBitSet;
-
-  // measure stats
-  // TODO: there are redundant stats
-  private MeasurePageStatsVO measurePageStatistics;
-
-  private TableSpec tableSpec;
-
-  TablePageStatistics(TableSpec tableSpec, TablePage tablePage,
-      EncodedData encodedData, MeasurePageStatsVO measurePageStatistics) {
-    this.numDimensionsExpanded = tableSpec.getDimensionSpec().getNumExpandedDimensions();
-    int numMeasures = tableSpec.getMeasureSpec().getNumMeasures();
-    this.dimensionMinValue = new byte[numDimensionsExpanded][];
-    this.dimensionMaxValue = new byte[numDimensionsExpanded][];
-    this.measureMinValue = new byte[numMeasures][];
-    this.measureMaxValue = new byte[numMeasures][];
-    this.nullBitSet = new BitSet[numMeasures];
-    this.tableSpec = tableSpec;
-    this.measurePageStatistics = measurePageStatistics;
-    updateMinMax(tablePage, encodedData);
-    updateNullBitSet(tablePage);
-  }
-
-  private void updateMinMax(TablePage tablePage, EncodedData encodedData) {
-    IndexStorage[] keyStorageArray = encodedData.indexStorages;
-    byte[][] measureArray = encodedData.measures;
-
-    for (int i = 0; i < numDimensionsExpanded; i++) {
-      switch (tableSpec.getDimensionSpec().getType(i)) {
-        case GLOBAL_DICTIONARY:
-        case DIRECT_DICTIONARY:
-        case COLUMN_GROUP:
-        case COMPLEX:
-          dimensionMinValue[i] = keyStorageArray[i].getMin();
-          dimensionMaxValue[i] = keyStorageArray[i].getMax();
-          break;
-        case PLAIN_VALUE:
-          dimensionMinValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMin());
-          dimensionMaxValue[i] = updateMinMaxForNoDictionary(keyStorageArray[i].getMax());
-          break;
-      }
-    }
-    for (int i = 0; i < measureArray.length; i++) {
-      ColumnPageStatsVO stats = tablePage.getMeasurePage()[i].getStatistics();
-      measureMaxValue[i] = stats.minBytes();
-      measureMinValue[i] = stats.maxBytes();
-    }
-  }
-
-  private void updateNullBitSet(TablePage tablePage) {
-    nullBitSet = new BitSet[tablePage.getMeasurePage().length];
-    ColumnPage[] measurePages = tablePage.getMeasurePage();
-    for (int i = 0; i < nullBitSet.length; i++) {
-      nullBitSet[i] = measurePages[i].getNullBitSet();
-    }
-  }
-
-  /**
-   * Below method will be used to update the min or max value
-   * by removing the length from it
-   *
-   * @return min max value without length
-   */
-  private byte[] updateMinMaxForNoDictionary(byte[] valueWithLength) {
-    ByteBuffer buffer = ByteBuffer.wrap(valueWithLength);
-    byte[] actualValue = new byte[buffer.getShort()];
-    buffer.get(actualValue);
-    return actualValue;
-  }
-
-  public byte[][] getDimensionMinValue() {
-    return dimensionMinValue;
-  }
-
-  public byte[][] getDimensionMaxValue() {
-    return dimensionMaxValue;
-  }
-
-  public byte[][] getMeasureMinValue() {
-    return measureMinValue;
-  }
-
-  public byte[][] getMeasureMaxValue() {
-    return measureMaxValue;
-  }
-
-  public BitSet[] getNullBitSet() {
-    return nullBitSet;
-  }
-
-  public MeasurePageStatsVO getMeasurePageStatistics() {
-    return measurePageStatistics;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 3756273..b83a82a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.metadata.BlockletInfoColumnar;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
@@ -583,7 +584,7 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    *
    * @throws CarbonDataWriterException
    */
-  @Override public void writeBlockletInfoToFile() throws CarbonDataWriterException {
+  @Override public void writeFooterToFile() throws CarbonDataWriterException {
     if (this.blockletInfoList.size() > 0) {
       writeBlockletInfoToFile(fileChannel, carbonDataFileTempPath);
     }
@@ -597,7 +598,8 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
    * @throws CarbonDataWriterException
    * @throws CarbonDataWriterException throws new CarbonDataWriterException if any problem
    */
-  public abstract void writeBlockletData(NodeHolder nodeHolder) throws CarbonDataWriterException;
+  public abstract void writeTablePage(EncodedTablePage encodedTablePage)
+      throws CarbonDataWriterException;
 
   /**
    * Below method will be used to update the min or max value
@@ -613,36 +615,6 @@ public abstract class AbstractFactDataWriter<T> implements CarbonFactDataWriter<
   }
 
   /**
-   * Below method will be used to update the no dictionary start and end key
-   *
-   * @param key key to be updated
-   * @return return no dictionary key
-   */
-  protected byte[] updateNoDictionaryStartAndEndKey(byte[] key) {
-    if (key.length == 0) {
-      return key;
-    }
-    // add key to byte buffer remove the length part of the data
-    ByteBuffer buffer = ByteBuffer.wrap(key, 2, key.length - 2);
-    // create a output buffer without length
-    ByteBuffer output = ByteBuffer.allocate(key.length - 2);
-    short numberOfByteToStorLength = 2;
-    // as length part is removed, so each no dictionary value index
-    // needs to be reshuffled by 2 bytes
-    int numberOfNoDictSortColumns =
-        dataWriterVo.getSegmentProperties().getNumberOfNoDictSortColumns();
-    for (int i = 0; i < numberOfNoDictSortColumns; i++) {
-      output.putShort((short) (buffer.getShort() - numberOfByteToStorLength));
-    }
-    // copy the data part
-    while (buffer.hasRemaining()) {
-      output.put(buffer.get());
-    }
-    output.rewind();
-    return output.array();
-  }
-
-  /**
    * This method will copy the carbon data file from local store location to
    * carbon store location
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bc3e6843/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
index 56ee762..f194f74 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/CarbonFactDataWriter.java
@@ -18,35 +18,21 @@
 package org.apache.carbondata.processing.store.writer;
 
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.page.encoding.EncodedData;
-import org.apache.carbondata.core.util.NodeHolder;
-import org.apache.carbondata.processing.store.TablePageKey;
-import org.apache.carbondata.processing.store.TablePageStatistics;
+import org.apache.carbondata.core.datastore.page.EncodedTablePage;
 
 public interface CarbonFactDataWriter<T> {
 
   /**
-   * This method will be used to create NodeHolder for a table page
+   * write an encoded table page
    */
-
-  NodeHolder buildDataNodeHolder(EncodedData encoded, TablePageStatistics stats,
-      TablePageKey key) throws CarbonDataWriterException;
-
-  /**
-   * If node holder flag is enabled the object will be added to list
-   * and all the blocklets will be return together. If disabled then this
-   * method will itself will call for writing the fact data
-   *
-   * @param holder
-   */
-  void writeBlockletData(NodeHolder holder) throws CarbonDataWriterException;
+  void writeTablePage(EncodedTablePage encodedTablePage) throws CarbonDataWriterException;
 
   /**
    * Below method will be used to write the leaf meta data to file
    *
    * @throws CarbonDataWriterException
    */
-  void writeBlockletInfoToFile() throws CarbonDataWriterException;
+  void writeFooterToFile() throws CarbonDataWriterException;
 
   /**
    * Below method will be used to initialise the writer