You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/06/20 07:29:08 UTC

[02/56] [abbrv] carbondata git commit: add EncodingStrategy

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxV2Format.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxV2Format.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxV2Format.scala
new file mode 100644
index 0000000..a3193c3
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxV2Format.scala
@@ -0,0 +1,708 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.dataload
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+/**
+  * Test Class for data loading with hive syntax and old syntax
+  *
+  */
+class TestLoadDataWithHiveSyntaxV2Format extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
+      "V2"
+    )
+    sql("drop table if exists escapechar1")
+    sql("drop table if exists escapechar2")
+    sql("drop table if exists escapechar3")
+    sql("drop table if exists specialcharacter1")
+    sql("drop table if exists specialcharacter2")
+    sql("drop table if exists collessthanschema")
+    sql("drop table if exists decimalarray")
+    sql("drop table if exists decimalstruct")
+    sql("drop table if exists carbontable")
+    sql("drop table if exists hivetable")
+    sql("drop table if exists testtable")
+    sql("drop table if exists testhivetable")
+    sql("drop table if exists testtable1")
+    sql("drop table if exists testhivetable1")
+    sql("drop table if exists complexcarbontable")
+    sql("drop table if exists complex_t3")
+    sql("drop table if exists complex_hive_t3")
+    sql("drop table if exists header_test")
+    sql("drop table if exists duplicateColTest")
+    sql("drop table if exists mixed_header_test")
+    sql("drop table if exists primitivecarbontable")
+    sql("drop table if exists UPPERCASEcube")
+    sql("drop table if exists lowercaseCUBE")
+    sql("drop table if exists carbontable1")
+    sql("drop table if exists hivetable1")
+    sql("drop table if exists comment_test")
+    sql("drop table if exists smallinttable")
+    sql("drop table if exists smallinthivetable")
+    sql(
+      "CREATE table carbontable (empno int, empname String, designation String, doj String, " +
+          "workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, " +
+          "projectcode int, projectjoindate String, projectenddate String, attendance int," +
+          "utilization int,salary int) STORED BY 'org.apache.carbondata.format'"
+    )
+    sql(
+      "create table hivetable(empno int, empname String, designation string, doj String, " +
+          "workgroupcategory int, workgroupcategoryname String,deptno int, deptname String, " +
+          "projectcode int, projectjoindate String,projectenddate String, attendance String," +
+          "utilization String,salary String)row format delimited fields terminated by ','"
+    )
+
+  }
+
+  test("create table with smallint type and query smallint table") {
+    sql("drop table if exists smallinttable")
+    sql("drop table if exists smallinthivetable")
+    sql(
+      "create table smallinttable(empno smallint, empname String, designation string, " +
+          "doj String, workgroupcategory int, workgroupcategoryname String,deptno int, " +
+          "deptname String, projectcode int, projectjoindate String,projectenddate String, " +
+          "attendance String, utilization String,salary String)" +
+          "STORED BY 'org.apache.carbondata.format'"
+    )
+
+    sql(
+      "create table smallinthivetable(empno smallint, empname String, designation string, " +
+          "doj String, workgroupcategory int, workgroupcategoryname String,deptno int, " +
+          "deptname String, projectcode int, projectjoindate String,projectenddate String, " +
+          "attendance String, utilization String,salary String)" +
+          "row format delimited fields terminated by ','"
+    )
+
+    sql(s"LOAD DATA local inpath '$resourcesPath/data.csv' INTO table smallinttable ")
+    sql(s"LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' overwrite " +
+        "INTO table smallinthivetable")
+
+    checkAnswer(
+      sql("select empno from smallinttable"),
+      sql("select empno from smallinthivetable")
+    )
+
+    sql("drop table if exists smallinttable")
+    sql("drop table if exists smallinthivetable")
+  }
+
+  test("test data loading and validate query output") {
+    sql("drop table if exists testtable")
+    sql("drop table if exists testhivetable")
+    //Create test cube and hive table
+    sql(
+      "CREATE table testtable (empno string, empname String, designation String, doj String, " +
+          "workgroupcategory string, workgroupcategoryname String, deptno string, deptname String, " +
+          "projectcode string, projectjoindate String, projectenddate String,attendance double," +
+          "utilization double,salary double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES" +
+          "('DICTIONARY_EXCLUDE'='empno,empname,designation,doj,workgroupcategory," +
+          "workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate')"
+    )
+    sql(
+      "create table testhivetable(empno string, empname String, designation string, doj String, " +
+          "workgroupcategory string, workgroupcategoryname String,deptno string, deptname String, " +
+          "projectcode string, projectjoindate String,projectenddate String, attendance double," +
+          "utilization double,salary double)row format delimited fields terminated by ','"
+    )
+    //load data into test cube and hive table and validate query result
+    sql(s"LOAD DATA local inpath '$resourcesPath/data.csv' INTO table testtable")
+    sql(
+      s"LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' overwrite INTO table " +
+          "testhivetable"
+    )
+    checkAnswer(sql("select * from testtable"), sql("select * from testhivetable"))
+    //load data incrementally and validate query result
+    sql(
+      s"LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE testtable OPTIONS" +
+          "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    sql(
+      s"LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' INTO table testhivetable"
+    )
+    checkAnswer(sql("select * from testtable"), sql("select * from testhivetable"))
+    //drop test cube and table
+    sql("drop table if exists testtable")
+    sql("drop table if exists testhivetable")
+  }
+
+  /**
+    * TODO: temporarily using different cube names for each test,
+    * because deletion and re-creation of a cube with the same name
+    * is not yet reliable.
+    */
+  test("test data loading with different case file header and validate query output") {
+    sql("drop table if exists testtable1")
+    sql("drop table if exists testhivetable1")
+    //Create test cube and hive table
+    sql(
+      "CREATE table testtable1 (empno string, empname String, designation String, doj String, " +
+          "workgroupcategory string, workgroupcategoryname String, deptno string, deptname String, " +
+          "projectcode string, projectjoindate String, projectenddate String,attendance double," +
+          "utilization double,salary double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES" +
+          "('DICTIONARY_EXCLUDE'='empno,empname,designation,doj,workgroupcategory," +
+          "workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate')"
+    )
+    sql(
+      "create table testhivetable1(empno string, empname String, designation string, doj String, " +
+          "workgroupcategory string, workgroupcategoryname String,deptno string, deptname String, " +
+          "projectcode string, projectjoindate String,projectenddate String, attendance double," +
+          "utilization double,salary double)row format delimited fields terminated by ','"
+    )
+    //load data into test cube and hive table and validate query result
+    sql(
+      s"LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' INTO table testtable1 " +
+          "options('DELIMITER'=',', 'QUOTECHAR'='\"', 'FILEHEADER'='EMPno, empname,designation,doj," +
+          "workgroupcategory,workgroupcategoryname,   deptno,deptname,projectcode,projectjoindate," +
+          "projectenddate,  attendance,   utilization,SALARY')"
+    )
+    sql(
+      s"LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' overwrite INTO table " +
+          "testhivetable1"
+    )
+    checkAnswer(sql("select * from testtable1"), sql("select * from testhivetable1"))
+    //drop test cube and table
+    sql("drop table if exists testtable1")
+    sql("drop table if exists testhivetable1")
+  }
+
+  test("test hive table data loading") {
+    sql(
+      s"LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' overwrite INTO table " +
+          "hivetable"
+    )
+    sql(s"LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' INTO table hivetable")
+  }
+
+  test("test carbon table data loading using old syntax") {
+    sql(
+      s"LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE carbontable OPTIONS" +
+          "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+  }
+
+  test("test carbon table data loading using new syntax compatible with hive") {
+    sql(s"LOAD DATA local inpath '$resourcesPath/data.csv' INTO table carbontable")
+    sql(
+      s"LOAD DATA local inpath '$resourcesPath/data.csv' INTO table carbontable options" +
+          "('DELIMITER'=',', 'QUOTECHAR'='\"')"
+    )
+  }
+
+  test("test carbon table data loading using new syntax with overwrite option compatible with hive")
+  {
+    try {
+      sql(s"LOAD DATA local inpath '$resourcesPath/data.csv' overwrite INTO table carbontable")
+    } catch {
+      case e: Throwable => {
+        assert(e.getMessage
+            .equals("Overwrite is not supported for carbon table with default.carbontable")
+        )
+      }
+    }
+  }
+
+  test("complex types data loading") {
+    sql("drop table if exists complexcarbontable")
+    sql("create table complexcarbontable(deviceInformationId int, channelsId string," +
+        "ROMSize string, purchasedate string, mobile struct<imei:string, imsi:string>," +
+        "MAC array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
+        "ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>," +
+        "proddate struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
+        "double,contractNumber double) " +
+        "STORED BY 'org.apache.carbondata.format' " +
+        "TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId')"
+    )
+    sql(
+      s"LOAD DATA local inpath '$resourcesPath/complexdata.csv' INTO table " +
+          "complexcarbontable " +
+          "OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,channelsId," +
+          "ROMSize,purchasedate,mobile,MAC,locationinfo,proddate,gamePointId,contractNumber'," +
+          "'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')"
+    )
+    sql("drop table if exists complexcarbontable")
+  }
+
+  test(
+    "complex types data loading with more unused columns and different order of complex columns " +
+        "in csv and create table"
+  ) {
+    sql("drop table if exists complexcarbontable")
+    sql("create table complexcarbontable(deviceInformationId int, channelsId string," +
+        "mobile struct<imei:string, imsi:string>, ROMSize string, purchasedate string," +
+        "MAC array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
+        "ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>," +
+        "proddate struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
+        "double,contractNumber double) " +
+        "STORED BY 'org.apache.carbondata.format' " +
+        "TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId','DICTIONARY_EXCLUDE'='channelsId')"
+    )
+    sql(
+      s"LOAD DATA local inpath '$resourcesPath/complextypediffentcolheaderorder.csv' INTO " +
+          "table complexcarbontable " +
+          "OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,channelsId," +
+          "ROMSize,purchasedate,MAC,abc,mobile,locationinfo,proddate,gamePointId,contractNumber'," +
+          "'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')"
+    )
+    sql("select count(*) from complexcarbontable")
+    sql("drop table if exists complexcarbontable")
+  }
+
+  test("test carbon table data loading with csv file Header in caps") {
+    sql("drop table if exists header_test")
+    sql(
+      "create table header_test(empno int, empname String, designation string, doj String, " +
+          "workgroupcategory int, workgroupcategoryname String,deptno int, deptname String, " +
+          "projectcode int, projectjoindate String,projectenddate String, attendance String," +
+          "utilization String,salary String) STORED BY 'org.apache.carbondata.format'"
+    )
+    val csvFilePath = s"$resourcesPath/data_withCAPSHeader.csv"
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO table header_test OPTIONS " +
+        "('DELIMITER'=',', 'QUOTECHAR'='\"')");
+    checkAnswer(sql("select empno from header_test"),
+      Seq(Row(11), Row(12))
+    )
+  }
+
+  test("test duplicate column validation") {
+    try {
+      sql("create table duplicateColTest(col1 string, Col1 string)")
+    }
+    catch {
+      case e: Exception => {
+        assert(e.getMessage.contains("Duplicate column name") ||
+            e.getMessage.contains("Found duplicate column"))
+      }
+    }
+  }
+
+  test(
+    "test carbon table data loading with csv file Header in Mixed Case and create table columns " +
+        "in mixed case"
+  ) {
+    sql("drop table if exists mixed_header_test")
+    sql(
+      "create table mixed_header_test(empno int, empname String, Designation string, doj String, " +
+          "Workgroupcategory int, workgroupcategoryname String,deptno int, deptname String, " +
+          "projectcode int, projectjoindate String,projectenddate String, attendance String," +
+          "utilization String,salary String) STORED BY 'org.apache.carbondata.format'"
+    )
+    val csvFilePath = s"$resourcesPath/data_withMixedHeader.csv"
+    sql("LOAD DATA local inpath '" + csvFilePath + "' INTO table mixed_header_test OPTIONS " +
+        "('DELIMITER'=',', 'QUOTECHAR'='\"')");
+    checkAnswer(sql("select empno from mixed_header_test"),
+      Seq(Row(11), Row(12))
+    )
+  }
+
+
+  test("complex types data loading with hive column having more than required column values") {
+    sql("drop table if exists complexcarbontable")
+    sql("create table complexcarbontable(deviceInformationId int, channelsId string," +
+        "ROMSize string, purchasedate string, mobile struct<imei:string, imsi:string>," +
+        "MAC array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
+        "ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>," +
+        "proddate struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
+        "double,contractNumber double) " +
+        "STORED BY 'org.apache.carbondata.format' " +
+        "TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId')"
+    )
+    sql(
+      s"LOAD DATA local inpath '$resourcesPath/complexdatastructextra.csv' INTO table " +
+          "complexcarbontable " +
+          "OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,channelsId," +
+          "ROMSize,purchasedate,mobile,MAC,locationinfo,proddate,gamePointId,contractNumber'," +
+          "'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')"
+    )
+    sql("drop table if exists complexcarbontable")
+  }
+
+  test("complex types & no dictionary columns data loading") {
+    sql("drop table if exists complexcarbontable")
+    sql("create table complexcarbontable(deviceInformationId int, channelsId string," +
+        "ROMSize string, purchasedate string, mobile struct<imei:string, imsi:string>," +
+        "MAC array<string>, locationinfo array<struct<ActiveAreaId:int, ActiveCountry:string, " +
+        "ActiveProvince:string, Activecity:string, ActiveDistrict:string, ActiveStreet:string>>," +
+        "proddate struct<productionDate:string,activeDeactivedate:array<string>>, gamePointId " +
+        "double,contractNumber double) " +
+        "STORED BY 'org.apache.carbondata.format' " +
+        "TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId', 'DICTIONARY_EXCLUDE'='ROMSize," +
+        "purchasedate')"
+    )
+    sql(
+      s"LOAD DATA local inpath '$resourcesPath/complexdata.csv' INTO table " +
+          "complexcarbontable " +
+          "OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,channelsId," +
+          "ROMSize,purchasedate,mobile,MAC,locationinfo,proddate,gamePointId,contractNumber'," +
+          "'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')"
+    );
+    sql("drop table if exists complexcarbontable")
+  }
+
+  test("array<string> and string datatype for same column is not working properly") {
+    sql("drop table if exists complexcarbontable")
+    sql("create table complexcarbontable(deviceInformationId int, MAC array<string>, channelsId string, "+
+        "ROMSize string, purchasedate string, gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' "+
+        "TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId')")
+    sql(s"LOAD DATA local inpath '$resourcesPath/complexdatareordered.csv' INTO table complexcarbontable "+
+        "OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,MAC,channelsId,ROMSize,purchasedate,gamePointId,contractNumber',"+
+        "'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')")
+    sql("drop table if exists complexcarbontable")
+    sql("create table primitivecarbontable(deviceInformationId int, MAC string, channelsId string, "+
+        "ROMSize string, purchasedate string, gamePointId double,contractNumber double) STORED BY 'org.apache.carbondata.format' "+
+        "TBLPROPERTIES ('DICTIONARY_INCLUDE'='deviceInformationId')")
+    sql(s"LOAD DATA local inpath '$resourcesPath/complexdatareordered.csv' INTO table primitivecarbontable "+
+        "OPTIONS('DELIMITER'=',', 'QUOTECHAR'='\"', 'FILEHEADER'='deviceInformationId,MAC,channelsId,ROMSize,purchasedate,gamePointId,contractNumber',"+
+        "'COMPLEX_DELIMITER_LEVEL_1'='$', 'COMPLEX_DELIMITER_LEVEL_2'=':')")
+    sql("drop table if exists primitivecarbontable")
+  }
+
+  test(
+    "test carbon table data loading when table name is in different case with create table, for " +
+        "UpperCase"
+  ) {
+    sql("drop table if exists UPPERCASEcube")
+    sql("create table UPPERCASEcube(empno Int, empname String, designation String, " +
+        "doj String, workgroupcategory Int, workgroupcategoryname String, deptno Int, " +
+        "deptname String, projectcode Int, projectjoindate String, projectenddate String, " +
+        "attendance Int,utilization Double,salary Double) STORED BY 'org.apache.carbondata.format'"
+    )
+    sql(
+      s"LOAD DATA local inpath '$resourcesPath/data.csv' INTO table uppercasecube OPTIONS" +
+          "('DELIMITER'=',', 'QUOTECHAR'='\"')"
+    )
+    sql("drop table if exists UpperCaseCube")
+  }
+
+  test(
+    "test carbon table data loading when table name is in different case with create table ,for " +
+        "LowerCase"
+  ) {
+    sql("drop table if exists lowercaseCUBE")
+    sql("create table lowercaseCUBE(empno Int, empname String, designation String, " +
+        "doj String, workgroupcategory Int, workgroupcategoryname String, deptno Int, " +
+        "deptname String, projectcode Int, projectjoindate String, projectenddate String, " +
+        "attendance Int,utilization Double,salary Double) STORED BY 'org.apache.carbondata.format'"
+    )
+    sql(
+      s"LOAD DATA local inpath '$resourcesPath/data.csv' INTO table LOWERCASECUBE OPTIONS" +
+          "('DELIMITER'=',', 'QUOTECHAR'='\"')"
+    )
+    sql("drop table if exists LowErcasEcube")
+  }
+
+  test("test carbon table data loading using escape char 1") {
+    sql("DROP TABLE IF EXISTS escapechar1")
+
+    sql(
+      """
+        CREATE TABLE IF NOT EXISTS escapechar1
+            (ID Int, date Timestamp, country String,
+                name String, phonetype String, serialname String, salary Int)
+        STORED BY 'org.apache.carbondata.format'
+    """
+    )
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    sql(
+      s"""
+        LOAD DATA LOCAL INPATH '$resourcesPath/datawithbackslash.csv' into table escapechar1
+    OPTIONS('ESCAPECHAR'='@')
+    """
+    )
+    checkAnswer(sql("select count(*) from escapechar1"), Seq(Row(10)))
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    sql("DROP TABLE IF EXISTS escapechar1")
+  }
+
+  test("test carbon table data loading using escape char 2") {
+    sql("DROP TABLE IF EXISTS escapechar2")
+
+    sql(
+      """
+        CREATE TABLE escapechar2(imei string,specialchar string)
+        STORED BY 'org.apache.carbondata.format'
+    """
+    )
+
+    sql(
+      s"""
+        LOAD DATA LOCAL INPATH '$resourcesPath/datawithescapecharacter.csv' into table escapechar2
+    options ('DELIMITER'=',', 'QUOTECHAR'='"','ESCAPECHAR'='\')
+        """
+    )
+    checkAnswer(sql("select count(*) from escapechar2"), Seq(Row(21)))
+    checkAnswer(sql("select specialchar from escapechar2 where imei = '1AA44'"), Seq(Row("escapeesc")))
+    sql("DROP TABLE IF EXISTS escapechar2")
+  }
+
+  test("test carbon table data loading using escape char 3") {
+    sql("DROP TABLE IF EXISTS escapechar3")
+
+    sql(
+      """
+        CREATE TABLE escapechar3(imei string,specialchar string)
+        STORED BY 'org.apache.carbondata.format'
+    """
+    )
+
+    sql(
+      s"""
+        LOAD DATA LOCAL INPATH '$resourcesPath/datawithescapecharacter.csv' into table escapechar3
+    options ('DELIMITER'=',', 'QUOTECHAR'='"','ESCAPECHAR'='@')
+    """
+    )
+    checkAnswer(sql("select count(*) from escapechar3"), Seq(Row(21)))
+    checkAnswer(sql("select specialchar from escapechar3 where imei in ('1232','12323')"), Seq(Row
+    ("ayush@b.com"), Row("ayushb.com")
+    )
+    )
+    sql("DROP TABLE IF EXISTS escapechar3")
+  }
+
+  test("test carbon table data loading with special character 1") {
+    sql("DROP TABLE IF EXISTS specialcharacter1")
+
+    sql(
+      """
+        CREATE TABLE specialcharacter1(imei string,specialchar string)
+        STORED BY 'org.apache.carbondata.format'
+    """
+    )
+
+    sql(
+      s"""
+        LOAD DATA LOCAL INPATH '$resourcesPath/datawithspecialcharacter.csv' into table specialcharacter1
+    options ('DELIMITER'=',', 'QUOTECHAR'='"')
+    """
+    )
+    checkAnswer(sql("select count(*) from specialcharacter1"), Seq(Row(37)))
+    checkAnswer(sql("select specialchar from specialcharacter1 where imei='1AA36'"), Seq(Row("\"i\"")))
+    sql("DROP TABLE IF EXISTS specialcharacter1")
+  }
+
+  test("test carbon table data loading with special character 2") {
+    sql("DROP TABLE IF EXISTS specialcharacter2")
+
+    sql(
+      """
+        CREATE table specialcharacter2(customer_id int, 124_string_level_province String, date_level String,
+        Time_level String, lname String, fname String, mi String, address1 String, address2
+    String, address3 String, address4 String, city String, country String, phone1 String,
+        phone2 String, marital_status String, yearly_income String, gender String, education
+    String, member_card String, occupation String, houseowner String, fullname String,
+        numeric_level double, account_num double, customer_region_id int, total_children int,
+    num_children_at_home int, num_cars_owned int)
+    STORED BY 'org.apache.carbondata.format'
+    """
+    )
+
+    sql(
+      s"""
+        LOAD DATA LOCAL INPATH '$resourcesPath/datawithcomplexspecialchar.csv' into
+    table specialcharacter2 options ('DELIMITER'=',', 'QUOTECHAR'='"','ESCAPECHAR'='"')
+    """
+    )
+    checkAnswer(sql("select count(*) from specialcharacter2"), Seq(Row(150)))
+    checkAnswer(sql("select 124_string_level_province from specialcharacter2 where customer_id=103"),
+      Seq(Row("\"state province # 124\""))
+    )
+    sql("DROP TABLE IF EXISTS specialcharacter2")
+  }
+
+  test("test data which contain column less than schema"){
+    sql("DROP TABLE IF EXISTS collessthanschema")
+
+    sql(
+      """
+        CREATE TABLE IF NOT EXISTS collessthanschema
+            (ID Int, date Timestamp, country String,
+                name String, phonetype String, serialname String, salary Int)
+        STORED BY 'org.apache.carbondata.format'
+    """)
+
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    sql(s"""
+        LOAD DATA LOCAL INPATH '$resourcesPath/lessthandatacolumndata.csv' into table collessthanschema
+    """)
+    checkAnswer(sql("select count(*) from collessthanschema"),Seq(Row(10)))
+    sql("DROP TABLE IF EXISTS collessthanschema")
+  }
+
+  test("test data which contain column with decimal data type in array."){
+    sql("DROP TABLE IF EXISTS decimalarray")
+
+    sql(
+      """
+        CREATE TABLE IF NOT EXISTS decimalarray
+            (ID decimal(5,5), date Timestamp, country String,
+                name String, phonetype String, serialname String, salary Int, complex
+                array<decimal(4,2)>)
+        STORED BY 'org.apache.carbondata.format'
+    """
+    )
+
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    sql(s"""
+        LOAD DATA LOCAL INPATH '$resourcesPath/complexTypeDecimal.csv' into table decimalarray
+    """)
+    checkAnswer(sql("select count(*) from decimalarray"),Seq(Row(8)))
+    sql("DROP TABLE IF EXISTS decimalarray")
+  }
+
+  test("test data which contain column with decimal data type in struct."){
+    sql("DROP TABLE IF EXISTS decimalstruct")
+
+    sql(
+      """
+        CREATE TABLE IF NOT EXISTS decimalstruct
+            (ID decimal(5,5), date Timestamp, country String,
+                name String, phonetype String, serialname String, salary Int, complex
+                struct<a:decimal(4,2)>)
+    STORED BY 'org.apache.carbondata.format'
+    """
+    )
+
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    sql(s"""
+        LOAD DATA LOCAL INPATH '$resourcesPath/complexTypeDecimal.csv' into table decimalstruct
+    """)
+    checkAnswer(sql("select count(*) from decimalstruct"),Seq(Row(8)))
+    sql("DROP TABLE IF EXISTS decimalstruct")
+  }
+
+  test("test data which contain column with decimal data type in array of struct."){
+    sql("DROP TABLE IF EXISTS complex_t3")
+    sql("DROP TABLE IF EXISTS complex_hive_t3")
+
+    sql(
+      """
+        CREATE TABLE complex_t3
+            (ID decimal, date Timestamp, country String,
+                name String, phonetype String, serialname String, salary Int, complex
+                array<struct<a:decimal(4,2),str:string>>)
+    STORED BY 'org.apache.carbondata.format'
+    """
+    )
+    sql(
+      """
+        CREATE TABLE complex_hive_t3
+            (ID decimal, date Timestamp, country String,
+                name String, phonetype String, serialname String, salary Int, complex
+                array<struct<a:decimal(4,2),str:string>>)
+    row format delimited fields terminated by ','
+    """
+    )
+
+    CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
+    sql(s"""
+        LOAD DATA LOCAL INPATH '$resourcesPath/complexTypeDecimalNested.csv' into table complex_t3
+    """)
+    sql(s"""
+        LOAD DATA LOCAL INPATH '$resourcesPath/complexTypeDecimalNestedHive.csv' into table complex_hive_t3
+    """)
+    checkAnswer(sql("select count(*) from complex_t3"),sql("select count(*) from complex_hive_t3"))
+    checkAnswer(sql("select id from complex_t3 where salary = 15000"),sql("select id from complex_hive_t3 where salary = 15000"))
+  }
+
+  test("test data loading when delimiter is '|' and data with header") {
+    sql(
+      "CREATE table carbontable1 (empno string, empname String, designation String, doj String, " +
+          "workgroupcategory string, workgroupcategoryname String, deptno string, deptname String, " +
+          "projectcode string, projectjoindate String, projectenddate String,attendance double," +
+          "utilization double,salary double) STORED BY 'org.apache.carbondata.format' TBLPROPERTIES" +
+          "('DICTIONARY_EXCLUDE'='empno,empname,designation,doj,workgroupcategory," +
+          "workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate')"
+    )
+    sql(
+      "create table hivetable1 (empno string, empname String, designation string, doj String, " +
+          "workgroupcategory string, workgroupcategoryname String,deptno string, deptname String, " +
+          "projectcode string, projectjoindate String,projectenddate String, attendance double," +
+          "utilization double,salary double)row format delimited fields terminated by ','"
+    )
+
+    sql(
+      s"LOAD DATA local inpath '$resourcesPath/datadelimiter.csv' INTO TABLE carbontable1 OPTIONS" +
+          "('DELIMITER'= '|', 'QUOTECHAR'= '\"')"
+    )
+
+    sql(s"LOAD DATA local inpath '$resourcesPath/datawithoutheader.csv' INTO table hivetable1")
+
+    checkAnswer(sql("select * from carbontable1"), sql("select * from hivetable1"))
+  }
+
+  test("test data loading with comment option") {
+    sql("drop table if exists comment_test")
+    sql(
+      "create table comment_test(imei string, age int, task bigint, num double, level decimal(10," +
+          "3), productdate timestamp, mark int, name string) STORED BY 'org.apache.carbondata.format'"
+    )
+    sql(
+      s"LOAD DATA local inpath '$resourcesPath/comment.csv' INTO TABLE comment_test " +
+          "options('DELIMITER' = ',', 'QUOTECHAR' = '.', 'COMMENTCHAR' = '?','FILEHEADER'='imei,age,task,num,level,productdate,mark,name', 'maxcolumns'='180')"
+    )
+    checkAnswer(sql("select imei from comment_test"),Seq(Row("\".carbon"),Row("#?carbon"), Row(""),
+      Row("~carbon,")))
+  }
+
+
+  override def afterAll {
+    sql("drop table if exists escapechar1")
+    sql("drop table if exists escapechar2")
+    sql("drop table if exists escapechar3")
+    sql("drop table if exists specialcharacter1")
+    sql("drop table if exists specialcharacter2")
+    sql("drop table if exists collessthanschema")
+    sql("drop table if exists decimalarray")
+    sql("drop table if exists decimalstruct")
+    sql("drop table if exists carbontable")
+    sql("drop table if exists hivetable")
+    sql("drop table if exists testtable")
+    sql("drop table if exists testhivetable")
+    sql("drop table if exists testtable1")
+    sql("drop table if exists testhivetable1")
+    sql("drop table if exists complexcarbontable")
+    sql("drop table if exists complex_t3")
+    sql("drop table if exists complex_hive_t3")
+    sql("drop table if exists header_test")
+    sql("drop table if exists duplicateColTest")
+    sql("drop table if exists mixed_header_test")
+    sql("drop table if exists primitivecarbontable")
+    sql("drop table if exists UPPERCASEcube")
+    sql("drop table if exists lowercaseCUBE")
+    sql("drop table if exists carbontable1")
+    sql("drop table if exists hivetable1")
+    sql("drop table if exists comment_test")
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
+      CarbonCommonConstants.CARBON_DATA_FILE_DEFAULT_VERSION
+    )
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index 7a6c513..4879e49 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -174,7 +174,7 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
    * Steps for changing the plan.
    * 1. It finds out the join condition columns and dimension aggregate columns which are need to
    * be decoded just before that plan executes.
-   * 2. Plan starts transform by adding the decoder to the plan where it needs the decoded data
+   * 2. Plan starts encode by adding the decoder to the plan where it needs the decoded data
    * like dimension aggregate columns decoder under aggregator and join condition decoder under
    * join children.
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index 8c345cb..ccd5afd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
@@ -208,7 +209,7 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
 
       try {
         sortDataRow.initialize();
-      } catch (CarbonSortKeyAndGroupByException e) {
+      } catch (MemoryException e) {
         throw new CarbonDataLoadingException(e);
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
index e2623bd..3d494de 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
@@ -88,7 +89,7 @@ public class UnsafeParallelReadMergeSorterImpl extends AbstractMergeSorter {
     final int batchSize = CarbonProperties.getInstance().getBatchSize();
     try {
       sortDataRow.initialize();
-    } catch (CarbonSortKeyAndGroupByException e) {
+    } catch (MemoryException e) {
       throw new CarbonDataLoadingException(e);
     }
     ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
index 0716796..aed2bc1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.memory.MemoryException;
 import org.apache.carbondata.core.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
@@ -41,7 +42,6 @@ import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeSortDataRows;
 import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeIntermediateMerger;
 import org.apache.carbondata.processing.newflow.sort.unsafe.merger.UnsafeSingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
@@ -91,7 +91,7 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl implements Sorter {
             new UnsafeSortDataRows(parameters, intermediateFileMergers[i], inMemoryChunkSizeInMB);
         sortDataRows[i].initialize();
       }
-    } catch (CarbonSortKeyAndGroupByException e) {
+    } catch (MemoryException e) {
       throw new CarbonDataLoadingException(e);
     }
     ExecutorService executorService = Executors.newFixedThreadPool(iterators.length);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/IntPointerBuffer.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/IntPointerBuffer.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/IntPointerBuffer.java
deleted file mode 100644
index cc59495..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/IntPointerBuffer.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe;
-
-import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.core.memory.MemoryBlock;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-
-/**
- * Holds the pointers for rows.
- */
-public class IntPointerBuffer {
-
-  private int length;
-
-  private int actualSize;
-
-  private int[] pointerBlock;
-
-  private MemoryBlock baseBlock;
-
-  private MemoryBlock pointerMemoryBlock;
-
-  public IntPointerBuffer(MemoryBlock baseBlock) {
-    // TODO can be configurable, it is initial size and it can grow automatically.
-    this.length = 100000;
-    pointerBlock = new int[length];
-    this.baseBlock = baseBlock;
-  }
-
-  public IntPointerBuffer(int length) {
-    this.length = length;
-    pointerBlock = new int[length];
-  }
-
-  public void set(int index, int value) {
-    pointerBlock[index] = value;
-  }
-
-  public void set(int value) {
-    ensureMemory();
-    pointerBlock[actualSize] = value;
-    actualSize++;
-  }
-
-  /**
-   * Returns the value at position {@code index}.
-   */
-  public int get(int index) {
-    assert index >= 0 : "index (" + index + ") should >= 0";
-    assert index < length : "index (" + index + ") should < length (" + length + ")";
-    if (pointerBlock == null) {
-      return CarbonUnsafe.unsafe.getInt(pointerMemoryBlock.getBaseObject(),
-          pointerMemoryBlock.getBaseOffset() + (index * 4));
-    }
-    return pointerBlock[index];
-  }
-
-  public void loadToUnsafe() throws CarbonSortKeyAndGroupByException {
-    pointerMemoryBlock = UnsafeSortDataRows.getMemoryBlock(pointerBlock.length * 4);
-    for (int i = 0; i < pointerBlock.length; i++) {
-      CarbonUnsafe.unsafe
-          .putInt(pointerMemoryBlock.getBaseObject(), pointerMemoryBlock.getBaseOffset() + i * 4,
-              pointerBlock[i]);
-    }
-    pointerBlock = null;
-  }
-
-  public int getActualSize() {
-    return actualSize;
-  }
-
-  public MemoryBlock getBaseBlock() {
-    return baseBlock;
-  }
-
-  public int[] getPointerBlock() {
-    return pointerBlock;
-  }
-
-  private void ensureMemory() {
-    if (actualSize >= length) {
-      // Expand by quarter, may be we can correct the logic later
-      int localLength = length + (int) (length * (0.25));
-      int[] memoryAddress = new int[localLength];
-      System.arraycopy(pointerBlock, 0, memoryAddress, 0, length);
-      pointerBlock = memoryAddress;
-      length = localLength;
-    }
-  }
-
-  public void freeMemory() {
-    pointerBlock = null;
-    if (pointerMemoryBlock != null) {
-      UnsafeMemoryManager.INSTANCE.freeMemory(pointerMemoryBlock);
-    }
-    if (baseBlock != null) {
-      UnsafeMemoryManager.INSTANCE.freeMemory(baseBlock);
-    }
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
index f2877ae..2ac138b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java
@@ -23,6 +23,7 @@ import java.math.BigDecimal;
 import java.util.Arrays;
 
 import org.apache.carbondata.core.memory.CarbonUnsafe;
+import org.apache.carbondata.core.memory.IntPointerBuffer;
 import org.apache.carbondata.core.memory.MemoryBlock;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.DataTypeUtil;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
deleted file mode 100644
index af49978..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.newflow.sort.unsafe;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.memory.MemoryAllocator;
-import org.apache.carbondata.core.memory.MemoryBlock;
-import org.apache.carbondata.core.util.CarbonProperties;
-
-/**
- * Manages memory for instance.
- */
-public class UnsafeMemoryManager {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(UnsafeMemoryManager.class.getName());
-
-  static {
-    long size;
-    try {
-      size = Long.parseLong(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB,
-              CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT));
-    } catch (Exception e) {
-      size = Long.parseLong(CarbonCommonConstants.IN_MEMORY_FOR_SORT_DATA_IN_MB_DEFAULT);
-      LOGGER.info("Wrong memory size given, "
-          + "so setting default value to " + size);
-    }
-    if (size < 1024) {
-      size = 1024;
-      LOGGER.info("It is not recommended to keep unsafe memory size less than 1024MB, "
-          + "so setting default value to " + size);
-    }
-
-
-    boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
-            CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT));
-    long takenSize = size * 1024 * 1024;
-    MemoryAllocator allocator;
-    if (offHeap) {
-      allocator = MemoryAllocator.UNSAFE;
-    } else {
-      long maxMemory = Runtime.getRuntime().maxMemory() * 60 / 100;
-      if (takenSize > maxMemory) {
-        takenSize = maxMemory;
-      }
-      allocator = MemoryAllocator.HEAP;
-    }
-    INSTANCE = new UnsafeMemoryManager(takenSize, allocator);
-  }
-
-  public static final UnsafeMemoryManager INSTANCE;
-
-  private long totalMemory;
-
-  private long memoryUsed;
-
-  private MemoryAllocator allocator;
-
-  private long minimumMemory;
-
-  private UnsafeMemoryManager(long totalMemory, MemoryAllocator allocator) {
-    this.totalMemory = totalMemory;
-    this.allocator = allocator;
-    long numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
-    long sortMemoryChunkSize = CarbonProperties.getInstance().getSortMemoryChunkSizeInMB();
-    sortMemoryChunkSize = sortMemoryChunkSize * 1024 * 1024;
-    long totalWorkingMemoryForAllThreads = sortMemoryChunkSize * numberOfCores;
-    if (totalWorkingMemoryForAllThreads >= totalMemory) {
-      throw new RuntimeException("Working memory should be less than total memory configured, "
-          + "so either reduce the loading threads or increase the memory size. "
-          + "(Number of threads * number of threads) should be less than total unsafe memory");
-    }
-    minimumMemory = totalWorkingMemoryForAllThreads;
-    LOGGER.info("Memory manager is created with size " + totalMemory + " with " + allocator
-        + " and minimum reserve memory " + minimumMemory);
-  }
-  public synchronized MemoryBlock allocateMemory(long memoryRequested) {
-    if (memoryUsed + memoryRequested <= totalMemory) {
-      MemoryBlock allocate = allocator.allocate(memoryRequested);
-      memoryUsed += allocate.size();
-      LOGGER.info("Memory block is created with size "  + allocate.size() +
-          " Total memory used " + memoryUsed + " memory left " + (getAvailableMemory()));
-      return allocate;
-    }
-    return null;
-  }
-
-  public synchronized void freeMemory(MemoryBlock memoryBlock) {
-    allocator.free(memoryBlock);
-    memoryUsed -= memoryBlock.size();
-    memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
-    LOGGER.info(
-        "Memory released, memory used " + memoryUsed + " memory left " + (getAvailableMemory()));
-  }
-
-  public synchronized long getAvailableMemory() {
-    return totalMemory - memoryUsed;
-  }
-
-  public boolean isMemoryAvailable() {
-    return getAvailableMemory() > minimumMemory;
-  }
-
-  public long getUsableMemory() {
-    return totalMemory - minimumMemory;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
index 074bb3b..a42d0ea 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeSortDataRows.java
@@ -31,7 +31,10 @@ import java.util.concurrent.TimeUnit;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.memory.IntPointerBuffer;
 import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.processing.newflow.sort.unsafe.comparator.UnsafeRowComparator;
@@ -111,8 +114,8 @@ public class UnsafeSortDataRows {
   /**
    * This method will be used to initialize
    */
-  public void initialize() throws CarbonSortKeyAndGroupByException {
-    MemoryBlock baseBlock = getMemoryBlock(inMemoryChunkSize);
+  public void initialize() throws MemoryException {
+    MemoryBlock baseBlock = UnsafeMemoryManager.allocateMemoryWithRetry(inMemoryChunkSize);
     this.rowPage = new UnsafeCarbonRowPage(parameters.getNoDictionaryDimnesionColumn(),
         parameters.getNoDictionarySortColumn(),
         parameters.getDimColCount() + parameters.getComplexDimColCount(),
@@ -130,28 +133,6 @@ public class UnsafeSortDataRows {
     semaphore = new Semaphore(parameters.getNumberOfCores());
   }
 
-  public static MemoryBlock getMemoryBlock(long size) throws CarbonSortKeyAndGroupByException {
-    MemoryBlock baseBlock = null;
-    int tries = 0;
-    while (tries < 100) {
-      baseBlock = UnsafeMemoryManager.INSTANCE.allocateMemory(size);
-      if (baseBlock == null) {
-        try {
-          Thread.sleep(50);
-        } catch (InterruptedException e) {
-          throw new CarbonSortKeyAndGroupByException(e);
-        }
-      } else {
-        break;
-      }
-      tries++;
-    }
-    if (baseBlock == null) {
-      throw new CarbonSortKeyAndGroupByException("Not enough memory to create page");
-    }
-    return baseBlock;
-  }
-
   public boolean canAdd() {
     return bytesAdded < maxSizeAllowed;
   }
@@ -196,7 +177,7 @@ public class UnsafeSortDataRows {
           unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
           semaphore.acquire();
           dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
-          MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize);
+          MemoryBlock memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(inMemoryChunkSize);
           boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
           rowPage = new UnsafeCarbonRowPage(
                   parameters.getNoDictionaryDimnesionColumn(),
@@ -233,7 +214,7 @@ public class UnsafeSortDataRows {
         unsafeInMemoryIntermediateFileMerger.startFileMergingIfPossible();
         semaphore.acquire();
         dataSorterAndWriterExecutorService.submit(new DataSorterAndWriter(rowPage));
-        MemoryBlock memoryBlock = getMemoryBlock(inMemoryChunkSize);
+        MemoryBlock memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry(inMemoryChunkSize);
         boolean saveToDisk = !UnsafeMemoryManager.INSTANCE.isMemoryAvailable();
         rowPage = new UnsafeCarbonRowPage(
             parameters.getNoDictionaryDimnesionColumn(),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/sort/UnsafeIntSortDataFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/sort/UnsafeIntSortDataFormat.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/sort/UnsafeIntSortDataFormat.java
index b63bdfc..e7fec26 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/sort/UnsafeIntSortDataFormat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/sort/UnsafeIntSortDataFormat.java
@@ -16,7 +16,7 @@
  */
 package org.apache.carbondata.processing.newflow.sort.unsafe.sort;
 
-import org.apache.carbondata.processing.newflow.sort.unsafe.IntPointerBuffer;
+import org.apache.carbondata.core.memory.IntPointerBuffer;
 import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage;
 import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeCarbonRow;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 91c2bc4..be86808 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -37,6 +37,7 @@ import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants;
 import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.datastore.columnar.ColumnGroupModel;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedData;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.columnar.ColumnarSplitter;
@@ -53,7 +54,6 @@ import org.apache.carbondata.processing.store.file.FileManager;
 import org.apache.carbondata.processing.store.file.IFileManagerComposite;
 import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo;
 import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter;
-import org.apache.carbondata.processing.store.writer.Encoder;
 
 /**
  * Fact data handler class to handle the fact data
@@ -144,7 +144,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    */
   private ColumnarFormatVersion version;
 
-  private DefaultEncoder encoder;
+  private TablePageEncoder encoder;
 
   private SortScopeOptions.SortScope sortScope;
 
@@ -206,7 +206,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       rleEncodingForDictDimension = arrangeUniqueBlockType(rleEncodingForDictDimension);
     }
     this.version = CarbonProperties.getInstance().getFormatVersion();
-    this.encoder = new DefaultEncoder(model);
+    this.encoder = new TablePageEncoder(model);
   }
 
   private void initParameters(CarbonFactDataHandlerModel model) {
@@ -358,8 +358,8 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
       rowId++;
     }
 
-    // encode and compress dimensions and measure
-    Encoder.EncodedData encodedData = encoder.encode(tablePage);
+    // apply and compress dimensions and measure
+    EncodedData encodedData = encoder.encode(tablePage);
 
     TablePageStatistics tablePageStatistics = new TablePageStatistics(
         model.getTableSpec(), tablePage, encodedData, tablePage.getMeasureStats());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/store/DefaultEncoder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/DefaultEncoder.java b/processing/src/main/java/org/apache/carbondata/processing/store/DefaultEncoder.java
deleted file mode 100644
index 73c4fa1..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/store/DefaultEncoder.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.store;
-
-import java.nio.ByteBuffer;
-import java.util.Iterator;
-
-import org.apache.carbondata.core.compression.ValueCompressor;
-import org.apache.carbondata.core.datastore.TableSpec;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndex;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
-import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
-import org.apache.carbondata.core.datastore.columnar.ColGroupBlockStorage;
-import org.apache.carbondata.core.datastore.columnar.IndexStorage;
-import org.apache.carbondata.core.datastore.compression.Compressor;
-import org.apache.carbondata.core.datastore.compression.CompressorFactory;
-import org.apache.carbondata.core.datastore.compression.ValueCompressionHolder;
-import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatistics;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.CompressionFinder;
-import org.apache.carbondata.core.util.ValueCompressionUtil;
-import org.apache.carbondata.processing.store.writer.Encoder;
-
-// Default encoder for encoding dimension and measures. For dimensions, it applies RLE and
-// inverted index encoding. For measures, it applies delta encoding or adaptive encoding
-public class DefaultEncoder implements Encoder {
-
-  private ColumnarFormatVersion version;
-
-  private boolean[] isUseInvertedIndex;
-
-  private CarbonFactDataHandlerModel model;
-
-  public DefaultEncoder(CarbonFactDataHandlerModel model) {
-    this.version = CarbonProperties.getInstance().getFormatVersion();
-    this.model = model;
-    this.isUseInvertedIndex = model.getIsUseInvertedIndex();
-  }
-
-  // function to encode all columns in one table page
-  public Encoder.EncodedData encode(TablePage tablePage) {
-    Encoder.EncodedData encodedData = new Encoder.EncodedData();
-    encodeAndCompressDimensions(tablePage, encodedData);
-    encodeAndCompressMeasures(tablePage, encodedData);
-    return encodedData;
-  }
-
-  // encode measure and set encodedData in `encodedData`
-  private void encodeAndCompressMeasures(TablePage tablePage, Encoder.EncodedData encodedData) {
-    // TODO: following conversion is required only because compress model requires them,
-    // remove then after the compress framework is refactoried
-    ColumnPage[] measurePage = tablePage.getMeasurePage();
-    int measureCount = measurePage.length;
-    byte[] dataTypeSelected = new byte[measureCount];
-    CompressionFinder[] finders = new CompressionFinder[measureCount];
-    for (int i = 0; i < measureCount; i++) {
-      ColumnPageStatistics stats = measurePage[i].getStatistics();
-      finders[i] = ValueCompressionUtil.getCompressionFinder(
-          stats.getMax(),
-          stats.getMin(),
-          stats.getDecimal(),
-          measurePage[i].getDataType(), dataTypeSelected[i]);
-    }
-
-    //CompressionFinder[] finders = compressionModel.getCompressionFinders();
-    ValueCompressionHolder[] holders = ValueCompressionUtil.getValueCompressionHolder(finders);
-    encodedData.measures = encodeMeasure(holders, finders, measurePage);
-  }
-
-  // this method first invokes encoding routine to encode the data chunk,
-  // followed by invoking compression routine for preparing the data chunk for writing.
-  private byte[][] encodeMeasure(ValueCompressionHolder[] holders,
-      CompressionFinder[] finders,
-      ColumnPage[] columnPages) {
-    ValueCompressionHolder[] values = new ValueCompressionHolder[columnPages.length];
-    byte[][] encodedMeasures = new byte[values.length][];
-    for (int i = 0; i < columnPages.length; i++) {
-      values[i] = holders[i];
-      if (columnPages[i].getDataType() != DataType.DECIMAL) {
-        ValueCompressor compressor =
-            ValueCompressionUtil.getValueCompressor(finders[i]);
-        Object compressed = compressor.getCompressedValues(
-            finders[i],
-            columnPages[i],
-            columnPages[i].getStatistics().getMax(),
-            columnPages[i].getStatistics().getDecimal());
-        values[i].setValue(compressed);
-      } else {
-        // in case of decimal, 'flatten' the byte[][] to byte[]
-        byte[][] decimalPage = columnPages[i].getDecimalPage();
-        int totalSize = 0;
-        for (byte[] decimal : decimalPage) {
-          totalSize += decimal.length;
-        }
-        ByteBuffer temp = ByteBuffer.allocate(totalSize);
-        for (byte[] decimal : decimalPage) {
-          temp.put(decimal);
-        }
-        values[i].setValue(temp.array());
-      }
-      values[i].compress();
-      encodedMeasures[i] = values[i].getCompressedData();
-    }
-
-    return encodedMeasures;
-  }
-
-  private IndexStorage encodeAndCompressDictDimension(byte[][] data, boolean isSort,
-      boolean isUseInvertedIndex) {
-    if (isUseInvertedIndex) {
-      if (version == ColumnarFormatVersion.V3) {
-        return new BlockIndexerStorageForShort(data, true, false, isSort);
-      } else {
-        return new BlockIndexerStorageForInt(data, true, false, isSort);
-      }
-    } else {
-      if (version == ColumnarFormatVersion.V3) {
-        return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
-      } else {
-        return new BlockIndexerStorageForNoInvertedIndex(data);
-      }
-    }
-  }
-
-  private IndexStorage encodeAndCompressDirectDictDimension(byte[][] data, boolean isSort,
-      boolean isUseInvertedIndex) {
-    if (isUseInvertedIndex) {
-      if (version == ColumnarFormatVersion.V3) {
-        return new BlockIndexerStorageForShort(data, false, false, isSort);
-      } else {
-        return new BlockIndexerStorageForInt(data, false, false, isSort);
-      }
-    } else {
-      if (version == ColumnarFormatVersion.V3) {
-        return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
-      } else {
-        return new BlockIndexerStorageForNoInvertedIndex(data);
-      }
-    }
-  }
-
-  private IndexStorage encodeAndCompressComplexDimension(byte[][] data) {
-    if (version == ColumnarFormatVersion.V3) {
-      return new BlockIndexerStorageForShort(data, false, false, false);
-    } else {
-      return new BlockIndexerStorageForInt(data, false, false, false);
-    }
-  }
-
-  private IndexStorage encodeAndCompressNoDictDimension(byte[][] data, boolean isSort,
-      boolean isUseInvertedIndex) {
-    if (isUseInvertedIndex) {
-      if (version == ColumnarFormatVersion.V3) {
-        return new BlockIndexerStorageForShort(data, false, true, isSort);
-      } else {
-        return new BlockIndexerStorageForInt(data, false, true, isSort);
-      }
-    } else {
-      if (version == ColumnarFormatVersion.V3) {
-        return new BlockIndexerStorageForNoInvertedIndexForShort(data, true);
-      } else {
-        return new BlockIndexerStorageForNoInvertedIndex(data);
-      }
-    }
-  }
-
-  // encode and compress each dimension, set encoded data in `encodedData`
-  private void encodeAndCompressDimensions(TablePage tablePage, Encoder.EncodedData encodedData) {
-    TableSpec.DimensionSpec dimensionSpec = model.getTableSpec().getDimensionSpec();
-    int dictionaryColumnCount = -1;
-    int noDictionaryColumnCount = -1;
-    int colGrpId = -1;
-    int indexStorageOffset = 0;
-    IndexStorage[] indexStorages = new IndexStorage[dimensionSpec.getNumExpandedDimensions()];
-    SegmentProperties segmentProperties = model.getSegmentProperties();
-    Compressor compressor = CompressorFactory.getInstance().getCompressor();
-    byte[][] compressedColumns = new byte[indexStorages.length][];
-    for (int i = 0; i < dimensionSpec.getNumSimpleDimensions(); i++) {
-      byte[] flattened;
-      boolean isSortColumn = model.isSortColumn(i);
-      switch (dimensionSpec.getType(i)) {
-        case GLOBAL_DICTIONARY:
-          // dictionary dimension
-          indexStorages[indexStorageOffset] =
-              encodeAndCompressDictDimension(
-                  tablePage.getKeyColumnPage().getKeyVector(++dictionaryColumnCount),
-                  isSortColumn,
-                  isUseInvertedIndex[i] & isSortColumn);
-          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
-          break;
-        case DIRECT_DICTIONARY:
-          // timestamp and date column
-          indexStorages[indexStorageOffset] =
-              encodeAndCompressDirectDictDimension(
-                  tablePage.getKeyColumnPage().getKeyVector(++dictionaryColumnCount),
-                  isSortColumn,
-                  isUseInvertedIndex[i] & isSortColumn);
-          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
-          break;
-        case PLAIN_VALUE:
-          // high cardinality dimension, encoded as plain string
-          indexStorages[indexStorageOffset] =
-              encodeAndCompressNoDictDimension(
-                  tablePage.getNoDictDimensionPage()[++noDictionaryColumnCount].getStringPage(),
-                  isSortColumn,
-                  isUseInvertedIndex[i] & isSortColumn);
-          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
-          break;
-        case COLUMN_GROUP:
-          // column group
-          indexStorages[indexStorageOffset] =
-              new ColGroupBlockStorage(
-                  segmentProperties,
-                  ++colGrpId,
-                  tablePage.getKeyColumnPage().getKeyVector(++dictionaryColumnCount));
-          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
-          break;
-        case COMPLEX:
-          // we need to add complex column at last, so skipping it here
-          continue;
-        default:
-          throw new RuntimeException("unsupported dimension type: " + dimensionSpec.getType(i));
-      }
-      compressedColumns[indexStorageOffset] = compressor.compressByte(flattened);
-      indexStorageOffset++;
-    }
-
-    // handle complex type column
-    for (int i = 0; i < dimensionSpec.getNumComplexDimensions(); i++) {
-      Iterator<byte[][]> iterator = tablePage.getComplexDimensionPage()[i].iterator();
-      while (iterator.hasNext()) {
-        byte[][] data = iterator.next();
-        indexStorages[indexStorageOffset] = encodeAndCompressComplexDimension(data);
-        byte[] flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
-        compressedColumns[indexStorageOffset] = compressor.compressByte(flattened);
-        indexStorageOffset++;
-      }
-    }
-
-    encodedData.indexStorages = indexStorages;
-    encodedData.dimensions = compressedColumns;
-  }
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index fb7ebfb..65504cd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -29,7 +29,6 @@ import org.apache.carbondata.core.datastore.GenericDataType;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.ComplexColumnPage;
-import org.apache.carbondata.core.datastore.page.KeyColumnPage;
 import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
@@ -50,7 +49,7 @@ public class TablePage {
 
   // TODO: we should have separate class for key columns so that keys are stored together in
   // one vector to make it efficient for sorting
-  private KeyColumnPage keyColumnPage;
+  private ColumnPage[] dictDimensionPage;
   private ColumnPage[] noDictDimensionPage;
   private ComplexColumnPage[] complexDimensionPage;
   private ColumnPage[] measurePage;
@@ -62,14 +61,17 @@ public class TablePage {
 
   private CarbonFactDataHandlerModel model;
 
-  TablePage(CarbonFactDataHandlerModel model, int pageSize) {
+  public TablePage(CarbonFactDataHandlerModel model, int pageSize) {
     this.model = model;
     this.pageSize = pageSize;
-    keyColumnPage = new KeyColumnPage(pageSize,
-        model.getSegmentProperties().getDimensionPartitions().length);
+    int numDictDimension = model.getMDKeyGenerator().getDimCount();
+    dictDimensionPage = new ColumnPage[numDictDimension];
+    for (int i = 0; i < dictDimensionPage.length; i++) {
+      dictDimensionPage[i] = ColumnPage.newPage(DataType.BYTE_ARRAY, pageSize);
+    }
     noDictDimensionPage = new ColumnPage[model.getNoDictionaryCount()];
     for (int i = 0; i < noDictDimensionPage.length; i++) {
-      noDictDimensionPage[i] = new ColumnPage(DataType.STRING, pageSize);
+      noDictDimensionPage[i] = ColumnPage.newPage(DataType.BYTE_ARRAY, pageSize);
     }
     complexDimensionPage = new ComplexColumnPage[model.getComplexColumnCount()];
     for (int i = 0; i < complexDimensionPage.length; i++) {
@@ -80,7 +82,7 @@ public class TablePage {
     measurePage = new ColumnPage[model.getMeasureCount()];
     DataType[] dataTypes = model.getMeasureDataType();
     for (int i = 0; i < measurePage.length; i++) {
-      measurePage[i] = new ColumnPage(dataTypes[i], pageSize);
+      measurePage[i] = ColumnPage.newPage(dataTypes[i], pageSize);
     }
   }
 
@@ -90,13 +92,15 @@ public class TablePage {
    * @param rowId Id of the input row
    * @param row   row object
    */
-  void addRow(int rowId, CarbonRow row) throws KeyGenException {
+  public void addRow(int rowId, CarbonRow row) throws KeyGenException {
     // convert each column category
 
     // 1. convert dictionary columns
     byte[] mdk = WriteStepRowUtil.getMdk(row, model.getMDKeyGenerator());
     byte[][] keys = model.getSegmentProperties().getFixedLengthKeySplitter().splitKey(mdk);
-    keyColumnPage.putKey(rowId, keys);
+    for (int i = 0; i < dictDimensionPage.length; i++) {
+      dictDimensionPage[i].putData(rowId, keys[i]);
+    }
 
     // 2. convert noDictionary columns and complex columns.
     int noDictionaryCount = noDictDimensionPage.length;
@@ -165,7 +169,7 @@ public class TablePage {
       encodedComplexColumnar.add(new ArrayList<byte[]>());
     }
 
-    // encode the complex type data and fill columnsArray
+    // encode the complex type data and fill columnsArray
     try {
       ByteBuffer byteArrayInput = ByteBuffer.wrap(complexColumns);
       ByteArrayOutputStream byteArrayOutput = new ByteArrayOutputStream();
@@ -195,8 +199,8 @@ public class TablePage {
     return output;
   }
 
-  public KeyColumnPage getKeyColumnPage() {
-    return keyColumnPage;
+  public ColumnPage[] getDictDimensionPage() {
+    return dictDimensionPage;
   }
 
   public ColumnPage[] getNoDictDimensionPage() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
new file mode 100644
index 0000000..5d460a9
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageEncoder.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.store;
+
+import java.util.Iterator;
+
+import org.apache.carbondata.core.datastore.TableSpec;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForInt;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndex;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForNoInvertedIndexForShort;
+import org.apache.carbondata.core.datastore.columnar.BlockIndexerStorageForShort;
+import org.apache.carbondata.core.datastore.columnar.IndexStorage;
+import org.apache.carbondata.core.datastore.compression.Compressor;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.datastore.page.ColumnPage;
+import org.apache.carbondata.core.datastore.page.encoding.ColumnPageCodec;
+import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingStrategy;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedData;
+import org.apache.carbondata.core.datastore.page.encoding.EncodingStrategy;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+public class TablePageEncoder {
+
+  private ColumnarFormatVersion version;
+
+  private boolean[] isUseInvertedIndex;
+
+  private CarbonFactDataHandlerModel model;
+
+  private static final EncodingStrategy encodingStrategy = new DefaultEncodingStrategy();
+
+  public TablePageEncoder(CarbonFactDataHandlerModel model) {
+    this.version = CarbonProperties.getInstance().getFormatVersion();
+    this.model = model;
+    this.isUseInvertedIndex = model.getIsUseInvertedIndex();
+  }
+
+  // function to encode all columns in one table page
+  public EncodedData encode(TablePage tablePage) throws KeyGenException {
+    EncodedData encodedData = new EncodedData();
+    encodeAndCompressDimensions(tablePage, encodedData);
+    encodeAndCompressMeasures(tablePage, encodedData);
+    return encodedData;
+  }
+
+  // encode measures and set the result in `encodedData`
+  private void encodeAndCompressMeasures(TablePage tablePage, EncodedData encodedData) {
+    ColumnPage[] measurePage = tablePage.getMeasurePage();
+    byte[][] encodedMeasures = new byte[measurePage.length][];
+    for (int i = 0; i < measurePage.length; i++) {
+      ColumnPageCodec encoder = encodingStrategy.createCodec(measurePage[i].getStatistics());
+      encodedMeasures[i] = encoder.encode(measurePage[i]);
+    }
+    encodedData.measures = encodedMeasures;
+  }
+
+  private IndexStorage encodeAndCompressDictDimension(byte[][] data, boolean isSort,
+      boolean isUseInvertedIndex) throws KeyGenException {
+    if (isUseInvertedIndex) {
+      if (version == ColumnarFormatVersion.V3) {
+        return new BlockIndexerStorageForShort(data, true, false, isSort);
+      } else {
+        return new BlockIndexerStorageForInt(data, true, false, isSort);
+      }
+    } else {
+      if (version == ColumnarFormatVersion.V3) {
+        return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+      } else {
+        return new BlockIndexerStorageForNoInvertedIndex(data);
+      }
+    }
+  }
+
+  private IndexStorage encodeAndCompressDirectDictDimension(byte[][] data, boolean isSort,
+      boolean isUseInvertedIndex) throws KeyGenException {
+    if (isUseInvertedIndex) {
+      if (version == ColumnarFormatVersion.V3) {
+        return new BlockIndexerStorageForShort(data, false, false, isSort);
+      } else {
+        return new BlockIndexerStorageForInt(data, false, false, isSort);
+      }
+    } else {
+      if (version == ColumnarFormatVersion.V3) {
+        return new BlockIndexerStorageForNoInvertedIndexForShort(data, false);
+      } else {
+        return new BlockIndexerStorageForNoInvertedIndex(data);
+      }
+    }
+  }
+
+  private IndexStorage encodeAndCompressComplexDimension(byte[][] data) {
+    if (version == ColumnarFormatVersion.V3) {
+      return new BlockIndexerStorageForShort(data, false, false, false);
+    } else {
+      return new BlockIndexerStorageForInt(data, false, false, false);
+    }
+  }
+
+  private IndexStorage encodeAndCompressNoDictDimension(byte[][] data, boolean isSort,
+      boolean isUseInvertedIndex) {
+    if (isUseInvertedIndex) {
+      if (version == ColumnarFormatVersion.V3) {
+        return new BlockIndexerStorageForShort(data, false, true, isSort);
+      } else {
+        return new BlockIndexerStorageForInt(data, false, true, isSort);
+      }
+    } else {
+      if (version == ColumnarFormatVersion.V3) {
+        return new BlockIndexerStorageForNoInvertedIndexForShort(data, true);
+      } else {
+        return new BlockIndexerStorageForNoInvertedIndex(data);
+      }
+    }
+  }
+
+  // encode and compress each dimension, set encoded data in `encodedData`
+  private void encodeAndCompressDimensions(TablePage tablePage, EncodedData encodedData)
+      throws KeyGenException {
+    TableSpec.DimensionSpec dimensionSpec = model.getTableSpec().getDimensionSpec();
+    int dictionaryColumnCount = -1;
+    int noDictionaryColumnCount = -1;
+    int indexStorageOffset = 0;
+    IndexStorage[] indexStorages = new IndexStorage[dimensionSpec.getNumExpandedDimensions()];
+    Compressor compressor = CompressorFactory.getInstance().getCompressor();
+    byte[][] compressedColumns = new byte[indexStorages.length][];
+    for (int i = 0; i < dimensionSpec.getNumSimpleDimensions(); i++) {
+      byte[] flattened;
+      boolean isSortColumn = model.isSortColumn(i);
+      switch (dimensionSpec.getType(i)) {
+        case GLOBAL_DICTIONARY:
+          // dictionary dimension
+          indexStorages[indexStorageOffset] =
+              encodeAndCompressDictDimension(
+                  tablePage.getDictDimensionPage()[++dictionaryColumnCount].getByteArrayPage(),
+                  isSortColumn,
+                  isUseInvertedIndex[i] & isSortColumn);
+          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
+          break;
+        case DIRECT_DICTIONARY:
+          // timestamp and date column
+          indexStorages[indexStorageOffset] =
+              encodeAndCompressDirectDictDimension(
+                  tablePage.getDictDimensionPage()[++dictionaryColumnCount].getByteArrayPage(),
+                  isSortColumn,
+                  isUseInvertedIndex[i] & isSortColumn);
+          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
+          break;
+        case PLAIN_VALUE:
+          // high cardinality dimension, encoded as plain string
+          indexStorages[indexStorageOffset] =
+              encodeAndCompressNoDictDimension(
+                  tablePage.getNoDictDimensionPage()[++noDictionaryColumnCount].getByteArrayPage(),
+                  isSortColumn,
+                  isUseInvertedIndex[i] & isSortColumn);
+          flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
+          break;
+        case COMPLEX:
+          // we need to add complex column at last, so skipping it here
+          continue;
+        default:
+          throw new RuntimeException("unsupported dimension type: " + dimensionSpec.getType(i));
+      }
+      compressedColumns[indexStorageOffset] = compressor.compressByte(flattened);
+      indexStorageOffset++;
+    }
+
+    // handle complex type column
+    for (int i = 0; i < dimensionSpec.getNumComplexDimensions(); i++) {
+      Iterator<byte[][]> iterator = tablePage.getComplexDimensionPage()[i].iterator();
+      while (iterator.hasNext()) {
+        byte[][] data = iterator.next();
+        indexStorages[indexStorageOffset] = encodeAndCompressComplexDimension(data);
+        byte[] flattened = ByteUtil.flatten(indexStorages[indexStorageOffset].getKeyBlock());
+        compressedColumns[indexStorageOffset] = compressor.compressByte(flattened);
+        indexStorageOffset++;
+      }
+    }
+
+    encodedData.indexStorages = indexStorages;
+    encodedData.dimensions = compressedColumns;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/edda2483/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
index 2911936..13eaac9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePageStatistics.java
@@ -23,9 +23,9 @@ import java.util.BitSet;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.datastore.columnar.IndexStorage;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
-import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatistics;
+import org.apache.carbondata.core.datastore.page.encoding.EncodedData;
+import org.apache.carbondata.core.datastore.page.statistics.ColumnPageStatsVO;
 import org.apache.carbondata.core.datastore.page.statistics.MeasurePageStatsVO;
-import org.apache.carbondata.processing.store.writer.Encoder;
 
 // Statistics of dimension and measure column in a TablePage
 public class TablePageStatistics {
@@ -55,7 +55,7 @@ public class TablePageStatistics {
   private TableSpec tableSpec;
 
   TablePageStatistics(TableSpec tableSpec, TablePage tablePage,
-      Encoder.EncodedData encodedData, MeasurePageStatsVO measurePageStatistics) {
+      EncodedData encodedData, MeasurePageStatsVO measurePageStatistics) {
     this.numDimensionsExpanded = tableSpec.getDimensionSpec().getNumExpandedDimensions();
     int numMeasures = tableSpec.getMeasureSpec().getNumMeasures();
     this.dimensionMinValue = new byte[numDimensionsExpanded][];
@@ -69,7 +69,7 @@ public class TablePageStatistics {
     updateNullBitSet(tablePage);
   }
 
-  private void updateMinMax(TablePage tablePage, Encoder.EncodedData encodedData) {
+  private void updateMinMax(TablePage tablePage, EncodedData encodedData) {
     IndexStorage[] keyStorageArray = encodedData.indexStorages;
     byte[][] measureArray = encodedData.measures;
 
@@ -89,7 +89,7 @@ public class TablePageStatistics {
       }
     }
     for (int i = 0; i < measureArray.length; i++) {
-      ColumnPageStatistics stats = tablePage.getMeasurePage()[i].getStatistics();
+      ColumnPageStatsVO stats = tablePage.getMeasurePage()[i].getStatistics();
       measureMaxValue[i] = stats.minBytes();
       measureMinValue[i] = stats.maxBytes();
     }