You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/06/20 09:56:10 UTC

[1/2] carbondata git commit: [CARBONDATA-2420][32K] Support string longer than 32000 characters

Repository: carbondata
Updated Branches:
  refs/heads/master c5a4ec07a -> dc53dee24


http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
new file mode 100644
index 0000000..419b306
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
@@ -0,0 +1,279 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.carbondata.spark.testsuite.longstring
+
+import java.io.{File, PrintWriter}
+
+import org.apache.commons.lang3.RandomStringUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class VarcharDataTypesBasicTestCase extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+  private val longStringTable = "long_string_table"
+  private val inputDir = s"$resourcesPath${File.separator}varchartype${File.separator}"
+  private val fileName = s"longStringData.csv"
+  private val inputFile = s"$inputDir$fileName"
+  private val fileName_2g_column_page = s"longStringData_exceed_2gb_column_page.csv"
+  private val inputFile_2g_column_page = s"$inputDir$fileName_2g_column_page"
+  private val lineNum = 1000
+  private var content: Content = _
+  private var originMemorySize = CarbonProperties.getInstance().getProperty(
+    CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
+    CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT)
+
+  case class Content(head: Int, desc_line_head: String, note_line_head: String,
+      mid: Int, desc_line_mid: String, note_line_mid: String,
+      tail: Int, desc_line_tail: String, note_line_tail: String)
+
+  override def beforeAll(): Unit = {
+    // one column page of 32000 rows * 32000 characters uses about 1GB of memory, but here we only have 1000 rows
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
+      CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT)
+    deleteFile(inputFile)
+    if (!new File(inputDir).exists()) {
+      new File(inputDir).mkdir()
+    }
+    content = createFile(inputFile, line = lineNum)
+  }
+
+  override def afterAll(): Unit = {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, originMemorySize)
+    deleteFile(inputFile)
+    deleteFile(inputFile_2g_column_page)
+    if (new File(inputDir).exists()) {
+      new File(inputDir).delete()
+    }
+  }
+
+  override def beforeEach(): Unit = {
+    sql(s"drop table if exists $longStringTable")
+  }
+
+  override def afterEach(): Unit = {
+    sql(s"drop table if exists $longStringTable")
+  }
+
+  private def prepareTable(): Unit = {
+    sql(
+      s"""
+         | CREATE TABLE if not exists $longStringTable(
+         | id INT, name STRING, description STRING, address STRING, note STRING
+         | ) STORED BY 'carbondata'
+         | TBLPROPERTIES('LONG_STRING_COLUMNS'='description, note', 'SORT_COLUMNS'='name')
+         |""".stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $longStringTable
+         | OPTIONS('header'='false')
+       """.stripMargin)
+  }
+
+  private def checkQuery(): Unit = {
+    // query without long_string_column
+    checkAnswer(sql(s"SELECT id, name, address FROM $longStringTable where id = ${content.tail}"),
+      Row(content.tail, s"name_${content.tail}", s"address_${content.tail}"))
+    // query return long_string_column in the middle position
+    checkAnswer(sql(s"SELECT id, name, description, address FROM $longStringTable where id = ${content.head}"),
+      Row(content.head, s"name_${content.head}", content.desc_line_head, s"address_${content.head}"))
+    // query return long_string_column at last position
+    checkAnswer(sql(s"SELECT id, name, address, description FROM $longStringTable where id = ${content.mid}"),
+      Row(content.mid, s"name_${content.mid}", s"address_${content.mid}", content.desc_line_mid))
+    // query return 2 long_string_columns
+    checkAnswer(sql(s"SELECT id, name, note, address, description FROM $longStringTable where id = ${content.mid}"),
+      Row(content.mid, s"name_${content.mid}", content.note_line_mid, s"address_${content.mid}", content.desc_line_mid))
+    // query by simple string column
+    checkAnswer(sql(s"SELECT id, note, address, description FROM $longStringTable where name = 'name_${content.tail}'"),
+      Row(content.tail, content.note_line_tail, s"address_${content.tail}", content.desc_line_tail))
+    // query by long string column
+    checkAnswer(sql(s"SELECT id, name, address, description FROM $longStringTable where note = '${content.note_line_tail}'"),
+      Row(content.tail, s"name_${content.tail}", s"address_${content.tail}", content.desc_line_tail))
+  }
+
+  test("Load and query with long string datatype: safe sort & safe columnpage") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "false")
+
+    prepareTable()
+    checkQuery()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+      CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
+      CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
+  }
+
+  test("Load and query with long string datatype: safe sort & unsafe column page") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
+
+    prepareTable()
+    checkQuery()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+      CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
+      CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
+  }
+
+  test("Load and query with long string datatype: unsafe sort & safe column page") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "false")
+
+    prepareTable()
+    checkQuery()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+      CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
+      CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
+  }
+
+  test("Load and query with long string datatype: unsafe sort & unsafe column page") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
+
+    prepareTable()
+    checkQuery()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+      CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
+      CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
+  }
+
+  // ignore this test in CI, because it will need at least 4GB memory to run successfully
+  ignore("Exceed 2GB per column page for varchar datatype") {
+    deleteFile(inputFile_2g_column_page)
+    if (!new File(inputDir).exists()) {
+      new File(inputDir).mkdir()
+    }
+    // 700000 characters per row with 3200 rows will exceed the 2GB constraint for one column page.
+    content = createFile2(inputFile_2g_column_page, line = 3200, varcharLen = 700000)
+
+    sql(
+      s"""
+         | CREATE TABLE if not exists $longStringTable(
+         | id INT, name STRING, description STRING, address STRING
+         | ) STORED BY 'carbondata'
+         | TBLPROPERTIES('LONG_STRING_COLUMNS'='description', 'SORT_COLUMNS'='name')
+         |""".stripMargin)
+    val exceptionCaught = intercept[Exception] {
+      sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$inputFile_2g_column_page' INTO TABLE $longStringTable
+           | OPTIONS('header'='false')
+       """.stripMargin)
+    }
+    // the root cause is wrapped by the data-loading exception, so we cannot assert on it directly here
+  }
+
+  // will create 2 long string columns
+  private def createFile(filePath: String, line: Int = 10000, start: Int = 0,
+      varcharLen: Int = Short.MaxValue + 1000): Content = {
+    val head = 0
+    val mid = line / 2
+    var tail = line - 1
+    var desc_line_head: String = ""
+    var desc_line_mid: String = ""
+    var desc_line_tail: String = ""
+    var note_line_head: String = ""
+    var note_line_mid: String = ""
+    var note_line_tail: String = ""
+    if (new File(filePath).exists()) {
+      deleteFile(filePath)
+    }
+    val write = new PrintWriter(new File(filePath))
+    for (i <- start until (start + line)) {
+      val description = RandomStringUtils.randomAlphabetic(varcharLen)
+      val note = RandomStringUtils.randomAlphabetic(varcharLen)
+      val line = s"$i,name_$i,$description,address_$i,$note"
+      if (head == i) {
+        desc_line_head = description
+        note_line_head = note
+      } else if (mid == i) {
+        desc_line_mid = description
+        note_line_mid = note
+      } else if (tail == i) {
+        desc_line_tail = description
+        note_line_tail = note
+      }
+      write.println(line)
+    }
+    write.close()
+    Content(head, desc_line_head, note_line_head,
+      mid, desc_line_mid, note_line_mid, tail,
+      desc_line_tail, note_line_tail)
+  }
+
+  // will only create 1 long string column
+  private def createFile2(filePath: String, line: Int = 10000, start: Int = 0,
+      varcharLen: Int = Short.MaxValue + 1000): Content = {
+    val head = 0
+    val mid = line / 2
+    var tail = line - 1
+    var desc_line_head: String = ""
+    var desc_line_mid: String = ""
+    var desc_line_tail: String = ""
+    if (new File(filePath).exists()) {
+      deleteFile(filePath)
+    }
+    val write = new PrintWriter(new File(filePath))
+    for (i <- start until (start + line)) {
+      val description = RandomStringUtils.randomAlphabetic(varcharLen)
+      val note = RandomStringUtils.randomAlphabetic(varcharLen)
+      val line = s"$i,name_$i,$description,address_$i"
+      if (head == i) {
+        desc_line_head = description
+      } else if (mid == i) {
+        desc_line_mid = description
+      } else if (tail == i) {
+        desc_line_tail = description
+      }
+      write.println(line)
+    }
+    write.close()
+    Content(head, desc_line_head, "",
+      mid, desc_line_mid, "", tail,
+      desc_line_tail, "")
+  }
+
+  private def deleteFile(filePath: String): Unit = {
+    val file = new File(filePath)
+    if (file.exists()) {
+      file.delete()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 1ccbf6a..6227655 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -115,6 +115,7 @@ object CarbonScalaUtil {
         case CarbonDataTypes.BOOLEAN => BooleanType
         case CarbonDataTypes.TIMESTAMP => TimestampType
         case CarbonDataTypes.DATE => DateType
+        case CarbonDataTypes.VARCHAR => StringType
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
index 6673e18..6cd28c0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
@@ -126,6 +126,7 @@ object DataTypeConverterUtil {
       case "timestamp" => ThriftDataType.TIMESTAMP
       case "array" => ThriftDataType.ARRAY
       case "struct" => ThriftDataType.STRUCT
+      case "varchar" => ThriftDataType.VARCHAR
       case _ => ThriftDataType.STRING
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 9af8817..0d53a73 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -280,7 +280,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     fields.zipWithIndex.foreach { case (field, index) =>
       field.schemaOrdinal = index
     }
-    val (dims, msrs, noDictionaryDims, sortKeyDims) = extractDimAndMsrFields(
+    val (dims, msrs, noDictionaryDims, sortKeyDims, varcharColumns) = extractDimAndMsrFields(
       fields, tableProperties)
 
     // column properties
@@ -391,6 +391,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
       msrs.map(f => normalizeType(f)),
       Option(sortKeyDims),
+      Option(varcharColumns),
       Option(noDictionaryDims),
       Option(noInvertedIdxCols),
       groupCols,
@@ -691,12 +692,31 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
    * @return
    */
   protected def extractDimAndMsrFields(fields: Seq[Field],
-      tableProperties: Map[String, String]): (Seq[Field], Seq[Field], Seq[String], Seq[String]) = {
+      tableProperties: Map[String, String]):
+  (Seq[Field], Seq[Field], Seq[String], Seq[String], Seq[String]) = {
     var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
     var msrFields: Seq[Field] = Seq[Field]()
     var dictExcludeCols: Array[String] = Array[String]()
     var noDictionaryDims: Seq[String] = Seq[String]()
     var dictIncludeCols: Seq[String] = Seq[String]()
+    var varcharCols: Seq[String] = Seq[String]()
+
+    // All long_string cols should be there in create table cols and should be of string data type
+    if (tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS).isDefined) {
+      varcharCols =
+        tableProperties(CarbonCommonConstants.LONG_STRING_COLUMNS).split(",").map(_.trim)
+      varcharCols.foreach { varcharCol =>
+        val exists = fields.exists(f => f.column.equalsIgnoreCase(varcharCol) &&
+                                        DataTypes.STRING.getName.equalsIgnoreCase(f.dataType.get))
+        if (!exists) {
+          throw new MalformedCarbonCommandException(
+            s"""
+               |${CarbonCommonConstants.LONG_STRING_COLUMNS}: $varcharCol does not exist in table
+               | or its data type is not string. Please check create table statement.
+             """.stripMargin)
+        }
+      }
+    }
 
     // All columns in sortkey should be there in create table cols
     val sortKeyOption = tableProperties.get(CarbonCommonConstants.SORT_COLUMNS)
@@ -727,6 +747,10 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
             val errormsg = s"sort_columns is unsupported for $dataType datatype column: " + column
             throw new MalformedCarbonCommandException(errormsg)
           }
+          if (varcharCols.exists(x => x.equalsIgnoreCase(column))) {
+            throw new MalformedCarbonCommandException(
+              s"sort_columns is unsupported for long string datatype column $column")
+          }
         }
       }
 
@@ -824,9 +848,11 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
 
     var sortKeyDims = sortKeyDimsTmp
     if (sortKeyOption.isEmpty) {
-      // if SORT_COLUMNS was not defined, add all dimension to SORT_COLUMNS.
+      // if SORT_COLUMNS was not defined,
+      // add all dimension(except long string columns) to SORT_COLUMNS.
       dimFields.foreach { field =>
-        if (!isComplexDimDictionaryExclude(field.dataType.get)) {
+        if (!isComplexDimDictionaryExclude(field.dataType.get) &&
+            !varcharCols.contains(field.column)) {
           sortKeyDims :+= field.column
         }
       }
@@ -837,7 +863,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     } else {
       tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, sortKeyDims.mkString(","))
     }
-    (dimFields.toSeq, msrFields, noDictionaryDims, sortKeyDims)
+    (dimFields.toSeq, msrFields, noDictionaryDims, sortKeyDims, varcharCols)
   }
 
   def isDefaultMeasure(dataType: Option[String]): Boolean = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index d48db21..c77d0df 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -54,6 +54,7 @@ case class TableModel(
     dimCols: Seq[Field],
     msrCols: Seq[Field],
     sortKeyDims: Option[Seq[String]],
+    varcharCols: Option[Seq[String]],
     highcardinalitydims: Option[Seq[String]],
     noInvertedIdxCols: Option[Seq[String]],
     columnGroups: Seq[String],
@@ -212,9 +213,9 @@ class AlterTableColumnSchemaGenerator(
     tableIdentifier: AbsoluteTableIdentifier,
     sc: SparkContext) {
 
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
-  def isSortColumn(columnName: String): Boolean = {
+  private def isSortColumn(columnName: String): Boolean = {
     val sortColumns = alterTableModel.tableProperties.get("sort_columns")
     if(sortColumns.isDefined) {
       sortColumns.get.contains(columnName)
@@ -222,6 +223,16 @@ class AlterTableColumnSchemaGenerator(
       true
     }
   }
+
+  private def isVarcharColumn(columnName: String): Boolean = {
+    val varcharColumns = alterTableModel.tableProperties.get("long_string_columns")
+    if (varcharColumns.isDefined) {
+      varcharColumns.get.contains(columnName)
+    } else {
+      false
+    }
+  }
+
   def process: Seq[ColumnSchema] = {
     val tableSchema = tableInfo.getFactTable
     val tableCols = tableSchema.getListOfColumns.asScala
@@ -241,7 +252,8 @@ class AlterTableColumnSchemaGenerator(
         field.schemaOrdinal + existingColsSize,
         alterTableModel.highCardinalityDims,
         alterTableModel.databaseName.getOrElse(dbName),
-        isSortColumn(field.name.getOrElse(field.column)))
+        isSortColumn(field.name.getOrElse(field.column)),
+        isVarcharColumn(field.name.getOrElse(field.column)))
       allColumns ++= Seq(columnSchema)
       newCols ++= Seq(columnSchema)
     })
@@ -351,14 +363,19 @@ object TableNewProcessor {
       schemaOrdinal: Int,
       highCardinalityDims: Seq[String],
       databaseName: String,
-      isSortColumn: Boolean = false): ColumnSchema = {
+      isSortColumn: Boolean = false,
+      isVarcharColumn: Boolean = false): ColumnSchema = {
     val dataType = DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse(""))
     if (DataTypes.isDecimal(dataType)) {
       dataType.asInstanceOf[DecimalType].setPrecision(field.precision)
       dataType.asInstanceOf[DecimalType].setScale(field.scale)
     }
     val columnSchema = new ColumnSchema()
-    columnSchema.setDataType(dataType)
+    if (isVarcharColumn) {
+      columnSchema.setDataType(DataTypes.VARCHAR)
+    } else {
+      columnSchema.setDataType(dataType)
+    }
     val colName = field.name.getOrElse(field.column)
     columnSchema.setColumnName(colName)
     if (highCardinalityDims.contains(colName)) {
@@ -415,6 +432,11 @@ class TableNewProcessor(cm: TableModel) {
     allColumns
   }
 
+  // a varchar column is a string column that is specified in long_string_columns
+  private def isVarcharColumn(colName : String): Boolean = {
+    cm.varcharCols.get.contains(colName)
+  }
+
   def getColumnSchema(
       dataType: DataType,
       colName: String,
@@ -450,6 +472,9 @@ class TableNewProcessor(cm: TableModel) {
     columnSchema.setScale(field.scale)
     columnSchema.setSchemaOrdinal(field.schemaOrdinal)
     columnSchema.setSortColumn(false)
+    if (isVarcharColumn(colName)) {
+      columnSchema.setDataType(DataTypes.VARCHAR)
+    }
     if(isParentColumnRelation) {
       val dataMapField = map.get.get(field).get
       columnSchema.setFunction(dataMapField.aggregateFunction)
@@ -517,7 +542,7 @@ class TableNewProcessor(cm: TableModel) {
     val dictionaryIncludeCols = cm.tableProperties
       .getOrElse(CarbonCommonConstants.DICTIONARY_INCLUDE, "")
 
-    cm.dimCols.foreach { field =>
+    def addDimensionCol(field: Field): Unit = {
       val sortField = cm.sortKeyDims.get.find(field.column equals _)
       if (sortField.isEmpty) {
         val encoders = if (getEncoderFromParent(field)) {
@@ -549,6 +574,12 @@ class TableNewProcessor(cm: TableModel) {
         }
       }
     }
+    // dimensions that are not varchar
+    cm.dimCols.filter(field => !cm.varcharCols.get.contains(field.column))
+      .foreach(addDimensionCol(_))
+    // dimensions that are varchar
+    cm.dimCols.filter(field => cm.varcharCols.get.contains(field.column))
+      .foreach(addDimensionCol(_))
 
     // check whether the column is a local dictionary column and set in column schema
     if (null != cm.tableProperties) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 5739d3e..2f2048d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -201,6 +201,7 @@ case class CarbonRelation(
 object CarbonMetastoreTypes extends RegexParsers {
   protected lazy val primitiveType: Parser[DataType] =
     "string" ^^^ StringType |
+    "varchar" ^^^ StringType |
     "float" ^^^ FloatType |
     "int" ^^^ IntegerType |
     "tinyint" ^^^ ShortType |

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
index 9cf7fe4..3018e49 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -73,8 +73,10 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
               .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat);
           if (dataType == DataTypes.STRING
               && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
-            throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
-                + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+            throw new CarbonDataLoadingException(String.format(
+                "Dataload failed, String size cannot exceed %d bytes,"
+                    + " please consider long string data type",
+                CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT));
           }
           row.update(value, index);
         } else {
@@ -82,8 +84,10 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
               .getDataDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat);
           if (dataType == DataTypes.STRING
               && value.toString().length() > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
-            throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
-                + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+            throw new CarbonDataLoadingException(String.format(
+                "Dataload failed, String size cannot exceed %d bytes,"
+                    + " please consider long string data type",
+                CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT));
           }
           row.update(value, index);
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
index 2e3479c..86c71a6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
@@ -205,7 +205,9 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
     parserSettings.setSkipEmptyLines(
         Boolean.valueOf(job.get(SKIP_EMPTY_LINE,
             CarbonCommonConstants.CARBON_SKIP_EMPTY_LINE_DEFAULT)));
-    parserSettings.setMaxCharsPerColumn(CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT);
+    // todo: will verify whether there is a performance degrade using -1 here
+    // parserSettings.setMaxCharsPerColumn(CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT);
+    parserSettings.setMaxCharsPerColumn(CarbonCommonConstants.MAX_CHARS_PER_COLUMN_INFINITY);
     String maxColumns = job.get(MAX_COLUMNS, "" + DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING);
     parserSettings.setMaxColumns(Integer.parseInt(maxColumns));
     parserSettings.getFormat().setQuote(job.get(QUOTE, QUOTE_DEFAULT).charAt(0));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
index 8d351cf..8bec099 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
@@ -54,12 +54,13 @@ public class IntermediateSortTempRow {
   /**
    * deserialize from bytes array to get the no sort fields
    * @param outDictNoSort stores the dict & no-sort fields
-   * @param outNoDictNoSort stores the no-dict & no-sort fields, including complex
+   * @param outNoDictNoSortAndVarcharDims stores the no-dict & no-sort fields,
+ *                                    including complex and varchar fields
    * @param outMeasures stores the measure fields
    * @param dataTypes data type for the measure
    */
-  public void unpackNoSortFromBytes(int[] outDictNoSort, byte[][] outNoDictNoSort,
-      Object[] outMeasures, DataType[] dataTypes) {
+  public void unpackNoSortFromBytes(int[] outDictNoSort, byte[][] outNoDictNoSortAndVarcharDims,
+      Object[] outMeasures, DataType[] dataTypes, int varcharDimCnt) {
     ByteBuffer rowBuffer = ByteBuffer.wrap(noSortDimsAndMeasures);
     // read dict_no_sort
     int dictNoSortCnt = outDictNoSort.length;
@@ -68,12 +69,20 @@ public class IntermediateSortTempRow {
     }
 
     // read no_dict_no_sort (including complex)
-    int noDictNoSortCnt = outNoDictNoSort.length;
+    int noDictNoSortCnt = outNoDictNoSortAndVarcharDims.length - varcharDimCnt;
     for (int i = 0; i < noDictNoSortCnt; i++) {
       short len = rowBuffer.getShort();
       byte[] bytes = new byte[len];
       rowBuffer.get(bytes);
-      outNoDictNoSort[i] = bytes;
+      outNoDictNoSortAndVarcharDims[i] = bytes;
+    }
+
+    // read varchar dims
+    for (int i = 0; i < varcharDimCnt; i++) {
+      int len = rowBuffer.getInt();
+      byte[] bytes = new byte[len];
+      rowBuffer.get(bytes);
+      outNoDictNoSortAndVarcharDims[i + noDictNoSortCnt] = bytes;
     }
 
     // read measure

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
index f31a2b9..bcf8a39 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
@@ -46,6 +46,7 @@ public class SortStepRowHandler implements Serializable {
   private int dictNoSortDimCnt = 0;
   private int noDictSortDimCnt = 0;
   private int noDictNoSortDimCnt = 0;
+  private int varcharDimCnt = 0;
   private int measureCnt;
 
   // indices for dict & sort dimension columns
@@ -56,6 +57,7 @@ public class SortStepRowHandler implements Serializable {
   private int[] noDictSortDimIdx;
   // indices for no-dict & no-sort dimension columns, including complex columns
   private int[] noDictNoSortDimIdx;
+  private int[] varcharDimIdx;
   // indices for measure columns
   private int[] measureIdx;
 
@@ -70,11 +72,13 @@ public class SortStepRowHandler implements Serializable {
     this.dictNoSortDimCnt = tableFieldStat.getDictNoSortDimCnt();
     this.noDictSortDimCnt = tableFieldStat.getNoDictSortDimCnt();
     this.noDictNoSortDimCnt = tableFieldStat.getNoDictNoSortDimCnt();
+    this.varcharDimCnt = tableFieldStat.getVarcharDimCnt();
     this.measureCnt = tableFieldStat.getMeasureCnt();
     this.dictSortDimIdx = tableFieldStat.getDictSortDimIdx();
     this.dictNoSortDimIdx = tableFieldStat.getDictNoSortDimIdx();
     this.noDictSortDimIdx = tableFieldStat.getNoDictSortDimIdx();
     this.noDictNoSortDimIdx = tableFieldStat.getNoDictNoSortDimIdx();
+    this.varcharDimIdx = tableFieldStat.getVarcharDimIdx();
     this.measureIdx = tableFieldStat.getMeasureIdx();
     this.dataTypes = tableFieldStat.getMeasureDataType();
   }
@@ -122,6 +126,10 @@ public class SortStepRowHandler implements Serializable {
       for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
         nonDictArray[idxAcc++] = (byte[]) row[this.noDictNoSortDimIdx[idx]];
       }
+      // convert varchar dims
+      for (int idx = 0; idx < this.varcharDimCnt; idx++) {
+        nonDictArray[idxAcc++] = (byte[]) row[this.varcharDimIdx[idx]];
+      }
 
       // convert measure data
       for (int idx = 0; idx < this.measureCnt; idx++) {
@@ -146,13 +154,15 @@ public class SortStepRowHandler implements Serializable {
     int[] dictDims
         = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
     byte[][] noDictArray
-        = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt][];
+        = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt][];
 
     int[] dictNoSortDims = new int[this.dictNoSortDimCnt];
-    byte[][] noDictNoSortDims = new byte[this.noDictNoSortDimCnt][];
+    byte[][] noDictNoSortAndVarcharDims
+        = new byte[this.noDictNoSortDimCnt + this.varcharDimCnt][];
     Object[] measures = new Object[this.measureCnt];
 
-    sortTempRow.unpackNoSortFromBytes(dictNoSortDims, noDictNoSortDims, measures, this.dataTypes);
+    sortTempRow.unpackNoSortFromBytes(dictNoSortDims, noDictNoSortAndVarcharDims, measures,
+        this.dataTypes, this.varcharDimCnt);
 
     // dict dims
     System.arraycopy(sortTempRow.getDictSortDims(), 0 , dictDims,
@@ -163,8 +173,8 @@ public class SortStepRowHandler implements Serializable {
     // no dict dims, including complex
     System.arraycopy(sortTempRow.getNoDictSortDims(), 0,
         noDictArray, 0, this.noDictSortDimCnt);
-    System.arraycopy(noDictNoSortDims, 0, noDictArray,
-        this.noDictSortDimCnt, this.noDictNoSortDimCnt);
+    System.arraycopy(noDictNoSortAndVarcharDims, 0, noDictArray,
+        this.noDictSortDimCnt, this.noDictNoSortDimCnt + this.varcharDimCnt);
 
     // measures are already here
 
@@ -428,6 +438,12 @@ public class SortStepRowHandler implements Serializable {
       rowBuffer.putShort((short) bytes.length);
       rowBuffer.put(bytes);
     }
+    // convert varchar dims
+    for (int idx = 0; idx < this.varcharDimCnt; idx++) {
+      byte[] bytes = (byte[]) row[this.varcharDimIdx[idx]];
+      rowBuffer.putInt(bytes.length);
+      rowBuffer.put(bytes);
+    }
 
     // convert measure
     Object tmpValue;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index dde18a9..9dab181 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -92,6 +92,10 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    */
   private boolean[] noDictionaryColMapping;
   /**
+   * boolean mapping for long string dimension
+   */
+  private boolean[] isVarcharDimMapping;
+  /**
    * agg type defined for measures
    */
   private DataType[] dataTypes;
@@ -353,13 +357,18 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
     measureCount = carbonTable.getMeasureByTableName(tableName).size();
     List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
     noDictionaryColMapping = new boolean[dimensions.size()];
+    isVarcharDimMapping = new boolean[dimensions.size()];
     int i = 0;
+    int j = 0;
     for (CarbonDimension dimension : dimensions) {
       if (CarbonUtil.hasEncoding(dimension.getEncoder(), Encoding.DICTIONARY)) {
         i++;
         continue;
       }
       noDictionaryColMapping[i++] = true;
+      if (dimension.getColumnSchema().getDataType() == DataTypes.VARCHAR) {
+        isVarcharDimMapping[j++] = true;
+      }
       noDictionaryCount++;
     }
     dimensionColumnCount = dimensions.size();
@@ -387,7 +396,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
         .createSortParameters(carbonTable, carbonLoadModel.getDatabaseName(), tableName,
             dimensionColumnCount, segmentProperties.getComplexDimensions().size(), measureCount,
             noDictionaryCount, segmentId,
-            carbonLoadModel.getTaskNo(), noDictionaryColMapping, true);
+            carbonLoadModel.getTaskNo(), noDictionaryColMapping, isVarcharDimMapping, true);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index 705350c..502fa05 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -111,7 +111,11 @@ public class SortParameters implements Serializable {
   private boolean[] noDictionaryDimnesionColumn;
 
   private boolean[] noDictionarySortColumn;
-
+  /**
+   * whether dimension is varchar data type.
+   * since all dimensions are string, we use an array of boolean instead of datatypes
+   */
+  private boolean[] isVarcharDimensionColumn;
   private int numberOfSortColumns;
 
   private int numberOfNoDictSortColumns;
@@ -143,6 +147,7 @@ public class SortParameters implements Serializable {
     parameters.segmentId = segmentId;
     parameters.taskNo = taskNo;
     parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
+    parameters.isVarcharDimensionColumn = isVarcharDimensionColumn;
     parameters.noDictionarySortColumn = noDictionarySortColumn;
     parameters.numberOfSortColumns = numberOfSortColumns;
     parameters.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
@@ -312,6 +317,14 @@ public class SortParameters implements Serializable {
     this.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
   }
 
+  public boolean[] getIsVarcharDimensionColumn() {
+    return isVarcharDimensionColumn;
+  }
+
+  public void setIsVarcharDimensionColumn(boolean[] isVarcharDimensionColumn) {
+    this.isVarcharDimensionColumn = isVarcharDimensionColumn;
+  }
+
   public int getNumberOfCores() {
     return numberOfCores;
   }
@@ -371,6 +384,8 @@ public class SortParameters implements Serializable {
         .getComplexNonDictionaryColumnCount());
     parameters.setNoDictionaryDimnesionColumn(
         CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()));
+    parameters.setIsVarcharDimensionColumn(
+        CarbonDataProcessorUtil.getIsVarcharColumnMapping(configuration.getDataFields()));
     parameters.setBatchSortSizeinMb(CarbonDataProcessorUtil.getBatchSortSizeinMb(configuration));
 
     parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns());
@@ -461,7 +476,8 @@ public class SortParameters implements Serializable {
   public static SortParameters createSortParameters(CarbonTable carbonTable, String databaseName,
       String tableName, int dimColCount, int complexDimColCount, int measureColCount,
       int noDictionaryCount, String segmentId, String taskNo,
-      boolean[] noDictionaryColMaping, boolean isCompactionFlow) {
+      boolean[] noDictionaryColMaping, boolean[] isVarcharDimensionColumn,
+      boolean isCompactionFlow) {
     SortParameters parameters = new SortParameters();
     CarbonProperties carbonProperties = CarbonProperties.getInstance();
     parameters.setDatabaseName(databaseName);
@@ -476,6 +492,7 @@ public class SortParameters implements Serializable {
     parameters.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
     parameters.setComplexDimColCount(complexDimColCount);
     parameters.setNoDictionaryDimnesionColumn(noDictionaryColMaping);
+    parameters.setIsVarcharDimensionColumn(isVarcharDimensionColumn);
     parameters.setObserver(new SortObserver());
     // get sort buffer size
     parameters.setSortBufferSize(Integer.parseInt(carbonProperties

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
index 0d1303a..094bd83 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
@@ -33,9 +33,13 @@ public class TableFieldStat implements Serializable {
   private int dictSortDimCnt = 0;
   private int dictNoSortDimCnt = 0;
   private int noDictSortDimCnt = 0;
+  // for columns that are no_dict_dim and no_sort_dim and complex, except the varchar dims
   private int noDictNoSortDimCnt = 0;
+  // for columns that are varchar data type
+  private int varcharDimCnt = 0;
   // whether sort column is of dictionary type or not
   private boolean[] isSortColNoDictFlags;
+  private boolean[] isVarcharDimFlags;
   private int measureCnt;
   private DataType[] measureDataType;
 
@@ -47,6 +51,8 @@ public class TableFieldStat implements Serializable {
   private int[] noDictSortDimIdx;
   // indices for no-dict & no-sort dimension columns, including complex columns
   private int[] noDictNoSortDimIdx;
+  // indices for varchar dimension columns
+  private int[] varcharDimIdx;
   // indices for measure columns
   private int[] measureIdx;
 
@@ -55,6 +61,7 @@ public class TableFieldStat implements Serializable {
     int complexDimCnt = sortParameters.getComplexDimColCount();
     int dictDimCnt = sortParameters.getDimColCount() - noDictDimCnt;
     this.isSortColNoDictFlags = sortParameters.getNoDictionarySortColumn();
+    this.isVarcharDimFlags = sortParameters.getIsVarcharDimensionColumn();
     int sortColCnt = isSortColNoDictFlags.length;
     for (boolean flag : isSortColNoDictFlags) {
       if (flag) {
@@ -66,22 +73,33 @@ public class TableFieldStat implements Serializable {
     this.measureCnt = sortParameters.getMeasureColCount();
     this.measureDataType = sortParameters.getMeasureDataType();
 
+    for (boolean flag : isVarcharDimFlags) {
+      if (flag) {
+        varcharDimCnt++;
+      }
+    }
+
     // be careful that the default value is 0
     this.dictSortDimIdx = new int[dictSortDimCnt];
     this.dictNoSortDimIdx = new int[dictDimCnt - dictSortDimCnt];
     this.noDictSortDimIdx = new int[noDictSortDimCnt];
-    this.noDictNoSortDimIdx = new int[noDictDimCnt + complexDimCnt - noDictSortDimCnt];
+    this.noDictNoSortDimIdx = new int[noDictDimCnt + complexDimCnt - noDictSortDimCnt
+        - varcharDimCnt];
+    this.varcharDimIdx = new int[varcharDimCnt];
     this.measureIdx = new int[measureCnt];
 
     int tmpNoDictSortCnt = 0;
     int tmpNoDictNoSortCnt = 0;
     int tmpDictSortCnt = 0;
     int tmpDictNoSortCnt = 0;
+    int tmpVarcharCnt = 0;
     boolean[] isDimNoDictFlags = sortParameters.getNoDictionaryDimnesionColumn();
 
     for (int i = 0; i < isDimNoDictFlags.length; i++) {
       if (isDimNoDictFlags[i]) {
-        if (i < sortColCnt && isSortColNoDictFlags[i]) {
+        if (isVarcharDimFlags[i]) {
+          varcharDimIdx[tmpVarcharCnt++] = i;
+        } else if (i < sortColCnt && isSortColNoDictFlags[i]) {
           noDictSortDimIdx[tmpNoDictSortCnt++] = i;
         } else {
           noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = i;
@@ -126,10 +144,18 @@ public class TableFieldStat implements Serializable {
     return noDictNoSortDimCnt;
   }
 
+  public int getVarcharDimCnt() {
+    return varcharDimCnt;
+  }
+
   public boolean[] getIsSortColNoDictFlags() {
     return isSortColNoDictFlags;
   }
 
+  public boolean[] getIsVarcharDimFlags() {
+    return isVarcharDimFlags;
+  }
+
   public int getMeasureCnt() {
     return measureCnt;
   }
@@ -154,6 +180,10 @@ public class TableFieldStat implements Serializable {
     return noDictNoSortDimIdx;
   }
 
+  public int[] getVarcharDimIdx() {
+    return varcharDimIdx;
+  }
+
   public int[] getMeasureIdx() {
     return measureIdx;
   }
@@ -166,11 +196,12 @@ public class TableFieldStat implements Serializable {
         && dictNoSortDimCnt == that.dictNoSortDimCnt
         && noDictSortDimCnt == that.noDictSortDimCnt
         && noDictNoSortDimCnt == that.noDictNoSortDimCnt
+        && varcharDimCnt == that.varcharDimCnt
         && measureCnt == that.measureCnt;
   }
 
   @Override public int hashCode() {
     return Objects.hash(dictSortDimCnt, dictNoSortDimCnt, noDictSortDimCnt,
-        noDictNoSortDimCnt, measureCnt);
+        noDictNoSortDimCnt, varcharDimCnt, measureCnt);
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 5408193..1a1c5d1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -39,7 +39,8 @@ import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
 import org.apache.carbondata.core.datastore.page.key.TablePageKey;
 import org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector;
-import org.apache.carbondata.core.datastore.page.statistics.LVStringStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.LVLongStringStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.LVShortStringStatsCollector;
 import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
@@ -98,8 +99,16 @@ public class TablePage {
     noDictDimensionPages = new ColumnPage[model.getNoDictionaryCount()];
     for (int i = 0; i < noDictDimensionPages.length; i++) {
       TableSpec.DimensionSpec spec = tableSpec.getDimensionSpec(i + numDictDimension);
-      ColumnPage page = ColumnPage.newPage(spec, DataTypes.STRING, pageSize);
-      page.setStatsCollector(LVStringStatsCollector.newInstance());
+      ColumnPage page;
+      if (DataTypes.VARCHAR == spec.getSchemaDataType()) {
+        page = ColumnPage.newPage(spec, DataTypes.VARCHAR, pageSize);
+        page.setStatsCollector(LVLongStringStatsCollector.newInstance());
+      } else {
+        // In previous implementation, other data types such as string, date and timestamp
+        // will be encoded using string page
+        page = ColumnPage.newPage(spec, DataTypes.STRING, pageSize);
+        page.setStatsCollector(LVShortStringStatsCollector.newInstance());
+      }
       noDictDimensionPages[i] = page;
     }
     complexDimensionPages = new ComplexColumnPage[model.getComplexColumnCount()];
@@ -155,16 +164,21 @@ public class TablePage {
       dictDimensionPages[i].putData(rowId, keys[i]);
     }
 
-    // 2. convert noDictionary columns and complex columns.
+    // 2. convert noDictionary columns and complex columns and varchar columns.
     int noDictionaryCount = noDictDimensionPages.length;
     int complexColumnCount = complexDimensionPages.length;
     if (noDictionaryCount > 0 || complexColumnCount > 0) {
+      TableSpec tableSpec = model.getTableSpec();
       byte[][] noDictAndComplex = WriteStepRowUtil.getNoDictAndComplexDimension(row);
       for (int i = 0; i < noDictAndComplex.length; i++) {
-        if (i < noDictionaryCount) {
+        if (tableSpec.getDimensionSpec(dictDimensionPages.length + i).getSchemaDataType()
+            == DataTypes.VARCHAR) {
+          byte[] valueWithLength = addIntLengthToByteArray(noDictAndComplex[i]);
+          noDictDimensionPages[i].putData(rowId, valueWithLength);
+        } else if (i < noDictionaryCount) {
           // noDictionary columns, since it is variable length, we need to prepare each
           // element as LV result byte array (first two bytes are the length of the array)
-          byte[] valueWithLength = addLengthToByteArray(noDictAndComplex[i]);
+          byte[] valueWithLength = addShortLengthToByteArray(noDictAndComplex[i]);
           noDictDimensionPages[i].putData(rowId, valueWithLength);
         } else {
           // complex columns
@@ -250,7 +264,7 @@ public class TablePage {
   }
 
   // Adds length as a short element (first 2 bytes) to the head of the input byte array
-  private byte[] addLengthToByteArray(byte[] input) {
+  private byte[] addShortLengthToByteArray(byte[] input) {
     if (input.length > Short.MAX_VALUE) {
       throw new RuntimeException("input data length " + input.length +
           " bytes too long, maximum length supported is " + Short.MAX_VALUE + " bytes");
@@ -262,6 +276,15 @@ public class TablePage {
     return output;
   }
 
+  // Adds length as a integer element (first 4 bytes) to the head of the input byte array
+  private byte[] addIntLengthToByteArray(byte[] input) {
+    byte[] output = new byte[input.length + 4];
+    ByteBuffer buffer = ByteBuffer.wrap(output);
+    buffer.putInt(input.length);
+    buffer.put(input, 0, input.length);
+    return output;
+  }
+
   void encode() throws KeyGenException, MemoryException, IOException {
     // encode dimensions and measure
     EncodedColumnPage[] dimensions = encodeAndCompressDimensions();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index f921fd5..12c95a9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -223,6 +223,26 @@ public final class CarbonDataProcessorUtil {
         .toPrimitive(noDictionaryMapping.toArray(new Boolean[noDictionaryMapping.size()]));
   }
 
+  /**
+   * Preparing the boolean [] to map whether the dimension is varchar data type or not.
+   */
+  public static boolean[] getIsVarcharColumnMapping(DataField[] fields) {
+    List<Boolean> isVarcharColumnMapping = new ArrayList<Boolean>();
+    for (DataField field : fields) {
+      // for complex type need to break the loop
+      if (field.getColumn().isComplex()) {
+        break;
+      }
+
+      if (field.getColumn().isDimension()) {
+        isVarcharColumnMapping.add(
+            field.getColumn().getColumnSchema().getDataType() == DataTypes.VARCHAR);
+      }
+    }
+    return ArrayUtils.toPrimitive(
+        isVarcharColumnMapping.toArray(new Boolean[isVarcharColumnMapping.size()]));
+  }
+
   public static boolean[] getNoDictionaryMapping(CarbonColumn[] carbonColumns) {
     List<Boolean> noDictionaryMapping = new ArrayList<Boolean>();
     for (CarbonColumn column : carbonColumns) {


[2/2] carbondata git commit: [CARBONDATA-2420][32K] Support string longer than 32000 characters

Posted by ku...@apache.org.
[CARBONDATA-2420][32K] Support string longer than 32000 characters

Add a table property 'long_string_columns' at table creation to support string columns that contain more than 32000 characters.
Internally, carbondata uses an integer instead of a short to store the length of the bytes content.

Internally in Carbondata,

add a new datatype called varchar to represent the long string column
add a new encoding called DIRECT_COMPRESS_VARCHAR to the varchar column page meta
use an integer (previously short) to store the length of bytes content.
add 2GB constraint for one column page

This closes #2379


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/dc53dee2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/dc53dee2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/dc53dee2

Branch: refs/heads/master
Commit: dc53dee2448f366319764021d77c4be75d43b9e3
Parents: c5a4ec0
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Sat Jun 2 15:17:04 2018 +0800
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Wed Jun 20 15:24:22 2018 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   3 +
 .../impl/FixedLengthDimensionColumnPage.java    |   2 +-
 .../impl/VariableLengthDimensionColumnPage.java |  11 +-
 ...mpressedDimensionChunkFileBasedReaderV1.java |   3 +-
 ...mpressedDimensionChunkFileBasedReaderV2.java |   3 +-
 ...mpressedDimensionChunkFileBasedReaderV3.java |   7 +-
 .../chunk/store/DimensionChunkStoreFactory.java |  22 +-
 ...ariableIntLengthDimensionDataChunkStore.java |  43 +++
 ...feVariableLengthDimensionDataChunkStore.java |  45 +--
 ...iableShortLengthDimensionDataChunkStore.java |  41 +++
 ...ariableIntLengthDimensionDataChunkStore.java |  44 +++
 ...feVariableLengthDimensionDataChunkStore.java |  54 ++--
 ...iableShortLengthDimensionDataChunkStore.java |  44 +++
 .../core/datastore/page/ColumnPage.java         |  16 +-
 .../datastore/page/VarLengthColumnPageBase.java |   6 +
 .../page/encoding/DefaultEncodingFactory.java   |   1 +
 .../page/encoding/EncodingFactory.java          |   3 +-
 .../encoding/compress/DirectCompressCodec.java  |   6 +-
 .../legacy/HighCardDictDimensionIndexCodec.java |  13 +-
 .../statistics/LVLongStringStatsCollector.java  |  51 ++++
 .../statistics/LVShortStringStatsCollector.java |  50 ++++
 .../page/statistics/LVStringStatsCollector.java |  27 +-
 .../core/indexstore/UnsafeMemoryDMStore.java    |  11 +-
 .../blockletindex/BlockletDataMap.java          |   8 +-
 .../core/indexstore/row/DataMapRow.java         |   4 +-
 .../core/indexstore/row/UnsafeDataMapRow.java   |  60 ++--
 .../core/indexstore/schema/CarbonRowSchema.java |  10 +-
 .../core/metadata/blocklet/BlockletInfo.java    |   2 +-
 .../ThriftWrapperSchemaConverterImpl.java       |   8 +
 .../core/metadata/datatype/DataType.java        |   3 +
 .../core/metadata/datatype/DataTypes.java       |   5 +
 .../core/metadata/datatype/VarcharType.java     |  34 +++
 .../core/metadata/encoder/Encoding.java         |   5 +-
 .../schema/table/TableSchemaBuilder.java        |   1 +
 .../util/AbstractDataFileFooterConverter.java   |   2 +
 .../apache/carbondata/core/util/CarbonUtil.java |   8 +-
 .../carbondata/core/util/DataTypeUtil.java      |   4 +
 .../ThriftWrapperSchemaConverterImplTest.java   |   2 +-
 format/src/main/thrift/schema.thrift            |   3 +
 .../VarcharDataTypesBasicTestCase.scala         | 279 +++++++++++++++++++
 .../carbondata/spark/util/CarbonScalaUtil.scala |   1 +
 .../spark/util/DataTypeConverterUtil.scala      |   1 +
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |  36 ++-
 .../command/carbonTableSchemaCommon.scala       |  43 ++-
 .../apache/spark/sql/hive/CarbonRelation.scala  |   1 +
 .../impl/NonDictionaryFieldConverterImpl.java   |  12 +-
 .../loading/csvinput/CSVInputFormat.java        |   4 +-
 .../loading/row/IntermediateSortTempRow.java    |  19 +-
 .../loading/sort/SortStepRowHandler.java        |  26 +-
 .../merger/CompactionResultSortProcessor.java   |  11 +-
 .../sort/sortdata/SortParameters.java           |  21 +-
 .../sort/sortdata/TableFieldStat.java           |  37 ++-
 .../carbondata/processing/store/TablePage.java  |  37 ++-
 .../util/CarbonDataProcessorUtil.java           |  20 ++
 54 files changed, 1049 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 5f06d08..118ff28 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -910,6 +910,7 @@ public final class CarbonCommonConstants {
   public static final String COLUMN_GROUPS = "column_groups";
   public static final String DICTIONARY_EXCLUDE = "dictionary_exclude";
   public static final String DICTIONARY_INCLUDE = "dictionary_include";
+  public static final String LONG_STRING_COLUMNS = "long_string_columns";
 
   /**
    * Table property to enable or disable local dictionary generation
@@ -1632,6 +1633,8 @@ public final class CarbonCommonConstants {
   // As Short data type is used for storing the length of a column during data processing hence
   // the maximum characters that can be supported should be less than Short max value
   public static final int MAX_CHARS_PER_COLUMN_DEFAULT = 32000;
+  // todo: use infinity first, will switch later
+  public static final int MAX_CHARS_PER_COLUMN_INFINITY = -1;
 
   /**
    * Enabling page level reader for compaction reduces the memory usage while compacting more

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
index 76bcf30..570404a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/FixedLengthDimensionColumnPage.java
@@ -47,7 +47,7 @@ public class FixedLengthDimensionColumnPage extends AbstractDimensionColumnPage
         dataChunk.length;
     dataChunkStore = DimensionChunkStoreFactory.INSTANCE
         .getDimensionChunkStore(columnValueSize, isExplicitSorted, numberOfRows, totalSize,
-            DimensionStoreType.FIXEDLENGTH);
+            DimensionStoreType.FIXED_LENGTH);
     dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunk);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
index 1c6b7f4..7394217 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/impl/VariableLengthDimensionColumnPage.java
@@ -30,21 +30,16 @@ public class VariableLengthDimensionColumnPage extends AbstractDimensionColumnPa
 
   /**
    * Constructor for this class
-   * @param dataChunks
-   * @param invertedIndex
-   * @param invertedIndexReverse
-   * @param numberOfRows
    */
   public VariableLengthDimensionColumnPage(byte[] dataChunks, int[] invertedIndex,
-      int[] invertedIndexReverse, int numberOfRows) {
+      int[] invertedIndexReverse, int numberOfRows, DimensionStoreType dimStoreType) {
     boolean isExplicitSorted = isExplicitSorted(invertedIndex);
-    long totalSize = isExplicitSorted ?
+    long totalSize = null != invertedIndex ?
         (dataChunks.length + (2 * numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE) + (
             numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE)) :
         (dataChunks.length + (numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE));
     dataChunkStore = DimensionChunkStoreFactory.INSTANCE
-        .getDimensionChunkStore(0, isExplicitSorted, numberOfRows, totalSize,
-            DimensionStoreType.VARIABLELENGTH);
+        .getDimensionChunkStore(0, isExplicitSorted, numberOfRows, totalSize, dimStoreType);
     dataChunkStore.putArray(invertedIndex, invertedIndexReverse, dataChunks);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
index 6679402..92a9684 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v1/CompressedDimensionChunkFileBasedReaderV1.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReader;
+import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.datachunk.DataChunk;
@@ -151,7 +152,7 @@ public class CompressedDimensionChunkFileBasedReaderV1 extends AbstractChunkRead
         .hasEncoding(dataChunk.getEncodingList(), Encoding.DICTIONARY)) {
       columnDataChunk =
           new VariableLengthDimensionColumnPage(dataPage, invertedIndexes, invertedIndexesReverse,
-              numberOfRows);
+              numberOfRows, DimensionChunkStoreFactory.DimensionStoreType.VARIABLE_SHORT_LENGTH);
     } else {
       // to store fixed length column chunk values
       columnDataChunk =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
index 8938260..3cdbe1d 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v2/CompressedDimensionChunkFileBasedReaderV2.java
@@ -25,6 +25,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk;
 import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReaderV2V3Format;
+import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.util.CarbonUtil;
@@ -175,7 +176,7 @@ public class CompressedDimensionChunkFileBasedReaderV2 extends AbstractChunkRead
     if (!hasEncoding(dimensionColumnChunk.encoders, Encoding.DICTIONARY)) {
       columnDataChunk =
           new VariableLengthDimensionColumnPage(dataPage, invertedIndexes, invertedIndexesReverse,
-              numberOfRows);
+              numberOfRows, DimensionChunkStoreFactory.DimensionStoreType.VARIABLE_SHORT_LENGTH);
     } else {
       // to store fixed length column chunk values
       columnDataChunk =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
index 58a9b18..782a8df 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/reader/dimension/v3/CompressedDimensionChunkFileBasedReaderV3.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionColum
 import org.apache.carbondata.core.datastore.chunk.impl.VariableLengthDimensionColumnPage;
 import org.apache.carbondata.core.datastore.chunk.reader.dimension.AbstractChunkReaderV2V3Format;
 import org.apache.carbondata.core.datastore.chunk.store.ColumnPageWrapper;
+import org.apache.carbondata.core.datastore.chunk.store.DimensionChunkStoreFactory;
 import org.apache.carbondata.core.datastore.columnar.UnBlockIndexer;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
@@ -271,9 +272,13 @@ public class CompressedDimensionChunkFileBasedReaderV3 extends AbstractChunkRead
     // if no dictionary column then first create a no dictionary column chunk
     // and set to data chunk instance
     if (!hasEncoding(pageMetadata.encoders, Encoding.DICTIONARY)) {
+      DimensionChunkStoreFactory.DimensionStoreType dimStoreType =
+          hasEncoding(pageMetadata.encoders, Encoding.DIRECT_COMPRESS_VARCHAR) ?
+              DimensionChunkStoreFactory.DimensionStoreType.VARIABLE_INT_LENGTH :
+              DimensionChunkStoreFactory.DimensionStoreType.VARIABLE_SHORT_LENGTH;
       columnDataChunk =
           new VariableLengthDimensionColumnPage(dataPage, invertedIndexes, invertedIndexesReverse,
-              pageMetadata.getNumberOfRowsInpage());
+              pageMetadata.getNumberOfRowsInpage(), dimStoreType);
     } else {
       // to store fixed length column chunk values
       columnDataChunk =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
index f210641..eccfd9c 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/DimensionChunkStoreFactory.java
@@ -19,9 +19,11 @@ package org.apache.carbondata.core.datastore.chunk.store;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.chunk.store.impl.safe.SafeFixedLengthDimensionDataChunkStore;
-import org.apache.carbondata.core.datastore.chunk.store.impl.safe.SafeVariableLengthDimensionDataChunkStore;
+import org.apache.carbondata.core.datastore.chunk.store.impl.safe.SafeVariableIntLengthDimensionDataChunkStore;
+import org.apache.carbondata.core.datastore.chunk.store.impl.safe.SafeVariableShortLengthDimensionDataChunkStore;
 import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeFixedLengthDimensionDataChunkStore;
-import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeVariableLengthDimensionDataChunkStore;
+import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeVariableIntLengthDimensionDataChunkStore;
+import org.apache.carbondata.core.datastore.chunk.store.impl.unsafe.UnsafeVariableShortLengthDimensionDataChunkStore;
 import org.apache.carbondata.core.util.CarbonProperties;
 
 /**
@@ -63,19 +65,23 @@ public class DimensionChunkStoreFactory {
       boolean isInvertedIndex, int numberOfRows, long totalSize, DimensionStoreType storeType) {
 
     if (isUnsafe) {
-      if (storeType == DimensionStoreType.FIXEDLENGTH) {
+      if (storeType == DimensionStoreType.FIXED_LENGTH) {
         return new UnsafeFixedLengthDimensionDataChunkStore(totalSize, columnValueSize,
             isInvertedIndex, numberOfRows);
+      } else if (storeType == DimensionStoreType.VARIABLE_SHORT_LENGTH) {
+        return new UnsafeVariableShortLengthDimensionDataChunkStore(totalSize, isInvertedIndex,
+            numberOfRows);
       } else {
-        return new UnsafeVariableLengthDimensionDataChunkStore(totalSize, isInvertedIndex,
+        return new UnsafeVariableIntLengthDimensionDataChunkStore(totalSize, isInvertedIndex,
             numberOfRows);
       }
-
     } else {
-      if (storeType == DimensionStoreType.FIXEDLENGTH) {
+      if (storeType == DimensionStoreType.FIXED_LENGTH) {
         return new SafeFixedLengthDimensionDataChunkStore(isInvertedIndex, columnValueSize);
+      } else if (storeType == DimensionStoreType.VARIABLE_SHORT_LENGTH) {
+        return new SafeVariableShortLengthDimensionDataChunkStore(isInvertedIndex, numberOfRows);
       } else {
-        return new SafeVariableLengthDimensionDataChunkStore(isInvertedIndex, numberOfRows);
+        return new SafeVariableIntLengthDimensionDataChunkStore(isInvertedIndex, numberOfRows);
       }
     }
   }
@@ -84,6 +90,6 @@ public class DimensionChunkStoreFactory {
    * dimension store type enum
    */
   public enum DimensionStoreType {
-    FIXEDLENGTH, VARIABLELENGTH;
+    FIXED_LENGTH, VARIABLE_SHORT_LENGTH, VARIABLE_INT_LENGTH;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableIntLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableIntLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableIntLengthDimensionDataChunkStore.java
new file mode 100644
index 0000000..773f078
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableIntLengthDimensionDataChunkStore.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
+
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+/**
+ * Below class is responsible to store variable int length (length can exceed 32000) dimension
+ * memory. Memory occupied can be on heap or offheap using unsafe interface
+ */
+public class SafeVariableIntLengthDimensionDataChunkStore
+    extends SafeVariableLengthDimensionDataChunkStore {
+  public SafeVariableIntLengthDimensionDataChunkStore(boolean isInvertedIndex, int numberOfRows) {
+    super(isInvertedIndex, numberOfRows);
+  }
+
+  @Override
+  protected int getLengthSize() {
+    return CarbonCommonConstants.INT_SIZE_IN_BYTE;
+  }
+
+  @Override
+  protected int getLengthFromBuffer(ByteBuffer buffer) {
+    return buffer.getInt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
index bb9c888..52e7317 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableLengthDimensionDataChunkStore.java
@@ -28,9 +28,10 @@ import org.apache.carbondata.core.util.DataTypeUtil;
 
 /**
  * Below class is responsible to store variable length dimension data chunk in
- * memory Memory occupied can be on heap or offheap using unsafe interface
+ * memory. Memory occupied can be on heap or offheap using unsafe interface
  */
-public class SafeVariableLengthDimensionDataChunkStore extends SafeAbsractDimensionDataChunkStore {
+public abstract class SafeVariableLengthDimensionDataChunkStore
+    extends SafeAbsractDimensionDataChunkStore {
 
   /**
    * total number of rows
@@ -56,7 +57,8 @@ public class SafeVariableLengthDimensionDataChunkStore extends SafeAbsractDimens
    * @param invertedIndexReverse inverted index reverse to be stored
    * @param data                 data to be stored
    */
-  @Override public void putArray(final int[] invertedIndex, final int[] invertedIndexReverse,
+  @Override
+  public void putArray(final int[] invertedIndex, final int[] invertedIndexReverse,
       byte[] data) {
     // first put the data, inverted index and reverse inverted index to memory
     super.putArray(invertedIndex, invertedIndexReverse, data);
@@ -75,21 +77,25 @@ public class SafeVariableLengthDimensionDataChunkStore extends SafeAbsractDimens
     // as first position will be start from 2 byte as data is stored first in the memory block
     // we need to skip first two bytes this is because first two bytes will be length of the data
     // which we have to skip
-    dataOffsets[0] = CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
+    dataOffsets[0] = getLengthSize();
     // creating a byte buffer which will wrap the length of the row
     ByteBuffer buffer = ByteBuffer.wrap(data);
     for (int i = 1; i < numberOfRows; i++) {
       buffer.position(startOffset);
       // so current row position will be
       // previous row length + 2 bytes used for storing previous row data
-      startOffset += buffer.getShort() + CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
+      startOffset += getLengthFromBuffer(buffer) + getLengthSize();
       // as same byte buffer is used to avoid creating many byte buffer for each row
       // we need to clear the byte buffer
-      dataOffsets[i] = startOffset + CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
+      dataOffsets[i] = startOffset + getLengthSize();
     }
   }
 
-  @Override public byte[] getRow(int rowId) {
+  protected abstract int getLengthSize();
+  protected abstract int getLengthFromBuffer(ByteBuffer buffer);
+
+  @Override
+  public byte[] getRow(int rowId) {
     // if column was explicitly sorted we need to get the rowid based inverted index reverse
     if (isExplictSorted) {
       rowId = invertedIndexReverse[rowId];
@@ -101,21 +107,21 @@ public class SafeVariableLengthDimensionDataChunkStore extends SafeAbsractDimens
     // else subtract the current row offset with complete data
     // length get the offset of set of data
     int currentDataOffset = dataOffsets[rowId];
-    short length = 0;
+    int length = 0;
     // calculating the length of data
     if (rowId < numberOfRows - 1) {
-      length = (short) (dataOffsets[rowId + 1] - (currentDataOffset
-          + CarbonCommonConstants.SHORT_SIZE_IN_BYTE));
+      length = dataOffsets[rowId + 1] - (currentDataOffset + getLengthSize());
     } else {
       // for last record
-      length = (short) (this.data.length - currentDataOffset);
+      length = this.data.length - currentDataOffset;
     }
     byte[] currentRowData = new byte[length];
     System.arraycopy(data, currentDataOffset, currentRowData, 0, length);
     return currentRowData;
   }
 
-  @Override public void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
+  @Override
+  public void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
     // if column was explicitly sorted we need to get the rowid based inverted index reverse
     if (isExplictSorted) {
       rowId = invertedIndexReverse[rowId];
@@ -127,11 +133,10 @@ public class SafeVariableLengthDimensionDataChunkStore extends SafeAbsractDimens
     // else subtract the current row offset with complete data
     // length get the offset of set of data
     int currentDataOffset = dataOffsets[rowId];
-    short length = 0;
+    int length = 0;
     // calculating the length of data
     if (rowId < numberOfRows - 1) {
-      length = (short) (dataOffsets[rowId + 1] - (currentDataOffset
-          + CarbonCommonConstants.SHORT_SIZE_IN_BYTE));
+      length = dataOffsets[rowId + 1] - (currentDataOffset + getLengthSize());
     } else {
       // for last record
       length = (short) (this.data.length - currentDataOffset);
@@ -162,7 +167,8 @@ public class SafeVariableLengthDimensionDataChunkStore extends SafeAbsractDimens
     }
   }
 
-  @Override public int compareTo(int rowId, byte[] compareValue) {
+  @Override
+  public int compareTo(int rowId, byte[] compareValue) {
     // now to get the row from memory block we need to do following thing
     // 1. first get the current offset
     // 2. if it's not a last row- get the next row offset
@@ -172,14 +178,13 @@ public class SafeVariableLengthDimensionDataChunkStore extends SafeAbsractDimens
 
     // get the offset of set of data
     int currentDataOffset = dataOffsets[rowId];
-    short length = 0;
+    int length = 0;
     // calculating the length of data
     if (rowId < numberOfRows - 1) {
-      length = (short) (dataOffsets[rowId + 1] - (currentDataOffset
-          + CarbonCommonConstants.SHORT_SIZE_IN_BYTE));
+      length = dataOffsets[rowId + 1] - (currentDataOffset + getLengthSize());
     } else {
       // for last record
-      length = (short) (this.data.length - currentDataOffset);
+      length = this.data.length - currentDataOffset;
     }
     return ByteUtil.UnsafeComparer.INSTANCE
         .compareTo(data, currentDataOffset, length, compareValue, 0, compareValue.length);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableShortLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableShortLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableShortLengthDimensionDataChunkStore.java
new file mode 100644
index 0000000..beccf86
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/safe/SafeVariableShortLengthDimensionDataChunkStore.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.chunk.store.impl.safe;
+
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+/**
+ * Below class is responsible to store variable short length (<= 32000) dimension data chunk in
+ * memory. Memory occupied can be on heap or offheap using unsafe interface
+ */
+public class SafeVariableShortLengthDimensionDataChunkStore
+    extends SafeVariableLengthDimensionDataChunkStore {
+  public SafeVariableShortLengthDimensionDataChunkStore(boolean isInvertedIndex, int numberOfRows) {
+    super(isInvertedIndex, numberOfRows);
+  }
+
+  @Override protected int getLengthSize() {
+    return CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
+  }
+
+  @Override protected int getLengthFromBuffer(ByteBuffer buffer) {
+    return buffer.getShort();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableIntLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableIntLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableIntLengthDimensionDataChunkStore.java
new file mode 100644
index 0000000..851fff6
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableIntLengthDimensionDataChunkStore.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.chunk.store.impl.unsafe;
+
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+/**
+ * Below class is responsible to store variable int length (length can exceed 32000) dimension
+ * data chunk in memory. Memory occupied can be on heap or offheap using unsafe interface
+ */
+public class UnsafeVariableIntLengthDimensionDataChunkStore
+    extends UnsafeVariableLengthDimensionDataChunkStore {
+  public UnsafeVariableIntLengthDimensionDataChunkStore(long totalSize, boolean isInvertedIdex,
+      int numberOfRows) {
+    super(totalSize, isInvertedIdex, numberOfRows);
+  }
+
+  @Override
+  protected int getLengthSize() {
+    return CarbonCommonConstants.INT_SIZE_IN_BYTE;
+  }
+
+  @Override
+  protected int getLengthFromBuffer(ByteBuffer byteBuffer) {
+    return byteBuffer.getInt();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java
index 07dc806..801a282 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableLengthDimensionDataChunkStore.java
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.util.DataTypeUtil;
  * Below class is responsible to store variable length dimension data chunk in
  * memory Memory occupied can be on heap or offheap using unsafe interface
  */
-public class UnsafeVariableLengthDimensionDataChunkStore
+public abstract class UnsafeVariableLengthDimensionDataChunkStore
     extends UnsafeAbstractDimensionDataChunkStore {
 
   /**
@@ -67,42 +67,43 @@ public class UnsafeVariableLengthDimensionDataChunkStore
    * @param invertedIndexReverse inverted index reverse to be stored
    * @param data                 data to be stored
    */
-  @Override public void putArray(final int[] invertedIndex, final int[] invertedIndexReverse,
+  @Override
+  public void putArray(final int[] invertedIndex, final int[] invertedIndexReverse,
       byte[] data) {
     // first put the data, inverted index and reverse inverted index to memory
     super.putArray(invertedIndex, invertedIndexReverse, data);
     // position from where offsets will start
     this.dataPointersOffsets = this.invertedIndexReverseOffset;
     if (isExplicitSorted) {
-      this.dataPointersOffsets += (long)numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE;
+      this.dataPointersOffsets += (long) numberOfRows * CarbonCommonConstants.INT_SIZE_IN_BYTE;
     }
     // As data is of variable length and data format is
-    // <length in short><data><length in short><data>
+    // <length in short><data><length in short/int><data>
     // we need to store offset of each data so data can be accessed directly
     // for example:
     //data = {0,5,1,2,3,4,5,0,6,0,1,2,3,4,5,0,2,8,9}
     //so value stored in offset will be position of actual data
     // [2,9,17]
-    // to store this value we need to get the actual data length + 2 bytes used for storing the
+    // to store this value we need to get the actual data length + 2/4 bytes used for storing the
     // length
 
     // start position will be used to store the current data position
     int startOffset = 0;
-    // as first position will be start from 2 byte as data is stored first in the memory block
+    // as first position will be start from 2/4 byte as data is stored first in the memory block
     // we need to skip first two bytes this is because first two bytes will be length of the data
     // which we have to skip
     int [] dataOffsets = new int[numberOfRows];
-    dataOffsets[0] = CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
+    dataOffsets[0] = getLengthSize();
     // creating a byte buffer which will wrap the length of the row
     ByteBuffer buffer = ByteBuffer.wrap(data);
     for (int i = 1; i < numberOfRows; i++) {
       buffer.position(startOffset);
       // so current row position will be
-      // previous row length + 2 bytes used for storing previous row data
-      startOffset += buffer.getShort() + CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
+      // previous row length + 2/4 bytes used for storing previous row data
+      startOffset += getLengthFromBuffer(buffer) + getLengthSize();
       // as same byte buffer is used to avoid creating many byte buffer for each row
       // we need to clear the byte buffer
-      dataOffsets[i] = startOffset + CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
+      dataOffsets[i] = startOffset + getLengthSize();
     }
     CarbonUnsafe.getUnsafe().copyMemory(dataOffsets, CarbonUnsafe.INT_ARRAY_OFFSET,
         dataPageMemoryBlock.getBaseObject(),
@@ -110,6 +111,9 @@ public class UnsafeVariableLengthDimensionDataChunkStore
         dataOffsets.length * CarbonCommonConstants.INT_SIZE_IN_BYTE);
   }
 
+  protected abstract int getLengthSize();
+  protected abstract int getLengthFromBuffer(ByteBuffer byteBuffer);
+
   /**
    * Below method will be used to get the row based on row id passed
    * Getting the row from unsafe works in below logic
@@ -122,13 +126,14 @@ public class UnsafeVariableLengthDimensionDataChunkStore
    * @param rowId
    * @return row
    */
-  @Override public byte[] getRow(int rowId) {
+  @Override
+  public byte[] getRow(int rowId) {
     // get the actual row id
     rowId = getRowId(rowId);
     // get offset of data in unsafe
     int currentDataOffset = getOffSet(rowId);
     // get the data length
-    short length = getLength(rowId, currentDataOffset);
+    int length = getLength(rowId, currentDataOffset);
     // create data array
     byte[] data = new byte[length];
     // fill the row data
@@ -167,25 +172,24 @@ public class UnsafeVariableLengthDimensionDataChunkStore
   /**
    * To get the length of data for row id
    * if it's not a last row- get the next row offset
-   * Subtract the current row offset + 2 bytes(to skip the data length) with next row offset
+   * Subtract the current row offset + 2/4 bytes(to skip the data length) with next row offset
    * if it's last row
-   * subtract the current row offset + 2 bytes(to skip the data length) with complete data length
+   * subtract the current row offset + 2/4 bytes(to skip the data length) with complete data length
    * @param rowId rowId
    * @param currentDataOffset current data offset
    * @return length of row
    */
-  private short getLength(int rowId, int currentDataOffset) {
-    short length = 0;
+  private int getLength(int rowId, int currentDataOffset) {
+    int length = 0;
     // calculating the length of data
     if (rowId < numberOfRows - 1) {
       int OffsetOfNextdata = CarbonUnsafe.getUnsafe().getInt(dataPageMemoryBlock.getBaseObject(),
           dataPageMemoryBlock.getBaseOffset() + this.dataPointersOffsets + ((rowId + 1)
               * CarbonCommonConstants.INT_SIZE_IN_BYTE));
-      length = (short) (OffsetOfNextdata - (currentDataOffset
-          + CarbonCommonConstants.SHORT_SIZE_IN_BYTE));
+      length = OffsetOfNextdata - (currentDataOffset + getLengthSize());
     } else {
       // for last record we need to subtract with data length
-      length = (short) (this.dataLength - currentDataOffset);
+      length = this.dataLength - currentDataOffset;
     }
     return length;
   }
@@ -196,7 +200,7 @@ public class UnsafeVariableLengthDimensionDataChunkStore
    * @param data data array
    * @param currentDataOffset current data offset
    */
-  private void fillRowInternal(short length, byte[] data, int currentDataOffset) {
+  private void fillRowInternal(int length, byte[] data, int currentDataOffset) {
     CarbonUnsafe.getUnsafe().copyMemory(dataPageMemoryBlock.getBaseObject(),
         dataPageMemoryBlock.getBaseOffset() + currentDataOffset, data,
         CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
@@ -217,13 +221,14 @@ public class UnsafeVariableLengthDimensionDataChunkStore
    * @param vectorRow vector row id
    *
    */
-  @Override public void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
+  @Override
+  public void fillRow(int rowId, CarbonColumnVector vector, int vectorRow) {
     // get the row id from reverse inverted index based on row id
     rowId = getRowId(rowId);
     // get the current row offset
     int currentDataOffset = getOffSet(rowId);
     // get the row data length
-    short length = getLength(rowId, currentDataOffset);
+    int length = getLength(rowId, currentDataOffset);
     // check if value length is less the current data length
     // then create a new array else use the same
     if (length > value.length) {
@@ -262,9 +267,10 @@ public class UnsafeVariableLengthDimensionDataChunkStore
    * @param compareValue value of to be compared
    * @return compare result
    */
-  @Override public int compareTo(int rowId, byte[] compareValue) {
+  @Override
+  public int compareTo(int rowId, byte[] compareValue) {
     int currentDataOffset = getOffSet(rowId);;
-    short length = getLength(rowId, currentDataOffset);
+    int length = getLength(rowId, currentDataOffset);
     // as this class handles this variable length data, so filter value can be
     // smaller or bigger than than actual data, so we need to take the smaller length
     int compareResult;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableShortLengthDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableShortLengthDimensionDataChunkStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableShortLengthDimensionDataChunkStore.java
new file mode 100644
index 0000000..995f5ba
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeVariableShortLengthDimensionDataChunkStore.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.chunk.store.impl.unsafe;
+
+import java.nio.ByteBuffer;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+
+/**
+ * Below class is responsible to store variable length dimension data chunk in
+ * memory. Memory occupied can be on heap or offheap using unsafe interface
+ */
+public class UnsafeVariableShortLengthDimensionDataChunkStore
+    extends UnsafeVariableLengthDimensionDataChunkStore {
+  public UnsafeVariableShortLengthDimensionDataChunkStore(long totalSize, boolean isInvertedIdex,
+      int numberOfRows) {
+    super(totalSize, isInvertedIdex, numberOfRows);
+  }
+
+  @Override
+  protected int getLengthSize() {
+    return CarbonCommonConstants.SHORT_SIZE_IN_BYTE;
+  }
+
+  @Override
+  protected int getLengthFromBuffer(ByteBuffer byteBuffer) {
+    return byteBuffer.getShort();
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
index 69ed437..4dcf514 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/ColumnPage.java
@@ -203,7 +203,9 @@ public abstract class ColumnPage {
         instance = new UnsafeFixLengthColumnPage(columnSpec, dataType, pageSize);
       } else if (DataTypes.isDecimal(dataType)) {
         instance = new UnsafeDecimalColumnPage(columnSpec, dataType, pageSize);
-      } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+      } else if (dataType == DataTypes.STRING
+          || dataType == DataTypes.BYTE_ARRAY
+          || dataType == DataTypes.VARCHAR) {
         instance = new UnsafeVarLengthColumnPage(columnSpec, dataType, pageSize);
       } else {
         throw new RuntimeException("Unsupported data dataType: " + dataType);
@@ -225,7 +227,9 @@ public abstract class ColumnPage {
         instance = newDoublePage(columnSpec, new double[pageSize]);
       } else if (DataTypes.isDecimal(dataType)) {
         instance = newDecimalPage(columnSpec, new byte[pageSize][]);
-      } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+      } else if (dataType == DataTypes.STRING
+          || dataType == DataTypes.BYTE_ARRAY
+          || dataType == DataTypes.VARCHAR) {
         instance = new SafeVarLengthColumnPage(columnSpec, dataType, pageSize);
       } else {
         throw new RuntimeException("Unsupported data dataType: " + dataType);
@@ -398,7 +402,9 @@ public abstract class ColumnPage {
     } else if (DataTypes.isDecimal(dataType)) {
       putDecimal(rowId, (BigDecimal) value);
       statsCollector.update((BigDecimal) value);
-    } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+    } else if (dataType == DataTypes.STRING
+        || dataType == DataTypes.BYTE_ARRAY
+        || dataType == DataTypes.VARCHAR) {
       putBytes(rowId, (byte[]) value);
       statsCollector.update((byte[]) value);
     } else {
@@ -431,7 +437,9 @@ public abstract class ColumnPage {
       return getDouble(rowId);
     } else if (DataTypes.isDecimal(dataType)) {
       return getDecimal(rowId);
-    } else if (dataType == DataTypes.STRING || dataType == DataTypes.BYTE_ARRAY) {
+    } else if (dataType == DataTypes.STRING
+        || dataType == DataTypes.BYTE_ARRAY
+        || dataType == DataTypes.VARCHAR) {
       return getBytes(rowId);
     } else {
       throw new RuntimeException("unsupported data type: " + dataType);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index 901758a..cb907a5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -289,6 +289,12 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
 
   @Override
   public void putBytes(int rowId, byte[] bytes) {
+    // rowId * 4 bytes are occupied by the L (length) parts of the LV-encoded rows stored so far
+    if (bytes.length > (Integer.MAX_VALUE - totalLength - rowId * 4)) {
+      // since we later store a column page in a byte array, its maximum size is 2GB
+      throw new RuntimeException("Carbondata only support maximum 2GB size for one column page,"
+          + " exceed this limit at rowId " + rowId);
+    }
     if (rowId == 0) {
       rowOffset[0] = 0;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
index 00f7a0f..816b01f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/DefaultEncodingFactory.java
@@ -103,6 +103,7 @@ public class DefaultEncodingFactory extends EncodingFactory {
         return new HighCardDictDimensionIndexCodec(
             dimensionSpec.isInSortColumns(),
             dimensionSpec.isInSortColumns() && dimensionSpec.isDoInvertedIndex(),
+            dimensionSpec.getSchemaDataType() == DataTypes.VARCHAR,
             compressor).createEncoder(null);
       default:
         throw new RuntimeException("unsupported dimension type: " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
index 318d55d..a661a49 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/EncodingFactory.java
@@ -47,6 +47,7 @@ import static org.apache.carbondata.format.Encoding.ADAPTIVE_FLOATING;
 import static org.apache.carbondata.format.Encoding.ADAPTIVE_INTEGRAL;
 import static org.apache.carbondata.format.Encoding.BOOL_BYTE;
 import static org.apache.carbondata.format.Encoding.DIRECT_COMPRESS;
+import static org.apache.carbondata.format.Encoding.DIRECT_COMPRESS_VARCHAR;
 import static org.apache.carbondata.format.Encoding.RLE_INTEGRAL;
 
 /**
@@ -71,7 +72,7 @@ public abstract class EncodingFactory {
     byte[] encoderMeta = encoderMetas.get(0).array();
     ByteArrayInputStream stream = new ByteArrayInputStream(encoderMeta);
     DataInputStream in = new DataInputStream(stream);
-    if (encoding == DIRECT_COMPRESS) {
+    if (encoding == DIRECT_COMPRESS || encoding == DIRECT_COMPRESS_VARCHAR) {
       ColumnPageEncoderMeta metadata = new ColumnPageEncoderMeta();
       metadata.readFields(in);
       return new DirectCompressCodec(metadata.getStoreDataType()).createDecoder(metadata);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
index cfdf114..4c1bc49 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/compress/DirectCompressCodec.java
@@ -64,7 +64,7 @@ public class DirectCompressCodec implements ColumnPageCodec {
     return new DirectDecompressor(meta);
   }
 
-  private static class DirectCompressor extends ColumnPageEncoder {
+  private class DirectCompressor extends ColumnPageEncoder {
 
     private Compressor compressor;
 
@@ -80,7 +80,9 @@ public class DirectCompressCodec implements ColumnPageCodec {
     @Override
     protected List<Encoding> getEncodingList() {
       List<Encoding> encodings = new ArrayList<>();
-      encodings.add(Encoding.DIRECT_COMPRESS);
+      encodings.add(dataType == DataTypes.VARCHAR ?
+          Encoding.DIRECT_COMPRESS_VARCHAR :
+          Encoding.DIRECT_COMPRESS);
       return encodings;
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
index d722c38..741dbfe 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/encoding/dimension/legacy/HighCardDictDimensionIndexCodec.java
@@ -30,11 +30,16 @@ import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoder;
 import org.apache.carbondata.core.util.ByteUtil;
 import org.apache.carbondata.format.Encoding;
 
-public class HighCardDictDimensionIndexCodec  extends IndexStorageCodec {
+public class HighCardDictDimensionIndexCodec extends IndexStorageCodec {
+  /**
+   * whether this column is varchar data type(long string)
+   */
+  private boolean isVarcharType;
 
   public HighCardDictDimensionIndexCodec(boolean isSort, boolean isInvertedIndex,
-      Compressor compressor) {
+      boolean isVarcharType, Compressor compressor) {
     super(isSort, isInvertedIndex, compressor);
+    this.isVarcharType = isVarcharType;
   }
 
   @Override
@@ -63,7 +68,9 @@ public class HighCardDictDimensionIndexCodec  extends IndexStorageCodec {
       @Override
       protected List<Encoding> getEncodingList() {
         List<Encoding> encodings = new ArrayList<>();
-        if (indexStorage.getRowIdPageLengthInBytes() > 0) {
+        if (isVarcharType) {
+          encodings.add(Encoding.DIRECT_COMPRESS_VARCHAR);
+        } else if (indexStorage.getRowIdPageLengthInBytes() > 0) {
           encodings.add(Encoding.INVERTED_INDEX);
         }
         return encodings;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVLongStringStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVLongStringStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVLongStringStatsCollector.java
new file mode 100644
index 0000000..a7bb47e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVLongStringStatsCollector.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.statistics;
+
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * This class is for the columns with varchar data type,
+ * a string type which can hold more than 32000 characters
+ */
+public class LVLongStringStatsCollector extends LVStringStatsCollector {
+
+  public static LVLongStringStatsCollector newInstance() {
+    return new LVLongStringStatsCollector();
+  }
+
+  private LVLongStringStatsCollector() {
+
+  }
+
+  @Override
+  protected byte[] getActualValue(byte[] value) {
+    byte[] actualValue;
+    assert (value.length >= 4);
+    if (value.length == 4) {
+      assert (value[0] == 0 && value[1] == 0);
+      actualValue = new byte[0];
+    } else {
+      int length = ByteUtil.toInt(value, 0);
+      assert (length > 0);
+      actualValue = new byte[value.length - 4];
+      System.arraycopy(value, 4, actualValue, 0, actualValue.length);
+    }
+    return actualValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVShortStringStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVShortStringStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVShortStringStatsCollector.java
new file mode 100644
index 0000000..21b06d5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVShortStringStatsCollector.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.page.statistics;
+
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * This class is for the columns with string data type which can hold less than 32000 characters
+ */
+public class LVShortStringStatsCollector extends LVStringStatsCollector {
+
+  public static LVShortStringStatsCollector newInstance() {
+    return new LVShortStringStatsCollector();
+  }
+
+  private LVShortStringStatsCollector() {
+
+  }
+
+  @Override
+  protected byte[] getActualValue(byte[] value) {
+    byte[] actualValue;
+    assert (value.length >= 2);
+    if (value.length == 2) {
+      assert (value[0] == 0 && value[1] == 0);
+      actualValue = new byte[0];
+    } else {
+      int length = ByteUtil.toShort(value, 0);
+      assert (length > 0);
+      actualValue = new byte[value.length - 2];
+      System.arraycopy(value, 2, actualValue, 0, actualValue.length);
+    }
+    return actualValue;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
index 7958a8d..e1ac676 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/statistics/LVStringStatsCollector.java
@@ -23,18 +23,10 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.ByteUtil;
 
-public class LVStringStatsCollector implements ColumnPageStatsCollector {
+public abstract class LVStringStatsCollector implements ColumnPageStatsCollector {
 
   private byte[] min, max;
 
-  public static LVStringStatsCollector newInstance() {
-    return new LVStringStatsCollector();
-  }
-
-  private LVStringStatsCollector() {
-
-  }
-
   @Override
   public void updateNull(int rowId) {
 
@@ -70,22 +62,13 @@ public class LVStringStatsCollector implements ColumnPageStatsCollector {
 
   }
 
+  protected abstract byte[] getActualValue(byte[] value);
+
   @Override
   public void update(byte[] value) {
     // input value is LV encoded
-    byte[] newValue = null;
-    assert (value.length >= 2);
-    if (value.length == 2) {
-      assert (value[0] == 0 && value[1] == 0);
-      newValue = new byte[0];
-    } else {
-      int length = (value[0] << 8) + (value[1] & 0xff);
-      assert (length > 0);
-      newValue = new byte[value.length - 2];
-      System.arraycopy(value, 2, newValue, 0, newValue.length);
-    }
-
-    if (null == min) {
+    byte[] newValue = getActualValue(value);
+    if (min == null) {
       min = newValue;
     }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index ca5e2dd..599877c 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -144,7 +144,7 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
               "unsupported data type for unsafe storage: " + schema.getDataType());
         }
         break;
-      case VARIABLE:
+      case VARIABLE_SHORT:
         byte[] data = row.getByteArray(index);
         getUnsafe().putShort(memoryBlock.getBaseObject(),
             memoryBlock.getBaseOffset() + runningLength, (short) data.length);
@@ -153,6 +153,15 @@ public class UnsafeMemoryDMStore extends AbstractMemoryDMStore {
             memoryBlock.getBaseOffset() + runningLength, data.length);
         runningLength += data.length;
         break;
+      case VARIABLE_INT:
+        byte[] data2 = row.getByteArray(index);
+        getUnsafe().putInt(memoryBlock.getBaseObject(),
+            memoryBlock.getBaseOffset() + runningLength, data2.length);
+        runningLength += 4;
+        getUnsafe().copyMemory(data2, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
+            memoryBlock.getBaseOffset() + runningLength, data2.length);
+        runningLength += data2.length;
+        break;
       case STRUCT:
         CarbonRowSchema[] childSchemas =
             ((CarbonRowSchema.StructCarbonRowSchema) schema).getChildSchemas();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index 6e43fbc..4b5b36b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -607,7 +607,13 @@ public class BlockletDataMap extends CoarseGrainDataMap implements Serializable
       CarbonRowSchema[] mapSchemas = new CarbonRowSchema[minMaxLen.length];
       for (int i = 0; i < minMaxLen.length; i++) {
         if (minMaxLen[i] <= 0) {
-          mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY);
+          boolean isVarchar = false;
+          if (i < segmentProperties.getDimensions().size()
+              && segmentProperties.getDimensions().get(i).getDataType() == DataTypes.VARCHAR) {
+            isVarchar = true;
+          }
+          mapSchemas[i] = new CarbonRowSchema.VariableCarbonRowSchema(DataTypes.BYTE_ARRAY,
+              isVarchar);
         } else {
           mapSchemas[i] =
               new CarbonRowSchema.FixedCarbonRowSchema(DataTypes.BYTE_ARRAY, minMaxLen[i]);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
index 496a1d0..b8b46ef 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRow.java
@@ -78,8 +78,10 @@ public abstract class DataMapRow implements Serializable {
     switch (schemas[ordinal].getSchemaType()) {
       case FIXED:
         return schemas[ordinal].getLength();
-      case VARIABLE:
+      case VARIABLE_SHORT:
         return getLengthInBytes(ordinal) + 2;
+      case VARIABLE_INT:
+        return getLengthInBytes(ordinal) + 4;
       case STRUCT:
         return getRow(ordinal).getTotalSizeInBytes();
       default:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
index 1c1ecad..127e2a9 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/UnsafeDataMapRow.java
@@ -49,11 +49,16 @@ public class UnsafeDataMapRow extends DataMapRow {
     int length;
     int position = getPosition(ordinal);
     switch (schemas[ordinal].getSchemaType()) {
-      case VARIABLE:
-        length =
-            getUnsafe().getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position);
+      case VARIABLE_SHORT:
+        length = getUnsafe().getShort(block.getBaseObject(),
+            block.getBaseOffset() + pointer + position);
         position += 2;
         break;
+      case VARIABLE_INT:
+        length = getUnsafe().getInt(block.getBaseObject(),
+            block.getBaseOffset() + pointer + position);
+        position += 4;
+        break;
       default:
         length = schemas[ordinal].getLength();
     }
@@ -67,9 +72,13 @@ public class UnsafeDataMapRow extends DataMapRow {
     int length;
     int position = getPosition(ordinal);
     switch (schemas[ordinal].getSchemaType()) {
-      case VARIABLE:
-        length =
-            getUnsafe().getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position);
+      case VARIABLE_SHORT:
+        length = getUnsafe().getShort(block.getBaseObject(),
+            block.getBaseOffset() + pointer + position);
+        break;
+      case VARIABLE_INT:
+        length = getUnsafe().getInt(block.getBaseObject(),
+            block.getBaseOffset() + pointer + position);
         break;
       default:
         length = schemas[ordinal].getLength();
@@ -80,9 +89,13 @@ public class UnsafeDataMapRow extends DataMapRow {
   private int getLengthInBytes(int ordinal, int position) {
     int length;
     switch (schemas[ordinal].getSchemaType()) {
-      case VARIABLE:
-        length =
-            getUnsafe().getShort(block.getBaseObject(), block.getBaseOffset() + pointer + position);
+      case VARIABLE_SHORT:
+        length = getUnsafe().getShort(block.getBaseObject(),
+            block.getBaseOffset() + pointer + position);
+        break;
+      case VARIABLE_INT:
+        length = getUnsafe().getInt(block.getBaseObject(),
+            block.getBaseOffset() + pointer + position);
         break;
       default:
         length = schemas[ordinal].getLength();
@@ -226,21 +239,28 @@ public class UnsafeDataMapRow extends DataMapRow {
                 "unsupported data type for unsafe storage: " + schema.getDataType());
           }
           break;
-        case VARIABLE:
-          short length = getUnsafe().getShort(
-              block.getBaseObject(),
-              block.getBaseOffset() + pointer + runningLength);
+        case VARIABLE_SHORT:
+          int length = getUnsafe()
+              .getShort(block.getBaseObject(), block.getBaseOffset() + pointer + runningLength);
           runningLength += 2;
           byte[] data = new byte[length];
-          getUnsafe().copyMemory(
-              block.getBaseObject(),
+          getUnsafe().copyMemory(block.getBaseObject(),
               block.getBaseOffset() + pointer + runningLength,
-                  data,
-              BYTE_ARRAY_OFFSET,
-              data.length);
+              data, BYTE_ARRAY_OFFSET, data.length);
           runningLength += data.length;
           row.setByteArray(data, i);
           break;
+        case VARIABLE_INT:
+          int length2 = getUnsafe()
+              .getInt(block.getBaseObject(), block.getBaseOffset() + pointer + runningLength);
+          runningLength += 4;
+          byte[] data2 = new byte[length2];
+          getUnsafe().copyMemory(block.getBaseObject(),
+              block.getBaseOffset() + pointer + runningLength,
+              data2, BYTE_ARRAY_OFFSET, data2.length);
+          runningLength += data2.length;
+          row.setByteArray(data2, i);
+          break;
         case STRUCT:
           DataMapRow structRow = ((UnsafeDataMapRow) getRow(i)).convertToSafeRow();
           row.setRow(structRow, i);
@@ -260,8 +280,10 @@ public class UnsafeDataMapRow extends DataMapRow {
     switch (schemas[ordinal].getSchemaType()) {
       case FIXED:
         return schemas[ordinal].getLength();
-      case VARIABLE:
+      case VARIABLE_SHORT:
         return getLengthInBytes(ordinal, position) + 2;
+      case VARIABLE_INT:
+        return getLengthInBytes(ordinal, position) + 4;
       case STRUCT:
         return getRow(ordinal).getTotalSizeInBytes();
       default:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
index 1a77467..971f42a 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/schema/CarbonRowSchema.java
@@ -90,17 +90,23 @@ public abstract class CarbonRowSchema implements Serializable {
   }
 
   public static class VariableCarbonRowSchema extends CarbonRowSchema {
+    private boolean isVarcharType = false;
 
     public VariableCarbonRowSchema(DataType dataType) {
       super(dataType);
     }
 
+    public VariableCarbonRowSchema(DataType dataType, boolean isVarcharType) {
+      super(dataType);
+      this.isVarcharType = isVarcharType;
+    }
+
     @Override public int getLength() {
       return dataType.getSizeInBytes();
     }
 
     @Override public DataMapSchemaType getSchemaType() {
-      return DataMapSchemaType.VARIABLE;
+      return isVarcharType ? DataMapSchemaType.VARIABLE_INT : DataMapSchemaType.VARIABLE_SHORT;
     }
   }
 
@@ -127,6 +133,6 @@ public abstract class CarbonRowSchema implements Serializable {
   }
 
   public enum DataMapSchemaType {
-    FIXED, VARIABLE, STRUCT
+    FIXED, VARIABLE_INT, VARIABLE_SHORT, STRUCT
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
index f77358f..420cd4e 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/blocklet/BlockletInfo.java
@@ -268,7 +268,7 @@ public class BlockletInfo implements Serializable, Writable {
   @Override public void readFields(DataInput input) throws IOException {
     dimensionOffset = input.readLong();
     measureOffsets = input.readLong();
-    short dimensionChunkOffsetsSize = input.readShort();
+    int dimensionChunkOffsetsSize = input.readShort();
     dimensionChunkOffsets = new ArrayList<>(dimensionChunkOffsetsSize);
     for (int i = 0; i < dimensionChunkOffsetsSize; i++) {
       dimensionChunkOffsets.add(input.readLong());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 12f5fc3..87dda33 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -112,6 +112,8 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
         return org.apache.carbondata.format.Encoding.RLE;
       case INVERTED_INDEX:
         return org.apache.carbondata.format.Encoding.INVERTED_INDEX;
+      case DIRECT_COMPRESS_VARCHAR:
+        return org.apache.carbondata.format.Encoding.DIRECT_COMPRESS_VARCHAR;
       case BIT_PACKED:
         return org.apache.carbondata.format.Encoding.BIT_PACKED;
       case DIRECT_DICTIONARY:
@@ -154,6 +156,8 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
       return org.apache.carbondata.format.DataType.ARRAY;
     } else if (DataTypes.isStructType(dataType)) {
       return org.apache.carbondata.format.DataType.STRUCT;
+    } else if (dataType.getId() == DataTypes.VARCHAR.getId()) {
+      return org.apache.carbondata.format.DataType.VARCHAR;
     } else {
       return org.apache.carbondata.format.DataType.STRING;
     }
@@ -447,6 +451,8 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
         return Encoding.RLE;
       case INVERTED_INDEX:
         return Encoding.INVERTED_INDEX;
+      case DIRECT_COMPRESS_VARCHAR:
+        return Encoding.DIRECT_COMPRESS_VARCHAR;
       case BIT_PACKED:
         return Encoding.BIT_PACKED;
       case DIRECT_DICTIONARY:
@@ -490,6 +496,8 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
         return DataTypes.createDefaultArrayType();
       case STRUCT:
         return DataTypes.createDefaultStructType();
+      case VARCHAR:
+        return DataTypes.VARCHAR;
       default:
         return DataTypes.STRING;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
index d71f984..4dc1fbc 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataType.java
@@ -69,6 +69,7 @@ public class DataType implements Serializable {
 
   public static final char DOUBLE_MEASURE_CHAR = 'n';
   public static final char STRING_CHAR = 's';
+  public static final char VARCHAR_CHAR = 'v';
   public static final char TIMESTAMP_CHAR = 't';
   public static final char DATE_CHAR = 'x';
   public static final char BYTE_ARRAY_CHAR = 'y';
@@ -89,6 +90,8 @@ public class DataType implements Serializable {
       return BIG_DECIMAL_MEASURE_CHAR;
     } else if (dataType == DataTypes.STRING) {
       return STRING_CHAR;
+    } else if (dataType == DataTypes.VARCHAR) {
+      return VARCHAR_CHAR;
     } else if (dataType == DataTypes.TIMESTAMP) {
       return TIMESTAMP_CHAR;
     } else if (dataType == DataTypes.DATE) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
index dc89a41..d71eea4 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/DataTypes.java
@@ -47,6 +47,8 @@ public class DataTypes {
   // Only for internal use for backward compatability. It is only used for V1 version
   public static final DataType LEGACY_LONG = LegacyLongType.LEGACY_LONG;
 
+  public static final DataType VARCHAR = VarcharType.VARCHAR;
+
   // these IDs are used within this package only
   static final int STRING_TYPE_ID = 0;
   static final int DATE_TYPE_ID = 1;
@@ -66,6 +68,7 @@ public class DataTypes {
   public static final int ARRAY_TYPE_ID = 11;
   public static final int STRUCT_TYPE_ID = 12;
   public static final int MAP_TYPE_ID = 13;
+  public static final int VARCHAR_TYPE_ID = 18;
 
   /**
    * create a DataType instance from uniqueId of the DataType
@@ -107,6 +110,8 @@ public class DataTypes {
       return createDefaultMapType();
     } else if (id == BYTE_ARRAY.getId()) {
       return BYTE_ARRAY;
+    } else if (id == VARCHAR.getId()) {
+      return VARCHAR;
     } else {
       throw new RuntimeException("create DataType with invalid id: " + id);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/metadata/datatype/VarcharType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/datatype/VarcharType.java b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/VarcharType.java
new file mode 100644
index 0000000..bfde1a9
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/datatype/VarcharType.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.datatype;
+
+/**
+ * This class is for internal use. It is used to support strings that are longer than 32000 characters
+ */
+public class VarcharType extends DataType {
+  static final DataType VARCHAR = new VarcharType(DataTypes.VARCHAR_TYPE_ID, 0, "VARCHAR", -1);
+
+  private VarcharType(int id, int precedenceOrder, String name, int sizeInBytes) {
+    super(id, precedenceOrder, name, sizeInBytes);
+  }
+
+  // this function is needed to ensure singleton pattern while supporting java serialization
+  private Object readResolve() {
+    return DataTypes.VARCHAR;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java b/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
index 06d09f8..f3c21b1 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/encoder/Encoding.java
@@ -31,7 +31,8 @@ public enum Encoding {
   DIRECT_COMPRESS,
   ADAPTIVE_INTEGRAL,
   ADAPTIVE_DELTA_INTEGRAL,
-  RLE_INTEGRAL;
+  RLE_INTEGRAL,
+  DIRECT_COMPRESS_VARCHAR;
 
   public static Encoding valueOf(int ordinal) {
     if (ordinal == DICTIONARY.ordinal()) {
@@ -56,6 +57,8 @@ public enum Encoding {
       return ADAPTIVE_DELTA_INTEGRAL;
     } else if (ordinal == RLE_INTEGRAL.ordinal()) {
       return RLE_INTEGRAL;
+    } else if (ordinal == DIRECT_COMPRESS_VARCHAR.ordinal()) {
+      return DIRECT_COMPRESS_VARCHAR;
     } else {
       throw new RuntimeException("create Encoding with invalid ordinal: " + ordinal);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
index bb7e901..40f8725 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchemaBuilder.java
@@ -203,6 +203,7 @@ public class TableSchemaBuilder {
         }
       }
     }
+    // todo: need more information such as long_string_columns
     return newColumn;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
index f005d88..7cd0c18 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/AbstractDataFileFooterConverter.java
@@ -436,6 +436,8 @@ public abstract class AbstractDataFileFooterConverter {
         return DataTypes.createDefaultArrayType();
       case STRUCT:
         return DataTypes.createDefaultStructType();
+      case VARCHAR:
+        return DataTypes.VARCHAR;
       default:
         return DataTypes.STRING;
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 2f34163..1f6c697 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2309,6 +2309,8 @@ public final class CarbonUtil {
         return DataTypes.createDefaultArrayType();
       case STRUCT:
         return DataTypes.createDefaultStructType();
+      case VARCHAR:
+        return DataTypes.VARCHAR;
       default:
         return DataTypes.STRING;
     }
@@ -2499,8 +2501,10 @@ public final class CarbonUtil {
       return DataTypeUtil.bigDecimalToByte((BigDecimal) value);
     } else if (dataType == DataTypes.BYTE_ARRAY) {
       return (byte[]) value;
-    } else if (dataType == DataTypes.STRING || dataType == DataTypes.TIMESTAMP ||
-        dataType == DataTypes.DATE) {
+    } else if (dataType == DataTypes.STRING
+        || dataType == DataTypes.TIMESTAMP
+        || dataType == DataTypes.DATE
+        || dataType == DataTypes.VARCHAR) {
       return (byte[]) value;
     } else {
       throw new IllegalArgumentException("Invalid data type: " + dataType);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
index e06c82e..c84b0da 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DataTypeUtil.java
@@ -856,6 +856,8 @@ public final class DataTypeUtil {
       return DataTypes.FLOAT;
     } else if (DataTypes.DOUBLE.getName().equalsIgnoreCase(name)) {
       return DataTypes.DOUBLE;
+    } else if (DataTypes.VARCHAR.getName().equalsIgnoreCase(name)) {
+      return DataTypes.VARCHAR;
     } else if (DataTypes.NULL.getName().equalsIgnoreCase(name)) {
       return DataTypes.NULL;
     } else if (DataTypes.BYTE_ARRAY.getName().equalsIgnoreCase(name)) {
@@ -904,6 +906,8 @@ public final class DataTypeUtil {
       return DataTypes.FLOAT;
     } else if (DataTypes.DOUBLE.getName().equalsIgnoreCase(dataType.getName())) {
       return DataTypes.DOUBLE;
+    } else if (DataTypes.VARCHAR.getName().equalsIgnoreCase(dataType.getName())) {
+      return DataTypes.VARCHAR;
     } else if (DataTypes.NULL.getName().equalsIgnoreCase(dataType.getName())) {
       return DataTypes.NULL;
     } else if (DataTypes.BYTE_ARRAY.getName().equalsIgnoreCase(dataType.getName())) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
index 67c7594..522bf41 100644
--- a/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImplTest.java
@@ -1562,7 +1562,7 @@ public class ThriftWrapperSchemaConverterImplTest {
   }
 
   @Test public void testFromExternalToWrapperSchemaEvolutionEntry() {
-long time =1112745600000L;
+    long time =1112745600000L;
     ColumnSchema wrapperColumnSchema = new ColumnSchema();
     wrapperColumnSchema.setColumnUniqueId("1");
     wrapperColumnSchema.setColumnName("columnName");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/format/src/main/thrift/schema.thrift
----------------------------------------------------------------------
diff --git a/format/src/main/thrift/schema.thrift b/format/src/main/thrift/schema.thrift
index b44fe19..3af2b9a 100644
--- a/format/src/main/thrift/schema.thrift
+++ b/format/src/main/thrift/schema.thrift
@@ -35,6 +35,7 @@ enum DataType {
 	BOOLEAN = 8,
 	ARRAY = 20,
 	STRUCT = 21,
+	VARCHAR = 22,
 }
 
 /**
@@ -56,6 +57,7 @@ enum Encoding{
 	ADAPTIVE_FLOATING = 11; // Identifies that a column is encoded using AdaptiveFloatingCodec
 	BOOL_BYTE = 12;   // Identifies that a column is encoded using BooleanPageCodec
 	ADAPTIVE_DELTA_FLOATING = 13; // Identifies that a column is encoded using AdaptiveDeltaFloatingCodec
+	DIRECT_COMPRESS_VARCHAR = 14;  // Identifies that a column is encoded using DirectCompressCodec, it is used for long string columns
 }
 
 enum PartitionType{
@@ -173,6 +175,7 @@ struct TableSchema{
   4: optional map<string,string> tableProperties; // Table properties configured by the user
   5: optional BucketingInfo bucketingInfo; // Bucketing information
   6: optional PartitionInfo partitionInfo; // Partition information
+  7: optional list<string> long_string_columns // long string columns in the table
 }
 
 struct RelationIdentifier {