Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/06/22 01:34:41 UTC

[40/50] [abbrv] carbondata git commit: [CARBONDATA-2420][32K] Support string longer than 32000 characters
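
This patch adds a LONG_STRING_COLUMNS table property: string columns listed
there are stored internally as VARCHAR and are exempt from the 32000-character
limit that applies to ordinary string columns. A minimal usage sketch, mirroring
the DDL exercised by the new test below (assumes a Carbon-enabled SparkSession;
table and column names are illustrative):

    spark.sql(
      s"""
         | CREATE TABLE IF NOT EXISTS products(
         |   id INT, name STRING, description STRING
         | ) STORED BY 'carbondata'
         | TBLPROPERTIES('LONG_STRING_COLUMNS'='description', 'SORT_COLUMNS'='name')
       """.stripMargin)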

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
new file mode 100644
index 0000000..419b306
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
@@ -0,0 +1,279 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements.  See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License.  You may obtain a copy of the License at
+*
+*    http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.carbondata.spark.testsuite.longstring
+
+import java.io.{File, PrintWriter}
+
+import org.apache.commons.lang3.RandomStringUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class VarcharDataTypesBasicTestCase extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+  private val longStringTable = "long_string_table"
+  private val inputDir = s"$resourcesPath${File.separator}varchartype${File.separator}"
+  private val fileName = s"longStringData.csv"
+  private val inputFile = s"$inputDir$fileName"
+  private val fileName_2g_column_page = s"longStringData_exceed_2gb_column_page.csv"
+  private val inputFile_2g_column_page = s"$inputDir$fileName_2g_column_page"
+  private val lineNum = 1000
+  private var content: Content = _
+  private val originMemorySize = CarbonProperties.getInstance().getProperty(
+    CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
+    CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT)
+
+  case class Content(head: Int, desc_line_head: String, note_line_head: String,
+      mid: Int, desc_line_mid: String, note_line_mid: String,
+      tail: Int, desc_line_tail: String, note_line_tail: String)
+
+  override def beforeAll(): Unit = {
+    // a column page of 32000 rows * 32000 characters uses about 1GB of memory, but here we only have 1000 rows
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
+      CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT)
+    deleteFile(inputFile)
+    if (!new File(inputDir).exists()) {
+      new File(inputDir).mkdir()
+    }
+    content = createFile(inputFile, line = lineNum)
+  }
+
+  override def afterAll(): Unit = {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, originMemorySize)
+    deleteFile(inputFile)
+    deleteFile(inputFile_2g_column_page)
+    if (new File(inputDir).exists()) {
+      new File(inputDir).delete()
+    }
+  }
+
+  override def beforeEach(): Unit = {
+    sql(s"drop table if exists $longStringTable")
+  }
+
+  override def afterEach(): Unit = {
+    sql(s"drop table if exists $longStringTable")
+  }
+
+  private def prepareTable(): Unit = {
+    sql(
+      s"""
+         | CREATE TABLE if not exists $longStringTable(
+         | id INT, name STRING, description STRING, address STRING, note STRING
+         | ) STORED BY 'carbondata'
+         | TBLPROPERTIES('LONG_STRING_COLUMNS'='description, note', 'SORT_COLUMNS'='name')
+         |""".stripMargin)
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $longStringTable
+         | OPTIONS('header'='false')
+       """.stripMargin)
+  }
+
+  private def checkQuery(): Unit = {
+    // query without long_string_column
+    checkAnswer(sql(s"SELECT id, name, address FROM $longStringTable where id = ${content.tail}"),
+      Row(content.tail, s"name_${content.tail}", s"address_${content.tail}"))
+    // query returning a long_string_column in the middle of the projection
+    checkAnswer(sql(s"SELECT id, name, description, address FROM $longStringTable where id = ${content.head}"),
+      Row(content.head, s"name_${content.head}", content.desc_line_head, s"address_${content.head}"))
+    // query returning a long_string_column at the last position
+    checkAnswer(sql(s"SELECT id, name, address, description FROM $longStringTable where id = ${content.mid}"),
+      Row(content.mid, s"name_${content.mid}", s"address_${content.mid}", content.desc_line_mid))
+    // query returning 2 long_string_columns
+    checkAnswer(sql(s"SELECT id, name, note, address, description FROM $longStringTable where id = ${content.mid}"),
+      Row(content.mid, s"name_${content.mid}", content.note_line_mid, s"address_${content.mid}", content.desc_line_mid))
+    // query by simple string column
+    checkAnswer(sql(s"SELECT id, note, address, description FROM $longStringTable where name = 'name_${content.tail}'"),
+      Row(content.tail, content.note_line_tail, s"address_${content.tail}", content.desc_line_tail))
+    // query by long string column
+    checkAnswer(sql(s"SELECT id, name, address, description FROM $longStringTable where note = '${content.note_line_tail}'"),
+      Row(content.tail, s"name_${content.tail}", s"address_${content.tail}", content.desc_line_tail))
+  }
+
+  test("Load and query with long string datatype: safe sort & safe columnpage") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "false")
+
+    prepareTable()
+    checkQuery()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+      CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
+      CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
+  }
+
+  test("Load and query with long string datatype: safe sort & unsafe column page") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
+
+    prepareTable()
+    checkQuery()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+      CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
+      CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
+  }
+
+  test("Load and query with long string datatype: unsafe sort & safe column page") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "false")
+
+    prepareTable()
+    checkQuery()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+      CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
+      CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
+  }
+
+  test("Load and query with long string datatype: unsafe sort & unsafe column page") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
+
+    prepareTable()
+    checkQuery()
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+      CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+      CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
+      CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
+  }
+
+  // ignored in CI because it needs at least 4GB of memory to run successfully
+  ignore("Exceed 2GB per column page for varchar datatype") {
+    deleteFile(inputFile_2g_column_page)
+    if (!new File(inputDir).exists()) {
+      new File(inputDir).mkdir()
+    }
+    // 700000 characters per row * 3200 rows is about 2.24GB, exceeding the 2GB constraint for one column page
+    content = createFile2(inputFile_2g_column_page, line = 3200, varcharLen = 700000)
+
+    sql(
+      s"""
+         | CREATE TABLE if not exists $longStringTable(
+         | id INT, name STRING, description STRING, address STRING
+         | ) STORED BY 'carbondata'
+         | TBLPROPERTIES('LONG_STRING_COLUMNS'='description', 'SORT_COLUMNS'='name')
+         |""".stripMargin)
+    intercept[Exception] {
+      sql(
+        s"""
+           | LOAD DATA LOCAL INPATH '$inputFile_2g_column_page' INTO TABLE $longStringTable
+           | OPTIONS('header'='false')
+       """.stripMargin)
+    }
+    // the load wraps the original exception, so we cannot assert on the root cause directly
+  }
+
+  // creates a CSV file containing 2 long string columns (description and note)
+  private def createFile(filePath: String, line: Int = 10000, start: Int = 0,
+      varcharLen: Int = Short.MaxValue + 1000): Content = {
+    val head = 0
+    val mid = line / 2
+    val tail = line - 1
+    var desc_line_head: String = ""
+    var desc_line_mid: String = ""
+    var desc_line_tail: String = ""
+    var note_line_head: String = ""
+    var note_line_mid: String = ""
+    var note_line_tail: String = ""
+    if (new File(filePath).exists()) {
+      deleteFile(filePath)
+    }
+    val write = new PrintWriter(new File(filePath))
+    for (i <- start until (start + line)) {
+      val description = RandomStringUtils.randomAlphabetic(varcharLen)
+      val note = RandomStringUtils.randomAlphabetic(varcharLen)
+      val row = s"$i,name_$i,$description,address_$i,$note"
+      if (head == i) {
+        desc_line_head = description
+        note_line_head = note
+      } else if (mid == i) {
+        desc_line_mid = description
+        note_line_mid = note
+      } else if (tail == i) {
+        desc_line_tail = description
+        note_line_tail = note
+      }
+      write.println(row)
+    }
+    write.close()
+    Content(head, desc_line_head, note_line_head,
+      mid, desc_line_mid, note_line_mid, tail,
+      desc_line_tail, note_line_tail)
+  }
+
+  // creates a CSV file containing only 1 long string column (description)
+  private def createFile2(filePath: String, line: Int = 10000, start: Int = 0,
+      varcharLen: Int = Short.MaxValue + 1000): Content = {
+    val head = 0
+    val mid = line / 2
+    val tail = line - 1
+    var desc_line_head: String = ""
+    var desc_line_mid: String = ""
+    var desc_line_tail: String = ""
+    if (new File(filePath).exists()) {
+      deleteFile(filePath)
+    }
+    val write = new PrintWriter(new File(filePath))
+    for (i <- start until (start + line)) {
+      val description = RandomStringUtils.randomAlphabetic(varcharLen)
+      // unlike createFile, no note column is generated for this file
+      val row = s"$i,name_$i,$description,address_$i"
+      if (head == i) {
+        desc_line_head = description
+      } else if (mid == i) {
+        desc_line_mid = description
+      } else if (tail == i) {
+        desc_line_tail = description
+      }
+      write.println(row)
+    }
+    write.close()
+    Content(head, desc_line_head, "",
+      mid, desc_line_mid, "", tail,
+      desc_line_tail, "")
+  }
+
+  private def deleteFile(filePath: String): Unit = {
+    val file = new File(filePath)
+    if (file.exists()) {
+      file.delete()
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 1ccbf6a..6227655 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -115,6 +115,7 @@ object CarbonScalaUtil {
         case CarbonDataTypes.BOOLEAN => BooleanType
         case CarbonDataTypes.TIMESTAMP => TimestampType
         case CarbonDataTypes.DATE => DateType
+        case CarbonDataTypes.VARCHAR => StringType
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
index 6673e18..6cd28c0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
@@ -126,6 +126,7 @@ object DataTypeConverterUtil {
       case "timestamp" => ThriftDataType.TIMESTAMP
       case "array" => ThriftDataType.ARRAY
       case "struct" => ThriftDataType.STRUCT
+      case "varchar" => ThriftDataType.VARCHAR
       case _ => ThriftDataType.STRING
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 9af8817..0d53a73 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -280,7 +280,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     fields.zipWithIndex.foreach { case (field, index) =>
       field.schemaOrdinal = index
     }
-    val (dims, msrs, noDictionaryDims, sortKeyDims) = extractDimAndMsrFields(
+    val (dims, msrs, noDictionaryDims, sortKeyDims, varcharColumns) = extractDimAndMsrFields(
       fields, tableProperties)
 
     // column properties
@@ -391,6 +391,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
       msrs.map(f => normalizeType(f)),
       Option(sortKeyDims),
+      Option(varcharColumns),
       Option(noDictionaryDims),
       Option(noInvertedIdxCols),
       groupCols,
@@ -691,12 +692,31 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
    * @return
    */
   protected def extractDimAndMsrFields(fields: Seq[Field],
-      tableProperties: Map[String, String]): (Seq[Field], Seq[Field], Seq[String], Seq[String]) = {
+      tableProperties: Map[String, String]):
+  (Seq[Field], Seq[Field], Seq[String], Seq[String], Seq[String]) = {
     var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
     var msrFields: Seq[Field] = Seq[Field]()
     var dictExcludeCols: Array[String] = Array[String]()
     var noDictionaryDims: Seq[String] = Seq[String]()
     var dictIncludeCols: Seq[String] = Seq[String]()
+    var varcharCols: Seq[String] = Seq[String]()
+
+    // all long_string_columns must be present among the create-table columns and be of string data type
+    if (tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS).isDefined) {
+      varcharCols =
+        tableProperties(CarbonCommonConstants.LONG_STRING_COLUMNS).split(",").map(_.trim)
+      varcharCols.foreach { varcharCol =>
+        val exists = fields.exists(f => f.column.equalsIgnoreCase(varcharCol) &&
+                                        DataTypes.STRING.getName.equalsIgnoreCase(f.dataType.get))
+        if (!exists) {
+          throw new MalformedCarbonCommandException(
+            s"""
+               |${CarbonCommonConstants.LONG_STRING_COLUMNS}: $varcharCol does not exist in table
+               | or its data type is not string. Please check the create table statement.
+             """.stripMargin)
+        }
+      }
+    }
 
     // All columns in sortkey should be there in create table cols
     val sortKeyOption = tableProperties.get(CarbonCommonConstants.SORT_COLUMNS)
@@ -727,6 +747,10 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
             val errormsg = s"sort_columns is unsupported for $dataType datatype column: " + column
             throw new MalformedCarbonCommandException(errormsg)
           }
+          if (varcharCols.exists(x => x.equalsIgnoreCase(column))) {
+            throw new MalformedCarbonCommandException(
+              s"sort_columns is unsupported for long string datatype column $column")
+          }
         }
       }
 
@@ -824,9 +848,11 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
 
     var sortKeyDims = sortKeyDimsTmp
     if (sortKeyOption.isEmpty) {
-      // if SORT_COLUMNS was not defined, add all dimension to SORT_COLUMNS.
+      // if SORT_COLUMNS was not defined, add all dimensions
+      // (except long string columns) to SORT_COLUMNS.
       dimFields.foreach { field =>
-        if (!isComplexDimDictionaryExclude(field.dataType.get)) {
+        if (!isComplexDimDictionaryExclude(field.dataType.get) &&
+            !varcharCols.exists(_.equalsIgnoreCase(field.column))) {
           sortKeyDims :+= field.column
         }
       }
@@ -837,7 +863,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
     } else {
       tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, sortKeyDims.mkString(","))
     }
-    (dimFields.toSeq, msrFields, noDictionaryDims, sortKeyDims)
+    (dimFields.toSeq, msrFields, noDictionaryDims, sortKeyDims, varcharCols)
   }
 
   def isDefaultMeasure(dataType: Option[String]): Boolean = {
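
The new validation rejects two kinds of DDL: a long string column that is
missing from the table or not of string type, and a long string column listed
in SORT_COLUMNS. Illustrative sketches of statements that now fail with
MalformedCarbonCommandException (table names are hypothetical, using the test
suite's sql(...) helper):

    // fails: 'id' exists but is INT, not STRING
    sql("CREATE TABLE t1(id INT, note STRING) STORED BY 'carbondata' " +
      "TBLPROPERTIES('LONG_STRING_COLUMNS'='id')")

    // fails: long string columns cannot be sort columns
    sql("CREATE TABLE t2(id INT, note STRING) STORED BY 'carbondata' " +
      "TBLPROPERTIES('LONG_STRING_COLUMNS'='note', 'SORT_COLUMNS'='note')")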

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index d48db21..c77d0df 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -54,6 +54,7 @@ case class TableModel(
     dimCols: Seq[Field],
     msrCols: Seq[Field],
     sortKeyDims: Option[Seq[String]],
+    varcharCols: Option[Seq[String]],
     highcardinalitydims: Option[Seq[String]],
     noInvertedIdxCols: Option[Seq[String]],
     columnGroups: Seq[String],
@@ -212,9 +213,9 @@ class AlterTableColumnSchemaGenerator(
     tableIdentifier: AbsoluteTableIdentifier,
     sc: SparkContext) {
 
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
 
-  def isSortColumn(columnName: String): Boolean = {
+  private def isSortColumn(columnName: String): Boolean = {
     val sortColumns = alterTableModel.tableProperties.get("sort_columns")
     if(sortColumns.isDefined) {
       sortColumns.get.contains(columnName)
@@ -222,6 +223,16 @@ class AlterTableColumnSchemaGenerator(
       true
     }
   }
+
+  private def isVarcharColumn(columnName: String): Boolean = {
+    val varcharColumns = alterTableModel.tableProperties.get("long_string_columns")
+    if (varcharColumns.isDefined) {
+      varcharColumns.get.split(",").map(_.trim).exists(_.equalsIgnoreCase(columnName))
+    } else {
+      false
+    }
+  }
+
   def process: Seq[ColumnSchema] = {
     val tableSchema = tableInfo.getFactTable
     val tableCols = tableSchema.getListOfColumns.asScala
@@ -241,7 +252,8 @@ class AlterTableColumnSchemaGenerator(
         field.schemaOrdinal + existingColsSize,
         alterTableModel.highCardinalityDims,
         alterTableModel.databaseName.getOrElse(dbName),
-        isSortColumn(field.name.getOrElse(field.column)))
+        isSortColumn(field.name.getOrElse(field.column)),
+        isVarcharColumn(field.name.getOrElse(field.column)))
       allColumns ++= Seq(columnSchema)
       newCols ++= Seq(columnSchema)
     })
@@ -351,14 +363,19 @@ object TableNewProcessor {
       schemaOrdinal: Int,
       highCardinalityDims: Seq[String],
       databaseName: String,
-      isSortColumn: Boolean = false): ColumnSchema = {
+      isSortColumn: Boolean = false,
+      isVarcharColumn: Boolean = false): ColumnSchema = {
     val dataType = DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse(""))
     if (DataTypes.isDecimal(dataType)) {
       dataType.asInstanceOf[DecimalType].setPrecision(field.precision)
       dataType.asInstanceOf[DecimalType].setScale(field.scale)
     }
     val columnSchema = new ColumnSchema()
-    columnSchema.setDataType(dataType)
+    if (isVarcharColumn) {
+      columnSchema.setDataType(DataTypes.VARCHAR)
+    } else {
+      columnSchema.setDataType(dataType)
+    }
     val colName = field.name.getOrElse(field.column)
     columnSchema.setColumnName(colName)
     if (highCardinalityDims.contains(colName)) {
@@ -415,6 +432,11 @@ class TableNewProcessor(cm: TableModel) {
     allColumns
   }
 
+  // a varchar column is a string column that is listed in long_string_columns
+  private def isVarcharColumn(colName: String): Boolean = {
+    cm.varcharCols.get.exists(_.equalsIgnoreCase(colName))
+  }
+
   def getColumnSchema(
       dataType: DataType,
       colName: String,
@@ -450,6 +472,9 @@ class TableNewProcessor(cm: TableModel) {
     columnSchema.setScale(field.scale)
     columnSchema.setSchemaOrdinal(field.schemaOrdinal)
     columnSchema.setSortColumn(false)
+    if (isVarcharColumn(colName)) {
+      columnSchema.setDataType(DataTypes.VARCHAR)
+    }
     if(isParentColumnRelation) {
       val dataMapField = map.get.get(field).get
       columnSchema.setFunction(dataMapField.aggregateFunction)
@@ -517,7 +542,7 @@ class TableNewProcessor(cm: TableModel) {
     val dictionaryIncludeCols = cm.tableProperties
       .getOrElse(CarbonCommonConstants.DICTIONARY_INCLUDE, "")
 
-    cm.dimCols.foreach { field =>
+    def addDimensionCol(field: Field): Unit = {
       val sortField = cm.sortKeyDims.get.find(field.column equals _)
       if (sortField.isEmpty) {
         val encoders = if (getEncoderFromParent(field)) {
@@ -549,6 +574,12 @@ class TableNewProcessor(cm: TableModel) {
         }
       }
     }
+    // dimensions that are not varchar
+    cm.dimCols.filter(field => !cm.varcharCols.get.exists(_.equalsIgnoreCase(field.column)))
+      .foreach(addDimensionCol(_))
+    // dimensions that are varchar
+    cm.dimCols.filter(field => cm.varcharCols.get.exists(_.equalsIgnoreCase(field.column)))
+      .foreach(addDimensionCol(_))
 
     // check whether the column is a local dictionary column and set in column schema
     if (null != cm.tableProperties) {
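
Note the ordering above: addDimensionCol is applied to the non-varchar
dimensions first and the varchar dimensions second, so varchar columns always
land at the tail of the dimension schema. A compact sketch of that ordering
(column names illustrative):

    val dimCols     = Seq("name", "description", "address", "note")
    val varcharCols = Set("description", "note")
    val ordered = dimCols.filterNot(varcharCols) ++ dimCols.filter(varcharCols)
    // => List(name, address, description, note)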

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 5739d3e..2f2048d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -201,6 +201,7 @@ case class CarbonRelation(
 object CarbonMetastoreTypes extends RegexParsers {
   protected lazy val primitiveType: Parser[DataType] =
     "string" ^^^ StringType |
+    "varchar" ^^^ StringType |
     "float" ^^^ FloatType |
     "int" ^^^ IntegerType |
     "tinyint" ^^^ ShortType |

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
index 9cf7fe4..3018e49 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -73,8 +73,10 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
               .getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat);
           if (dataType == DataTypes.STRING
               && value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
-            throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
-                + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+            throw new CarbonDataLoadingException(String.format(
+                "Dataload failed, String size cannot exceed %d bytes,"
+                    + " please consider using the long string data type",
+                CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT));
           }
           row.update(value, index);
         } else {
@@ -82,8 +84,10 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
               .getDataDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat);
           if (dataType == DataTypes.STRING
               && value.toString().length() > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
-            throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
-                + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+            throw new CarbonDataLoadingException(String.format(
+                "Dataload failed, String size cannot exceed %d bytes,"
+                    + " please consider using the long string data type",
+                CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT));
           }
           row.update(value, index);
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
index 2e3479c..86c71a6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
@@ -205,7 +205,9 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
     parserSettings.setSkipEmptyLines(
         Boolean.valueOf(job.get(SKIP_EMPTY_LINE,
             CarbonCommonConstants.CARBON_SKIP_EMPTY_LINE_DEFAULT)));
-    parserSettings.setMaxCharsPerColumn(CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT);
+    // TODO: verify whether the unlimited setting here causes a performance degradation
+    // parserSettings.setMaxCharsPerColumn(CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT);
+    parserSettings.setMaxCharsPerColumn(CarbonCommonConstants.MAX_CHARS_PER_COLUMN_INFINITY);
     String maxColumns = job.get(MAX_COLUMNS, "" + DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING);
     parserSettings.setMaxColumns(Integer.parseInt(maxColumns));
     parserSettings.getFormat().setQuote(job.get(QUOTE, QUOTE_DEFAULT).charAt(0));
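
MAX_CHARS_PER_COLUMN_INFINITY presumably maps to the univocity parser's "no
limit" value; in univocity-parsers a negative maxCharsPerColumn disables the
per-column cap, letting a cell grow arbitrarily. A minimal sketch against the
univocity API (the -1 value is an assumption here, not taken from this patch):

    import com.univocity.parsers.csv.{CsvParser, CsvParserSettings}

    val settings = new CsvParserSettings()
    settings.setMaxCharsPerColumn(-1)  // assumed: negative means no per-column limit
    val parser = new CsvParser(settings)
    val row = parser.parseLine("1,name_1," + ("x" * 40000))  // parses without truncation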

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
index 8d351cf..8bec099 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
@@ -54,12 +54,13 @@ public class IntermediateSortTempRow {
   /**
    * deserialize from bytes array to get the no sort fields
    * @param outDictNoSort stores the dict & no-sort fields
-   * @param outNoDictNoSort stores the no-dict & no-sort fields, including complex
+   * @param outNoDictNoSortAndVarcharDims stores the no-dict & no-sort fields,
+   *                                      including complex and varchar fields
    * @param outMeasures stores the measure fields
    * @param dataTypes data type for the measure
    */
-  public void unpackNoSortFromBytes(int[] outDictNoSort, byte[][] outNoDictNoSort,
-      Object[] outMeasures, DataType[] dataTypes) {
+  public void unpackNoSortFromBytes(int[] outDictNoSort, byte[][] outNoDictNoSortAndVarcharDims,
+      Object[] outMeasures, DataType[] dataTypes, int varcharDimCnt) {
     ByteBuffer rowBuffer = ByteBuffer.wrap(noSortDimsAndMeasures);
     // read dict_no_sort
     int dictNoSortCnt = outDictNoSort.length;
@@ -68,12 +69,20 @@ public class IntermediateSortTempRow {
     }
 
     // read no_dict_no_sort (including complex)
-    int noDictNoSortCnt = outNoDictNoSort.length;
+    int noDictNoSortCnt = outNoDictNoSortAndVarcharDims.length - varcharDimCnt;
     for (int i = 0; i < noDictNoSortCnt; i++) {
       short len = rowBuffer.getShort();
       byte[] bytes = new byte[len];
       rowBuffer.get(bytes);
-      outNoDictNoSort[i] = bytes;
+      outNoDictNoSortAndVarcharDims[i] = bytes;
+    }
+
+    // read varchar dims
+    for (int i = 0; i < varcharDimCnt; i++) {
+      int len = rowBuffer.getInt();
+      byte[] bytes = new byte[len];
+      rowBuffer.get(bytes);
+      outNoDictNoSortAndVarcharDims[i + noDictNoSortCnt] = bytes;
     }
 
     // read measure
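
In this packed layout the varchar values sit after the ordinary no-dict
fields and carry a 4-byte int length prefix instead of the 2-byte short used
elsewhere, which is what lifts the 32767-byte ceiling. A minimal round-trip
sketch of the framing (field values illustrative):

    import java.nio.ByteBuffer

    val noDict  = "abc".getBytes("UTF-8")          // ordinary no-dict field
    val varchar = ("x" * 40000).getBytes("UTF-8")  // longer than Short.MaxValue

    val buf = ByteBuffer.allocate(2 + noDict.length + 4 + varchar.length)
    buf.putShort(noDict.length.toShort)            // 2-byte length prefix
    buf.put(noDict)
    buf.putInt(varchar.length)                     // 4-byte length prefix
    buf.put(varchar)

    buf.flip()                                     // read back in the same order
    val first  = new Array[Byte](buf.getShort().toInt); buf.get(first)
    val second = new Array[Byte](buf.getInt());         buf.get(second)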

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
index f31a2b9..bcf8a39 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
@@ -46,6 +46,7 @@ public class SortStepRowHandler implements Serializable {
   private int dictNoSortDimCnt = 0;
   private int noDictSortDimCnt = 0;
   private int noDictNoSortDimCnt = 0;
+  private int varcharDimCnt = 0;
   private int measureCnt;
 
   // indices for dict & sort dimension columns
@@ -56,6 +57,7 @@ public class SortStepRowHandler implements Serializable {
   private int[] noDictSortDimIdx;
   // indices for no-dict & no-sort dimension columns, including complex columns
   private int[] noDictNoSortDimIdx;
+  private int[] varcharDimIdx;
   // indices for measure columns
   private int[] measureIdx;
 
@@ -70,11 +72,13 @@ public class SortStepRowHandler implements Serializable {
     this.dictNoSortDimCnt = tableFieldStat.getDictNoSortDimCnt();
     this.noDictSortDimCnt = tableFieldStat.getNoDictSortDimCnt();
     this.noDictNoSortDimCnt = tableFieldStat.getNoDictNoSortDimCnt();
+    this.varcharDimCnt = tableFieldStat.getVarcharDimCnt();
     this.measureCnt = tableFieldStat.getMeasureCnt();
     this.dictSortDimIdx = tableFieldStat.getDictSortDimIdx();
     this.dictNoSortDimIdx = tableFieldStat.getDictNoSortDimIdx();
     this.noDictSortDimIdx = tableFieldStat.getNoDictSortDimIdx();
     this.noDictNoSortDimIdx = tableFieldStat.getNoDictNoSortDimIdx();
+    this.varcharDimIdx = tableFieldStat.getVarcharDimIdx();
     this.measureIdx = tableFieldStat.getMeasureIdx();
     this.dataTypes = tableFieldStat.getMeasureDataType();
   }
@@ -122,6 +126,10 @@ public class SortStepRowHandler implements Serializable {
       for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
         nonDictArray[idxAcc++] = (byte[]) row[this.noDictNoSortDimIdx[idx]];
       }
+      // convert varchar dims
+      for (int idx = 0; idx < this.varcharDimCnt; idx++) {
+        nonDictArray[idxAcc++] = (byte[]) row[this.varcharDimIdx[idx]];
+      }
 
       // convert measure data
       for (int idx = 0; idx < this.measureCnt; idx++) {
@@ -146,13 +154,15 @@ public class SortStepRowHandler implements Serializable {
     int[] dictDims
         = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
     byte[][] noDictArray
-        = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt][];
+        = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt][];
 
     int[] dictNoSortDims = new int[this.dictNoSortDimCnt];
-    byte[][] noDictNoSortDims = new byte[this.noDictNoSortDimCnt][];
+    byte[][] noDictNoSortAndVarcharDims
+        = new byte[this.noDictNoSortDimCnt + this.varcharDimCnt][];
     Object[] measures = new Object[this.measureCnt];
 
-    sortTempRow.unpackNoSortFromBytes(dictNoSortDims, noDictNoSortDims, measures, this.dataTypes);
+    sortTempRow.unpackNoSortFromBytes(dictNoSortDims, noDictNoSortAndVarcharDims, measures,
+        this.dataTypes, this.varcharDimCnt);
 
     // dict dims
     System.arraycopy(sortTempRow.getDictSortDims(), 0 , dictDims,
@@ -163,8 +173,8 @@ public class SortStepRowHandler implements Serializable {
     // no dict dims, including complex
     System.arraycopy(sortTempRow.getNoDictSortDims(), 0,
         noDictArray, 0, this.noDictSortDimCnt);
-    System.arraycopy(noDictNoSortDims, 0, noDictArray,
-        this.noDictSortDimCnt, this.noDictNoSortDimCnt);
+    System.arraycopy(noDictNoSortAndVarcharDims, 0, noDictArray,
+        this.noDictSortDimCnt, this.noDictNoSortDimCnt + this.varcharDimCnt);
 
     // measures are already here
 
@@ -428,6 +438,12 @@ public class SortStepRowHandler implements Serializable {
       rowBuffer.putShort((short) bytes.length);
       rowBuffer.put(bytes);
     }
+    // convert varchar dims
+    for (int idx = 0; idx < this.varcharDimCnt; idx++) {
+      byte[] bytes = (byte[]) row[this.varcharDimIdx[idx]];
+      rowBuffer.putInt(bytes.length);
+      rowBuffer.put(bytes);
+    }
 
     // convert measure
     Object tmpValue;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index dde18a9..9dab181 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -92,6 +92,10 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    */
   private boolean[] noDictionaryColMapping;
   /**
+   * boolean mapping for long string dimension
+   */
+  private boolean[] isVarcharDimMapping;
+  /**
    * agg type defined for measures
    */
   private DataType[] dataTypes;
@@ -353,13 +357,17 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
     measureCount = carbonTable.getMeasureByTableName(tableName).size();
     List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
     noDictionaryColMapping = new boolean[dimensions.size()];
+    isVarcharDimMapping = new boolean[dimensions.size()];
     int i = 0;
     for (CarbonDimension dimension : dimensions) {
       if (CarbonUtil.hasEncoding(dimension.getEncoder(), Encoding.DICTIONARY)) {
         i++;
         continue;
       }
       noDictionaryColMapping[i++] = true;
+      if (dimension.getColumnSchema().getDataType() == DataTypes.VARCHAR) {
+        isVarcharDimMapping[i - 1] = true;
+      }
       noDictionaryCount++;
     }
     dimensionColumnCount = dimensions.size();
@@ -387,7 +396,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
         .createSortParameters(carbonTable, carbonLoadModel.getDatabaseName(), tableName,
             dimensionColumnCount, segmentProperties.getComplexDimensions().size(), measureCount,
             noDictionaryCount, segmentId,
-            carbonLoadModel.getTaskNo(), noDictionaryColMapping, true);
+            carbonLoadModel.getTaskNo(), noDictionaryColMapping, isVarcharDimMapping, true);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index 705350c..502fa05 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -111,7 +111,11 @@ public class SortParameters implements Serializable {
   private boolean[] noDictionaryDimnesionColumn;
 
   private boolean[] noDictionarySortColumn;
-
+  /**
+   * whether each dimension is of varchar data type.
+   * since dimension values are all strings internally, an array of booleans
+   * is enough instead of carrying the full data types
+   */
+  private boolean[] isVarcharDimensionColumn;
   private int numberOfSortColumns;
 
   private int numberOfNoDictSortColumns;
@@ -143,6 +147,7 @@ public class SortParameters implements Serializable {
     parameters.segmentId = segmentId;
     parameters.taskNo = taskNo;
     parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
+    parameters.isVarcharDimensionColumn = isVarcharDimensionColumn;
     parameters.noDictionarySortColumn = noDictionarySortColumn;
     parameters.numberOfSortColumns = numberOfSortColumns;
     parameters.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
@@ -312,6 +317,14 @@ public class SortParameters implements Serializable {
     this.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
   }
 
+  public boolean[] getIsVarcharDimensionColumn() {
+    return isVarcharDimensionColumn;
+  }
+
+  public void setIsVarcharDimensionColumn(boolean[] isVarcharDimensionColumn) {
+    this.isVarcharDimensionColumn = isVarcharDimensionColumn;
+  }
+
   public int getNumberOfCores() {
     return numberOfCores;
   }
@@ -371,6 +384,8 @@ public class SortParameters implements Serializable {
         .getComplexNonDictionaryColumnCount());
     parameters.setNoDictionaryDimnesionColumn(
         CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()));
+    parameters.setIsVarcharDimensionColumn(
+        CarbonDataProcessorUtil.getIsVarcharColumnMapping(configuration.getDataFields()));
     parameters.setBatchSortSizeinMb(CarbonDataProcessorUtil.getBatchSortSizeinMb(configuration));
 
     parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns());
@@ -461,7 +476,8 @@ public class SortParameters implements Serializable {
   public static SortParameters createSortParameters(CarbonTable carbonTable, String databaseName,
       String tableName, int dimColCount, int complexDimColCount, int measureColCount,
       int noDictionaryCount, String segmentId, String taskNo,
-      boolean[] noDictionaryColMaping, boolean isCompactionFlow) {
+      boolean[] noDictionaryColMaping, boolean[] isVarcharDimensionColumn,
+      boolean isCompactionFlow) {
     SortParameters parameters = new SortParameters();
     CarbonProperties carbonProperties = CarbonProperties.getInstance();
     parameters.setDatabaseName(databaseName);
@@ -476,6 +492,7 @@ public class SortParameters implements Serializable {
     parameters.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
     parameters.setComplexDimColCount(complexDimColCount);
     parameters.setNoDictionaryDimnesionColumn(noDictionaryColMaping);
+    parameters.setIsVarcharDimensionColumn(isVarcharDimensionColumn);
     parameters.setObserver(new SortObserver());
     // get sort buffer size
     parameters.setSortBufferSize(Integer.parseInt(carbonProperties

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
index 0d1303a..094bd83 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
@@ -33,9 +33,13 @@ public class TableFieldStat implements Serializable {
   private int dictSortDimCnt = 0;
   private int dictNoSortDimCnt = 0;
   private int noDictSortDimCnt = 0;
+  // count of columns that are no-dict and no-sort (including complex), excluding the varchar dims
   private int noDictNoSortDimCnt = 0;
+  // count of columns that are of varchar data type
+  private int varcharDimCnt = 0;
   // whether sort column is of dictionary type or not
   private boolean[] isSortColNoDictFlags;
+  private boolean[] isVarcharDimFlags;
   private int measureCnt;
   private DataType[] measureDataType;
 
@@ -47,6 +51,8 @@ public class TableFieldStat implements Serializable {
   private int[] noDictSortDimIdx;
   // indices for no-dict & no-sort dimension columns, including complex columns
   private int[] noDictNoSortDimIdx;
+  // indices for varchar dimension columns
+  private int[] varcharDimIdx;
   // indices for measure columns
   private int[] measureIdx;
 
@@ -55,6 +61,7 @@ public class TableFieldStat implements Serializable {
     int complexDimCnt = sortParameters.getComplexDimColCount();
     int dictDimCnt = sortParameters.getDimColCount() - noDictDimCnt;
     this.isSortColNoDictFlags = sortParameters.getNoDictionarySortColumn();
+    this.isVarcharDimFlags = sortParameters.getIsVarcharDimensionColumn();
     int sortColCnt = isSortColNoDictFlags.length;
     for (boolean flag : isSortColNoDictFlags) {
       if (flag) {
@@ -66,22 +73,33 @@ public class TableFieldStat implements Serializable {
     this.measureCnt = sortParameters.getMeasureColCount();
     this.measureDataType = sortParameters.getMeasureDataType();
 
+    for (boolean flag : isVarcharDimFlags) {
+      if (flag) {
+        varcharDimCnt++;
+      }
+    }
+
     // be careful that the default value is 0
     this.dictSortDimIdx = new int[dictSortDimCnt];
     this.dictNoSortDimIdx = new int[dictDimCnt - dictSortDimCnt];
     this.noDictSortDimIdx = new int[noDictSortDimCnt];
-    this.noDictNoSortDimIdx = new int[noDictDimCnt + complexDimCnt - noDictSortDimCnt];
+    this.noDictNoSortDimIdx = new int[noDictDimCnt + complexDimCnt - noDictSortDimCnt
+        - varcharDimCnt];
+    this.varcharDimIdx = new int[varcharDimCnt];
     this.measureIdx = new int[measureCnt];
 
     int tmpNoDictSortCnt = 0;
     int tmpNoDictNoSortCnt = 0;
     int tmpDictSortCnt = 0;
     int tmpDictNoSortCnt = 0;
+    int tmpVarcharCnt = 0;
     boolean[] isDimNoDictFlags = sortParameters.getNoDictionaryDimnesionColumn();
 
     for (int i = 0; i < isDimNoDictFlags.length; i++) {
       if (isDimNoDictFlags[i]) {
-        if (i < sortColCnt && isSortColNoDictFlags[i]) {
+        if (isVarcharDimFlags[i]) {
+          varcharDimIdx[tmpVarcharCnt++] = i;
+        } else if (i < sortColCnt && isSortColNoDictFlags[i]) {
           noDictSortDimIdx[tmpNoDictSortCnt++] = i;
         } else {
           noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = i;
@@ -126,10 +144,18 @@ public class TableFieldStat implements Serializable {
     return noDictNoSortDimCnt;
   }
 
+  public int getVarcharDimCnt() {
+    return varcharDimCnt;
+  }
+
   public boolean[] getIsSortColNoDictFlags() {
     return isSortColNoDictFlags;
   }
 
+  public boolean[] getIsVarcharDimFlags() {
+    return isVarcharDimFlags;
+  }
+
   public int getMeasureCnt() {
     return measureCnt;
   }
@@ -154,6 +180,10 @@ public class TableFieldStat implements Serializable {
     return noDictNoSortDimIdx;
   }
 
+  public int[] getVarcharDimIdx() {
+    return varcharDimIdx;
+  }
+
   public int[] getMeasureIdx() {
     return measureIdx;
   }
@@ -166,11 +196,12 @@ public class TableFieldStat implements Serializable {
         && dictNoSortDimCnt == that.dictNoSortDimCnt
         && noDictSortDimCnt == that.noDictSortDimCnt
         && noDictNoSortDimCnt == that.noDictNoSortDimCnt
+        && varcharDimCnt == that.varcharDimCnt
         && measureCnt == that.measureCnt;
   }
 
   @Override public int hashCode() {
     return Objects.hash(dictSortDimCnt, dictNoSortDimCnt, noDictSortDimCnt,
-        noDictNoSortDimCnt, measureCnt);
+        noDictNoSortDimCnt, varcharDimCnt, measureCnt);
   }
 }
\ No newline at end of file
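
TableFieldStat turns the per-dimension boolean flags into per-category ordinal
arrays, checking the varchar flag before the sort/no-sort split. A compact
sketch of that partitioning (flag values illustrative):

    val isDimNoDict  = Array(true, true, true, false)   // per-dimension no-dict flags
    val isVarcharDim = Array(false, true, false, false)

    val noDictOrdinals = isDimNoDict.indices.filter(i => isDimNoDict(i))
    val (varcharIdx, otherNoDictIdx) = noDictOrdinals.partition(i => isVarcharDim(i))
    // varcharIdx = Vector(1), otherNoDictIdx = Vector(0, 2)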

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 5408193..1a1c5d1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -39,7 +39,8 @@ import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
 import org.apache.carbondata.core.datastore.page.key.TablePageKey;
 import org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector;
-import org.apache.carbondata.core.datastore.page.statistics.LVStringStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.LVLongStringStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.LVShortStringStatsCollector;
 import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
@@ -98,8 +99,16 @@ public class TablePage {
     noDictDimensionPages = new ColumnPage[model.getNoDictionaryCount()];
     for (int i = 0; i < noDictDimensionPages.length; i++) {
       TableSpec.DimensionSpec spec = tableSpec.getDimensionSpec(i + numDictDimension);
-      ColumnPage page = ColumnPage.newPage(spec, DataTypes.STRING, pageSize);
-      page.setStatsCollector(LVStringStatsCollector.newInstance());
+      ColumnPage page;
+      if (DataTypes.VARCHAR == spec.getSchemaDataType()) {
+        page = ColumnPage.newPage(spec, DataTypes.VARCHAR, pageSize);
+        page.setStatsCollector(LVLongStringStatsCollector.newInstance());
+      } else {
+        // as in the previous implementation, other data types such as
+        // string, date and timestamp are encoded using a string page
+        page = ColumnPage.newPage(spec, DataTypes.STRING, pageSize);
+        page.setStatsCollector(LVShortStringStatsCollector.newInstance());
+      }
       noDictDimensionPages[i] = page;
     }
     complexDimensionPages = new ComplexColumnPage[model.getComplexColumnCount()];
@@ -155,16 +164,21 @@ public class TablePage {
       dictDimensionPages[i].putData(rowId, keys[i]);
     }
 
-    // 2. convert noDictionary columns and complex columns.
+    // 2. convert noDictionary, varchar and complex columns.
     int noDictionaryCount = noDictDimensionPages.length;
     int complexColumnCount = complexDimensionPages.length;
     if (noDictionaryCount > 0 || complexColumnCount > 0) {
+      TableSpec tableSpec = model.getTableSpec();
       byte[][] noDictAndComplex = WriteStepRowUtil.getNoDictAndComplexDimension(row);
       for (int i = 0; i < noDictAndComplex.length; i++) {
-        if (i < noDictionaryCount) {
+        if (tableSpec.getDimensionSpec(dictDimensionPages.length + i).getSchemaDataType()
+            == DataTypes.VARCHAR) {
+          byte[] valueWithLength = addIntLengthToByteArray(noDictAndComplex[i]);
+          noDictDimensionPages[i].putData(rowId, valueWithLength);
+        } else if (i < noDictionaryCount) {
           // noDictionary columns, since it is variable length, we need to prepare each
           // element as LV result byte array (first two bytes are the length of the array)
-          byte[] valueWithLength = addLengthToByteArray(noDictAndComplex[i]);
+          byte[] valueWithLength = addShortLengthToByteArray(noDictAndComplex[i]);
           noDictDimensionPages[i].putData(rowId, valueWithLength);
         } else {
           // complex columns
@@ -250,7 +264,7 @@ public class TablePage {
   }
 
   // Adds length as a short element (first 2 bytes) to the head of the input byte array
-  private byte[] addLengthToByteArray(byte[] input) {
+  private byte[] addShortLengthToByteArray(byte[] input) {
     if (input.length > Short.MAX_VALUE) {
       throw new RuntimeException("input data length " + input.length +
           " bytes too long, maximum length supported is " + Short.MAX_VALUE + " bytes");
@@ -262,6 +276,15 @@ public class TablePage {
     return output;
   }
 
+  // Adds length as an integer element (first 4 bytes) to the head of the input byte array
+  private byte[] addIntLengthToByteArray(byte[] input) {
+    byte[] output = new byte[input.length + 4];
+    ByteBuffer buffer = ByteBuffer.wrap(output);
+    buffer.putInt(input.length);
+    buffer.put(input, 0, input.length);
+    return output;
+  }
+
   void encode() throws KeyGenException, MemoryException, IOException {
     // encode dimensions and measure
     EncodedColumnPage[] dimensions = encodeAndCompressDimensions();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index f921fd5..12c95a9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -223,6 +223,26 @@ public final class CarbonDataProcessorUtil {
         .toPrimitive(noDictionaryMapping.toArray(new Boolean[noDictionaryMapping.size()]));
   }
 
+  /**
+   * Prepare a boolean[] that marks whether each dimension is of varchar data type.
+   */
+  public static boolean[] getIsVarcharColumnMapping(DataField[] fields) {
+    List<Boolean> isVarcharColumnMapping = new ArrayList<Boolean>();
+    for (DataField field : fields) {
+      // complex columns come after all primitive dimensions, so stop at the first one
+      if (field.getColumn().isComplex()) {
+        break;
+      }
+
+      if (field.getColumn().isDimension()) {
+        isVarcharColumnMapping.add(
+            field.getColumn().getColumnSchema().getDataType() == DataTypes.VARCHAR);
+      }
+    }
+    return ArrayUtils.toPrimitive(
+        isVarcharColumnMapping.toArray(new Boolean[isVarcharColumnMapping.size()]));
+  }
+
   public static boolean[] getNoDictionaryMapping(CarbonColumn[] carbonColumns) {
     List<Boolean> noDictionaryMapping = new ArrayList<Boolean>();
     for (CarbonColumn column : carbonColumns) {