You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/06/22 01:34:41 UTC
[40/50] [abbrv] carbondata git commit: [CARBONDATA-2420][32K] Support
string longer than 32000 characters
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
new file mode 100644
index 0000000..419b306
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
@@ -0,0 +1,279 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.carbondata.spark.testsuite.longstring
+
+import java.io.{File, PrintWriter}
+
+import org.apache.commons.lang3.RandomStringUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class VarcharDataTypesBasicTestCase extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+ private val longStringTable = "long_string_table"
+ private val inputDir = s"$resourcesPath${File.separator}varchartype${File.separator}"
+ private val fileName = s"longStringData.csv"
+ private val inputFile = s"$inputDir$fileName"
+ private val fileName_2g_column_page = s"longStringData_exceed_2gb_column_page.csv"
+ private val inputFile_2g_column_page = s"$inputDir$fileName_2g_column_page"
+ private val lineNum = 1000
+ private var content: Content = _
+ private var originMemorySize = CarbonProperties.getInstance().getProperty(
+ CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
+ CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT)
+
+ case class Content(head: Int, desc_line_head: String, note_line_head: String,
+ mid: Int, desc_line_mid: String, note_line_mid: String,
+ tail: Int, desc_line_tail: String, note_line_tail: String)
+
+ override def beforeAll(): Unit = {
+ // one column page of 32000 rows * 32000 characters uses about 1GB of memory, but here we only have 1000 rows
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
+ CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT)
+ deleteFile(inputFile)
+ if (!new File(inputDir).exists()) {
+ new File(inputDir).mkdir()
+ }
+ content = createFile(inputFile, line = lineNum)
+ }
+
+ override def afterAll(): Unit = {
+ CarbonProperties.getInstance().addProperty(
+ CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, originMemorySize)
+ deleteFile(inputFile)
+ deleteFile(inputFile_2g_column_page)
+ if (new File(inputDir).exists()) {
+ new File(inputDir).delete()
+ }
+ }
+
+ override def beforeEach(): Unit = {
+ sql(s"drop table if exists $longStringTable")
+ }
+
+ override def afterEach(): Unit = {
+ sql(s"drop table if exists $longStringTable")
+ }
+
+ private def prepareTable(): Unit = {
+ sql(
+ s"""
+ | CREATE TABLE if not exists $longStringTable(
+ | id INT, name STRING, description STRING, address STRING, note STRING
+ | ) STORED BY 'carbondata'
+ | TBLPROPERTIES('LONG_STRING_COLUMNS'='description, note', 'SORT_COLUMNS'='name')
+ |""".stripMargin)
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$inputFile' INTO TABLE $longStringTable
+ | OPTIONS('header'='false')
+ """.stripMargin)
+ }
+
+ private def checkQuery(): Unit = {
+ // query without long_string_column
+ checkAnswer(sql(s"SELECT id, name, address FROM $longStringTable where id = ${content.tail}"),
+ Row(content.tail, s"name_${content.tail}", s"address_${content.tail}"))
+ // query return long_string_column in the middle position
+ checkAnswer(sql(s"SELECT id, name, description, address FROM $longStringTable where id = ${content.head}"),
+ Row(content.head, s"name_${content.head}", content.desc_line_head, s"address_${content.head}"))
+ // query return long_string_column at last position
+ checkAnswer(sql(s"SELECT id, name, address, description FROM $longStringTable where id = ${content.mid}"),
+ Row(content.mid, s"name_${content.mid}", s"address_${content.mid}", content.desc_line_mid))
+ // query return 2 long_string_columns
+ checkAnswer(sql(s"SELECT id, name, note, address, description FROM $longStringTable where id = ${content.mid}"),
+ Row(content.mid, s"name_${content.mid}", content.note_line_mid, s"address_${content.mid}", content.desc_line_mid))
+ // query by simple string column
+ checkAnswer(sql(s"SELECT id, note, address, description FROM $longStringTable where name = 'name_${content.tail}'"),
+ Row(content.tail, content.note_line_tail, s"address_${content.tail}", content.desc_line_tail))
+ // query by long string column
+ checkAnswer(sql(s"SELECT id, name, address, description FROM $longStringTable where note = '${content.note_line_tail}'"),
+ Row(content.tail, s"name_${content.tail}", s"address_${content.tail}", content.desc_line_tail))
+ }
+
+ test("Load and query with long string datatype: safe sort & safe columnpage") {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false")
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "false")
+
+ prepareTable()
+ checkQuery()
+
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+ CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+ CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
+ CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
+ }
+
+ test("Load and query with long string datatype: safe sort & unsafe column page") {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false")
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "false")
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
+
+ prepareTable()
+ checkQuery()
+
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+ CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+ CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
+ CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
+ }
+
+ test("Load and query with long string datatype: unsafe sort & safe column page") {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true")
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "false")
+
+ prepareTable()
+ checkQuery()
+
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+ CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+ CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
+ CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
+ }
+
+ test("Load and query with long string datatype: unsafe sort & unsafe column page") {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true")
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT, "true")
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE, "true")
+
+ prepareTable()
+ checkQuery()
+
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
+ CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_OFFHEAP_SORT,
+ CarbonCommonConstants.ENABLE_OFFHEAP_SORT_DEFAULT)
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE,
+ CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_DEFAULT)
+ }
+
+ // ignore this test in CI, because it will need at least 4GB memory to run successfully
+ ignore("Exceed 2GB per column page for varchar datatype") {
+ deleteFile(inputFile_2g_column_page)
+ if (!new File(inputDir).exists()) {
+ new File(inputDir).mkdir()
+ }
+ // 700000 characters per row with 3200 rows will exceed the 2GB constraint for one column page.
+ content = createFile2(inputFile_2g_column_page, line = 3200, varcharLen = 700000)
+
+ sql(
+ s"""
+ | CREATE TABLE if not exists $longStringTable(
+ | id INT, name STRING, description STRING, address STRING
+ | ) STORED BY 'carbondata'
+ | TBLPROPERTIES('LONG_STRING_COLUMNS'='description', 'SORT_COLUMNS'='name')
+ |""".stripMargin)
+ val exceptionCaught = intercept[Exception] {
+ sql(
+ s"""
+ | LOAD DATA LOCAL INPATH '$inputFile_2g_column_page' INTO TABLE $longStringTable
+ | OPTIONS('header'='false')
+ """.stripMargin)
+ }
+ // since the exception is wrapped, we cannot get the root cause directly
+ }
+
+ // will create 2 long string columns
+ private def createFile(filePath: String, line: Int = 10000, start: Int = 0,
+ varcharLen: Int = Short.MaxValue + 1000): Content = {
+ val head = 0
+ val mid = line / 2
+ var tail = line - 1
+ var desc_line_head: String = ""
+ var desc_line_mid: String = ""
+ var desc_line_tail: String = ""
+ var note_line_head: String = ""
+ var note_line_mid: String = ""
+ var note_line_tail: String = ""
+ if (new File(filePath).exists()) {
+ deleteFile(filePath)
+ }
+ val write = new PrintWriter(new File(filePath))
+ for (i <- start until (start + line)) {
+ val description = RandomStringUtils.randomAlphabetic(varcharLen)
+ val note = RandomStringUtils.randomAlphabetic(varcharLen)
+ val line = s"$i,name_$i,$description,address_$i,$note"
+ if (head == i) {
+ desc_line_head = description
+ note_line_head = note
+ } else if (mid == i) {
+ desc_line_mid = description
+ note_line_mid = note
+ } else if (tail == i) {
+ desc_line_tail = description
+ note_line_tail = note
+ }
+ write.println(line)
+ }
+ write.close()
+ Content(head, desc_line_head, note_line_head,
+ mid, desc_line_mid, note_line_mid, tail,
+ desc_line_tail, note_line_tail)
+ }
+
+ // will only create 1 long string column
+ private def createFile2(filePath: String, line: Int = 10000, start: Int = 0,
+ varcharLen: Int = Short.MaxValue + 1000): Content = {
+ val head = 0
+ val mid = line / 2
+ var tail = line - 1
+ var desc_line_head: String = ""
+ var desc_line_mid: String = ""
+ var desc_line_tail: String = ""
+ if (new File(filePath).exists()) {
+ deleteFile(filePath)
+ }
+ val write = new PrintWriter(new File(filePath))
+ for (i <- start until (start + line)) {
+ val description = RandomStringUtils.randomAlphabetic(varcharLen)
+ val note = RandomStringUtils.randomAlphabetic(varcharLen)
+ val line = s"$i,name_$i,$description,address_$i"
+ if (head == i) {
+ desc_line_head = description
+ } else if (mid == i) {
+ desc_line_mid = description
+ } else if (tail == i) {
+ desc_line_tail = description
+ }
+ write.println(line)
+ }
+ write.close()
+ Content(head, desc_line_head, "",
+ mid, desc_line_mid, "", tail,
+ desc_line_tail, "")
+ }
+
+ private def deleteFile(filePath: String): Unit = {
+ val file = new File(filePath)
+ if (file.exists()) {
+ file.delete()
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 1ccbf6a..6227655 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -115,6 +115,7 @@ object CarbonScalaUtil {
case CarbonDataTypes.BOOLEAN => BooleanType
case CarbonDataTypes.TIMESTAMP => TimestampType
case CarbonDataTypes.DATE => DateType
+ case CarbonDataTypes.VARCHAR => StringType
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
index 6673e18..6cd28c0 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataTypeConverterUtil.scala
@@ -126,6 +126,7 @@ object DataTypeConverterUtil {
case "timestamp" => ThriftDataType.TIMESTAMP
case "array" => ThriftDataType.ARRAY
case "struct" => ThriftDataType.STRUCT
+ case "varchar" => ThriftDataType.VARCHAR
case _ => ThriftDataType.STRING
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 9af8817..0d53a73 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -280,7 +280,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
fields.zipWithIndex.foreach { case (field, index) =>
field.schemaOrdinal = index
}
- val (dims, msrs, noDictionaryDims, sortKeyDims) = extractDimAndMsrFields(
+ val (dims, msrs, noDictionaryDims, sortKeyDims, varcharColumns) = extractDimAndMsrFields(
fields, tableProperties)
// column properties
@@ -391,6 +391,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
msrs.map(f => normalizeType(f)),
Option(sortKeyDims),
+ Option(varcharColumns),
Option(noDictionaryDims),
Option(noInvertedIdxCols),
groupCols,
@@ -691,12 +692,31 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
* @return
*/
protected def extractDimAndMsrFields(fields: Seq[Field],
- tableProperties: Map[String, String]): (Seq[Field], Seq[Field], Seq[String], Seq[String]) = {
+ tableProperties: Map[String, String]):
+ (Seq[Field], Seq[Field], Seq[String], Seq[String], Seq[String]) = {
var dimFields: LinkedHashSet[Field] = LinkedHashSet[Field]()
var msrFields: Seq[Field] = Seq[Field]()
var dictExcludeCols: Array[String] = Array[String]()
var noDictionaryDims: Seq[String] = Seq[String]()
var dictIncludeCols: Seq[String] = Seq[String]()
+ var varcharCols: Seq[String] = Seq[String]()
+
+ // All long_string cols should be there in create table cols and should be of string data type
+ if (tableProperties.get(CarbonCommonConstants.LONG_STRING_COLUMNS).isDefined) {
+ varcharCols =
+ tableProperties(CarbonCommonConstants.LONG_STRING_COLUMNS).split(",").map(_.trim)
+ varcharCols.foreach { varcharCol =>
+ val exists = fields.exists(f => f.column.equalsIgnoreCase(varcharCol) &&
+ DataTypes.STRING.getName.equalsIgnoreCase(f.dataType.get))
+ if (!exists) {
+ throw new MalformedCarbonCommandException(
+ s"""
+ |${CarbonCommonConstants.LONG_STRING_COLUMNS}: $varcharCol does not exist in table
+ | or its data type is not string. Please check create table statement.
+ """.stripMargin)
+ }
+ }
+ }
// All columns in sortkey should be there in create table cols
val sortKeyOption = tableProperties.get(CarbonCommonConstants.SORT_COLUMNS)
@@ -727,6 +747,10 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
val errormsg = s"sort_columns is unsupported for $dataType datatype column: " + column
throw new MalformedCarbonCommandException(errormsg)
}
+ if (varcharCols.exists(x => x.equalsIgnoreCase(column))) {
+ throw new MalformedCarbonCommandException(
+ s"sort_columns is unsupported for long string datatype column $column")
+ }
}
}
@@ -824,9 +848,11 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
var sortKeyDims = sortKeyDimsTmp
if (sortKeyOption.isEmpty) {
- // if SORT_COLUMNS was not defined, add all dimension to SORT_COLUMNS.
+ // if SORT_COLUMNS was not defined,
+ // add all dimension(except long string columns) to SORT_COLUMNS.
dimFields.foreach { field =>
- if (!isComplexDimDictionaryExclude(field.dataType.get)) {
+ if (!isComplexDimDictionaryExclude(field.dataType.get) &&
+ !varcharCols.contains(field.column)) {
sortKeyDims :+= field.column
}
}
@@ -837,7 +863,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
} else {
tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, sortKeyDims.mkString(","))
}
- (dimFields.toSeq, msrFields, noDictionaryDims, sortKeyDims)
+ (dimFields.toSeq, msrFields, noDictionaryDims, sortKeyDims, varcharCols)
}
def isDefaultMeasure(dataType: Option[String]): Boolean = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index d48db21..c77d0df 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -54,6 +54,7 @@ case class TableModel(
dimCols: Seq[Field],
msrCols: Seq[Field],
sortKeyDims: Option[Seq[String]],
+ varcharCols: Option[Seq[String]],
highcardinalitydims: Option[Seq[String]],
noInvertedIdxCols: Option[Seq[String]],
columnGroups: Seq[String],
@@ -212,9 +213,9 @@ class AlterTableColumnSchemaGenerator(
tableIdentifier: AbsoluteTableIdentifier,
sc: SparkContext) {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
- def isSortColumn(columnName: String): Boolean = {
+ private def isSortColumn(columnName: String): Boolean = {
val sortColumns = alterTableModel.tableProperties.get("sort_columns")
if(sortColumns.isDefined) {
sortColumns.get.contains(columnName)
@@ -222,6 +223,16 @@ class AlterTableColumnSchemaGenerator(
true
}
}
+
+ private def isVarcharColumn(columnName: String): Boolean = {
+ val varcharColumns = alterTableModel.tableProperties.get("long_string_columns")
+ if (varcharColumns.isDefined) {
+ varcharColumns.get.contains(columnName)
+ } else {
+ false
+ }
+ }
+
def process: Seq[ColumnSchema] = {
val tableSchema = tableInfo.getFactTable
val tableCols = tableSchema.getListOfColumns.asScala
@@ -241,7 +252,8 @@ class AlterTableColumnSchemaGenerator(
field.schemaOrdinal + existingColsSize,
alterTableModel.highCardinalityDims,
alterTableModel.databaseName.getOrElse(dbName),
- isSortColumn(field.name.getOrElse(field.column)))
+ isSortColumn(field.name.getOrElse(field.column)),
+ isVarcharColumn(field.name.getOrElse(field.column)))
allColumns ++= Seq(columnSchema)
newCols ++= Seq(columnSchema)
})
@@ -351,14 +363,19 @@ object TableNewProcessor {
schemaOrdinal: Int,
highCardinalityDims: Seq[String],
databaseName: String,
- isSortColumn: Boolean = false): ColumnSchema = {
+ isSortColumn: Boolean = false,
+ isVarcharColumn: Boolean = false): ColumnSchema = {
val dataType = DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse(""))
if (DataTypes.isDecimal(dataType)) {
dataType.asInstanceOf[DecimalType].setPrecision(field.precision)
dataType.asInstanceOf[DecimalType].setScale(field.scale)
}
val columnSchema = new ColumnSchema()
- columnSchema.setDataType(dataType)
+ if (isVarcharColumn) {
+ columnSchema.setDataType(DataTypes.VARCHAR)
+ } else {
+ columnSchema.setDataType(dataType)
+ }
val colName = field.name.getOrElse(field.column)
columnSchema.setColumnName(colName)
if (highCardinalityDims.contains(colName)) {
@@ -415,6 +432,11 @@ class TableNewProcessor(cm: TableModel) {
allColumns
}
+ // a varchar column is a string column that is listed in long_string_columns
+ private def isVarcharColumn(colName : String): Boolean = {
+ cm.varcharCols.get.contains(colName)
+ }
+
def getColumnSchema(
dataType: DataType,
colName: String,
@@ -450,6 +472,9 @@ class TableNewProcessor(cm: TableModel) {
columnSchema.setScale(field.scale)
columnSchema.setSchemaOrdinal(field.schemaOrdinal)
columnSchema.setSortColumn(false)
+ if (isVarcharColumn(colName)) {
+ columnSchema.setDataType(DataTypes.VARCHAR)
+ }
if(isParentColumnRelation) {
val dataMapField = map.get.get(field).get
columnSchema.setFunction(dataMapField.aggregateFunction)
@@ -517,7 +542,7 @@ class TableNewProcessor(cm: TableModel) {
val dictionaryIncludeCols = cm.tableProperties
.getOrElse(CarbonCommonConstants.DICTIONARY_INCLUDE, "")
- cm.dimCols.foreach { field =>
+ def addDimensionCol(field: Field): Unit = {
val sortField = cm.sortKeyDims.get.find(field.column equals _)
if (sortField.isEmpty) {
val encoders = if (getEncoderFromParent(field)) {
@@ -549,6 +574,12 @@ class TableNewProcessor(cm: TableModel) {
}
}
}
+ // dimensions that are not varchar
+ cm.dimCols.filter(field => !cm.varcharCols.get.contains(field.column))
+ .foreach(addDimensionCol(_))
+ // dimensions that are varchar
+ cm.dimCols.filter(field => cm.varcharCols.get.contains(field.column))
+ .foreach(addDimensionCol(_))
// check whether the column is a local dictionary column and set in column schema
if (null != cm.tableProperties) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index 5739d3e..2f2048d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -201,6 +201,7 @@ case class CarbonRelation(
object CarbonMetastoreTypes extends RegexParsers {
protected lazy val primitiveType: Parser[DataType] =
"string" ^^^ StringType |
+ "varchar" ^^^ StringType |
"float" ^^^ FloatType |
"int" ^^^ IntegerType |
"tinyint" ^^^ ShortType |
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
index 9cf7fe4..3018e49 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/NonDictionaryFieldConverterImpl.java
@@ -73,8 +73,10 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
.getBytesBasedOnDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat);
if (dataType == DataTypes.STRING
&& value.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
- throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
- + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+ throw new CarbonDataLoadingException(String.format(
+ "Dataload failed, String size cannot exceed %d bytes,"
+ + " please consider long string data type",
+ CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT));
}
row.update(value, index);
} else {
@@ -82,8 +84,10 @@ public class NonDictionaryFieldConverterImpl implements FieldConverter {
.getDataDataTypeForNoDictionaryColumn(dimensionValue, dataType, dateFormat);
if (dataType == DataTypes.STRING
&& value.toString().length() > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
- throw new CarbonDataLoadingException("Dataload failed, String size cannot exceed "
- + CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " bytes");
+ throw new CarbonDataLoadingException(String.format(
+ "Dataload failed, String size cannot exceed %d bytes,"
+ + " please consider long string data type",
+ CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT));
}
row.update(value, index);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
index 2e3479c..86c71a6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/csvinput/CSVInputFormat.java
@@ -205,7 +205,9 @@ public class CSVInputFormat extends FileInputFormat<NullWritable, StringArrayWri
parserSettings.setSkipEmptyLines(
Boolean.valueOf(job.get(SKIP_EMPTY_LINE,
CarbonCommonConstants.CARBON_SKIP_EMPTY_LINE_DEFAULT)));
- parserSettings.setMaxCharsPerColumn(CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT);
+ // todo: will verify whether there is performance degradation when using -1 here
+ // parserSettings.setMaxCharsPerColumn(CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT);
+ parserSettings.setMaxCharsPerColumn(CarbonCommonConstants.MAX_CHARS_PER_COLUMN_INFINITY);
String maxColumns = job.get(MAX_COLUMNS, "" + DEFAULT_MAX_NUMBER_OF_COLUMNS_FOR_PARSING);
parserSettings.setMaxColumns(Integer.parseInt(maxColumns));
parserSettings.getFormat().setQuote(job.get(QUOTE, QUOTE_DEFAULT).charAt(0));
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
index 8d351cf..8bec099 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
@@ -54,12 +54,13 @@ public class IntermediateSortTempRow {
/**
* deserialize from bytes array to get the no sort fields
* @param outDictNoSort stores the dict & no-sort fields
- * @param outNoDictNoSort stores the no-dict & no-sort fields, including complex
+ * @param outNoDictNoSortAndVarcharDims stores the no-dict & no-sort fields,
+ * including complex and varchar fields
* @param outMeasures stores the measure fields
* @param dataTypes data type for the measure
*/
- public void unpackNoSortFromBytes(int[] outDictNoSort, byte[][] outNoDictNoSort,
- Object[] outMeasures, DataType[] dataTypes) {
+ public void unpackNoSortFromBytes(int[] outDictNoSort, byte[][] outNoDictNoSortAndVarcharDims,
+ Object[] outMeasures, DataType[] dataTypes, int varcharDimCnt) {
ByteBuffer rowBuffer = ByteBuffer.wrap(noSortDimsAndMeasures);
// read dict_no_sort
int dictNoSortCnt = outDictNoSort.length;
@@ -68,12 +69,20 @@ public class IntermediateSortTempRow {
}
// read no_dict_no_sort (including complex)
- int noDictNoSortCnt = outNoDictNoSort.length;
+ int noDictNoSortCnt = outNoDictNoSortAndVarcharDims.length - varcharDimCnt;
for (int i = 0; i < noDictNoSortCnt; i++) {
short len = rowBuffer.getShort();
byte[] bytes = new byte[len];
rowBuffer.get(bytes);
- outNoDictNoSort[i] = bytes;
+ outNoDictNoSortAndVarcharDims[i] = bytes;
+ }
+
+ // read varchar dims
+ for (int i = 0; i < varcharDimCnt; i++) {
+ int len = rowBuffer.getInt();
+ byte[] bytes = new byte[len];
+ rowBuffer.get(bytes);
+ outNoDictNoSortAndVarcharDims[i + noDictNoSortCnt] = bytes;
}
// read measure
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
index f31a2b9..bcf8a39 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
@@ -46,6 +46,7 @@ public class SortStepRowHandler implements Serializable {
private int dictNoSortDimCnt = 0;
private int noDictSortDimCnt = 0;
private int noDictNoSortDimCnt = 0;
+ private int varcharDimCnt = 0;
private int measureCnt;
// indices for dict & sort dimension columns
@@ -56,6 +57,7 @@ public class SortStepRowHandler implements Serializable {
private int[] noDictSortDimIdx;
// indices for no-dict & no-sort dimension columns, including complex columns
private int[] noDictNoSortDimIdx;
+ private int[] varcharDimIdx;
// indices for measure columns
private int[] measureIdx;
@@ -70,11 +72,13 @@ public class SortStepRowHandler implements Serializable {
this.dictNoSortDimCnt = tableFieldStat.getDictNoSortDimCnt();
this.noDictSortDimCnt = tableFieldStat.getNoDictSortDimCnt();
this.noDictNoSortDimCnt = tableFieldStat.getNoDictNoSortDimCnt();
+ this.varcharDimCnt = tableFieldStat.getVarcharDimCnt();
this.measureCnt = tableFieldStat.getMeasureCnt();
this.dictSortDimIdx = tableFieldStat.getDictSortDimIdx();
this.dictNoSortDimIdx = tableFieldStat.getDictNoSortDimIdx();
this.noDictSortDimIdx = tableFieldStat.getNoDictSortDimIdx();
this.noDictNoSortDimIdx = tableFieldStat.getNoDictNoSortDimIdx();
+ this.varcharDimIdx = tableFieldStat.getVarcharDimIdx();
this.measureIdx = tableFieldStat.getMeasureIdx();
this.dataTypes = tableFieldStat.getMeasureDataType();
}
@@ -122,6 +126,10 @@ public class SortStepRowHandler implements Serializable {
for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
nonDictArray[idxAcc++] = (byte[]) row[this.noDictNoSortDimIdx[idx]];
}
+ // convert varchar dims
+ for (int idx = 0; idx < this.varcharDimCnt; idx++) {
+ nonDictArray[idxAcc++] = (byte[]) row[this.varcharDimIdx[idx]];
+ }
// convert measure data
for (int idx = 0; idx < this.measureCnt; idx++) {
@@ -146,13 +154,15 @@ public class SortStepRowHandler implements Serializable {
int[] dictDims
= new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
byte[][] noDictArray
- = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt][];
+ = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt + this.varcharDimCnt][];
int[] dictNoSortDims = new int[this.dictNoSortDimCnt];
- byte[][] noDictNoSortDims = new byte[this.noDictNoSortDimCnt][];
+ byte[][] noDictNoSortAndVarcharDims
+ = new byte[this.noDictNoSortDimCnt + this.varcharDimCnt][];
Object[] measures = new Object[this.measureCnt];
- sortTempRow.unpackNoSortFromBytes(dictNoSortDims, noDictNoSortDims, measures, this.dataTypes);
+ sortTempRow.unpackNoSortFromBytes(dictNoSortDims, noDictNoSortAndVarcharDims, measures,
+ this.dataTypes, this.varcharDimCnt);
// dict dims
System.arraycopy(sortTempRow.getDictSortDims(), 0 , dictDims,
@@ -163,8 +173,8 @@ public class SortStepRowHandler implements Serializable {
// no dict dims, including complex
System.arraycopy(sortTempRow.getNoDictSortDims(), 0,
noDictArray, 0, this.noDictSortDimCnt);
- System.arraycopy(noDictNoSortDims, 0, noDictArray,
- this.noDictSortDimCnt, this.noDictNoSortDimCnt);
+ System.arraycopy(noDictNoSortAndVarcharDims, 0, noDictArray,
+ this.noDictSortDimCnt, this.noDictNoSortDimCnt + this.varcharDimCnt);
// measures are already here
@@ -428,6 +438,12 @@ public class SortStepRowHandler implements Serializable {
rowBuffer.putShort((short) bytes.length);
rowBuffer.put(bytes);
}
+ // convert varchar dims
+ for (int idx = 0; idx < this.varcharDimCnt; idx++) {
+ byte[] bytes = (byte[]) row[this.varcharDimIdx[idx]];
+ rowBuffer.putInt(bytes.length);
+ rowBuffer.put(bytes);
+ }
// convert measure
Object tmpValue;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index dde18a9..9dab181 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -92,6 +92,10 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
*/
private boolean[] noDictionaryColMapping;
/**
+ * boolean mapping for long string dimension
+ */
+ private boolean[] isVarcharDimMapping;
+ /**
* agg type defined for measures
*/
private DataType[] dataTypes;
@@ -353,13 +357,18 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
measureCount = carbonTable.getMeasureByTableName(tableName).size();
List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
noDictionaryColMapping = new boolean[dimensions.size()];
+ isVarcharDimMapping = new boolean[dimensions.size()];
int i = 0;
+ int j = 0;
for (CarbonDimension dimension : dimensions) {
if (CarbonUtil.hasEncoding(dimension.getEncoder(), Encoding.DICTIONARY)) {
i++;
continue;
}
noDictionaryColMapping[i++] = true;
+ if (dimension.getColumnSchema().getDataType() == DataTypes.VARCHAR) {
+ isVarcharDimMapping[j++] = true;
+ }
noDictionaryCount++;
}
dimensionColumnCount = dimensions.size();
@@ -387,7 +396,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
.createSortParameters(carbonTable, carbonLoadModel.getDatabaseName(), tableName,
dimensionColumnCount, segmentProperties.getComplexDimensions().size(), measureCount,
noDictionaryCount, segmentId,
- carbonLoadModel.getTaskNo(), noDictionaryColMapping, true);
+ carbonLoadModel.getTaskNo(), noDictionaryColMapping, isVarcharDimMapping, true);
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index 705350c..502fa05 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -111,7 +111,11 @@ public class SortParameters implements Serializable {
private boolean[] noDictionaryDimnesionColumn;
private boolean[] noDictionarySortColumn;
-
+ /**
+ * whether the dimension is of varchar data type.
+ * since all dimensions are strings, we use an array of booleans instead of data types
+ */
+ private boolean[] isVarcharDimensionColumn;
private int numberOfSortColumns;
private int numberOfNoDictSortColumns;
@@ -143,6 +147,7 @@ public class SortParameters implements Serializable {
parameters.segmentId = segmentId;
parameters.taskNo = taskNo;
parameters.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
+ parameters.isVarcharDimensionColumn = isVarcharDimensionColumn;
parameters.noDictionarySortColumn = noDictionarySortColumn;
parameters.numberOfSortColumns = numberOfSortColumns;
parameters.numberOfNoDictSortColumns = numberOfNoDictSortColumns;
@@ -312,6 +317,14 @@ public class SortParameters implements Serializable {
this.noDictionaryDimnesionColumn = noDictionaryDimnesionColumn;
}
+ public boolean[] getIsVarcharDimensionColumn() {
+ return isVarcharDimensionColumn;
+ }
+
+ public void setIsVarcharDimensionColumn(boolean[] isVarcharDimensionColumn) {
+ this.isVarcharDimensionColumn = isVarcharDimensionColumn;
+ }
+
public int getNumberOfCores() {
return numberOfCores;
}
@@ -371,6 +384,8 @@ public class SortParameters implements Serializable {
.getComplexNonDictionaryColumnCount());
parameters.setNoDictionaryDimnesionColumn(
CarbonDataProcessorUtil.getNoDictionaryMapping(configuration.getDataFields()));
+ parameters.setIsVarcharDimensionColumn(
+ CarbonDataProcessorUtil.getIsVarcharColumnMapping(configuration.getDataFields()));
parameters.setBatchSortSizeinMb(CarbonDataProcessorUtil.getBatchSortSizeinMb(configuration));
parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns());
@@ -461,7 +476,8 @@ public class SortParameters implements Serializable {
public static SortParameters createSortParameters(CarbonTable carbonTable, String databaseName,
String tableName, int dimColCount, int complexDimColCount, int measureColCount,
int noDictionaryCount, String segmentId, String taskNo,
- boolean[] noDictionaryColMaping, boolean isCompactionFlow) {
+ boolean[] noDictionaryColMaping, boolean[] isVarcharDimensionColumn,
+ boolean isCompactionFlow) {
SortParameters parameters = new SortParameters();
CarbonProperties carbonProperties = CarbonProperties.getInstance();
parameters.setDatabaseName(databaseName);
@@ -476,6 +492,7 @@ public class SortParameters implements Serializable {
parameters.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
parameters.setComplexDimColCount(complexDimColCount);
parameters.setNoDictionaryDimnesionColumn(noDictionaryColMaping);
+ parameters.setIsVarcharDimensionColumn(isVarcharDimensionColumn);
parameters.setObserver(new SortObserver());
// get sort buffer size
parameters.setSortBufferSize(Integer.parseInt(carbonProperties
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
index 0d1303a..094bd83 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/TableFieldStat.java
@@ -33,9 +33,13 @@ public class TableFieldStat implements Serializable {
private int dictSortDimCnt = 0;
private int dictNoSortDimCnt = 0;
private int noDictSortDimCnt = 0;
+ // for columns that are no_dict_dim and no_sort_dim and complex, except the varchar dims
private int noDictNoSortDimCnt = 0;
+ // for columns that are varchar data type
+ private int varcharDimCnt = 0;
// whether sort column is of dictionary type or not
private boolean[] isSortColNoDictFlags;
+ private boolean[] isVarcharDimFlags;
private int measureCnt;
private DataType[] measureDataType;
@@ -47,6 +51,8 @@ public class TableFieldStat implements Serializable {
private int[] noDictSortDimIdx;
// indices for no-dict & no-sort dimension columns, including complex columns
private int[] noDictNoSortDimIdx;
+ // indices for varchar dimension columns
+ private int[] varcharDimIdx;
// indices for measure columns
private int[] measureIdx;
@@ -55,6 +61,7 @@ public class TableFieldStat implements Serializable {
int complexDimCnt = sortParameters.getComplexDimColCount();
int dictDimCnt = sortParameters.getDimColCount() - noDictDimCnt;
this.isSortColNoDictFlags = sortParameters.getNoDictionarySortColumn();
+ this.isVarcharDimFlags = sortParameters.getIsVarcharDimensionColumn();
int sortColCnt = isSortColNoDictFlags.length;
for (boolean flag : isSortColNoDictFlags) {
if (flag) {
@@ -66,22 +73,33 @@ public class TableFieldStat implements Serializable {
this.measureCnt = sortParameters.getMeasureColCount();
this.measureDataType = sortParameters.getMeasureDataType();
+ for (boolean flag : isVarcharDimFlags) {
+ if (flag) {
+ varcharDimCnt++;
+ }
+ }
+
// be careful that the default value is 0
this.dictSortDimIdx = new int[dictSortDimCnt];
this.dictNoSortDimIdx = new int[dictDimCnt - dictSortDimCnt];
this.noDictSortDimIdx = new int[noDictSortDimCnt];
- this.noDictNoSortDimIdx = new int[noDictDimCnt + complexDimCnt - noDictSortDimCnt];
+ this.noDictNoSortDimIdx = new int[noDictDimCnt + complexDimCnt - noDictSortDimCnt
+ - varcharDimCnt];
+ this.varcharDimIdx = new int[varcharDimCnt];
this.measureIdx = new int[measureCnt];
int tmpNoDictSortCnt = 0;
int tmpNoDictNoSortCnt = 0;
int tmpDictSortCnt = 0;
int tmpDictNoSortCnt = 0;
+ int tmpVarcharCnt = 0;
boolean[] isDimNoDictFlags = sortParameters.getNoDictionaryDimnesionColumn();
for (int i = 0; i < isDimNoDictFlags.length; i++) {
if (isDimNoDictFlags[i]) {
- if (i < sortColCnt && isSortColNoDictFlags[i]) {
+ if (isVarcharDimFlags[i]) {
+ varcharDimIdx[tmpVarcharCnt++] = i;
+ } else if (i < sortColCnt && isSortColNoDictFlags[i]) {
noDictSortDimIdx[tmpNoDictSortCnt++] = i;
} else {
noDictNoSortDimIdx[tmpNoDictNoSortCnt++] = i;
@@ -126,10 +144,18 @@ public class TableFieldStat implements Serializable {
return noDictNoSortDimCnt;
}
+ public int getVarcharDimCnt() {
+ return varcharDimCnt;
+ }
+
public boolean[] getIsSortColNoDictFlags() {
return isSortColNoDictFlags;
}
+ public boolean[] getIsVarcharDimFlags() {
+ return isVarcharDimFlags;
+ }
+
public int getMeasureCnt() {
return measureCnt;
}
@@ -154,6 +180,10 @@ public class TableFieldStat implements Serializable {
return noDictNoSortDimIdx;
}
+ public int[] getVarcharDimIdx() {
+ return varcharDimIdx;
+ }
+
public int[] getMeasureIdx() {
return measureIdx;
}
@@ -166,11 +196,12 @@ public class TableFieldStat implements Serializable {
&& dictNoSortDimCnt == that.dictNoSortDimCnt
&& noDictSortDimCnt == that.noDictSortDimCnt
&& noDictNoSortDimCnt == that.noDictNoSortDimCnt
+ && varcharDimCnt == that.varcharDimCnt
&& measureCnt == that.measureCnt;
}
@Override public int hashCode() {
return Objects.hash(dictSortDimCnt, dictNoSortDimCnt, noDictSortDimCnt,
- noDictNoSortDimCnt, measureCnt);
+ noDictNoSortDimCnt, varcharDimCnt, measureCnt);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
index 5408193..1a1c5d1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/TablePage.java
@@ -39,7 +39,8 @@ import org.apache.carbondata.core.datastore.page.encoding.EncodedColumnPage;
import org.apache.carbondata.core.datastore.page.encoding.EncodingFactory;
import org.apache.carbondata.core.datastore.page.key.TablePageKey;
import org.apache.carbondata.core.datastore.page.statistics.KeyPageStatsCollector;
-import org.apache.carbondata.core.datastore.page.statistics.LVStringStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.LVLongStringStatsCollector;
+import org.apache.carbondata.core.datastore.page.statistics.LVShortStringStatsCollector;
import org.apache.carbondata.core.datastore.page.statistics.PrimitivePageStatsCollector;
import org.apache.carbondata.core.datastore.row.CarbonRow;
import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
@@ -98,8 +99,16 @@ public class TablePage {
noDictDimensionPages = new ColumnPage[model.getNoDictionaryCount()];
for (int i = 0; i < noDictDimensionPages.length; i++) {
TableSpec.DimensionSpec spec = tableSpec.getDimensionSpec(i + numDictDimension);
- ColumnPage page = ColumnPage.newPage(spec, DataTypes.STRING, pageSize);
- page.setStatsCollector(LVStringStatsCollector.newInstance());
+ ColumnPage page;
+ if (DataTypes.VARCHAR == spec.getSchemaDataType()) {
+ page = ColumnPage.newPage(spec, DataTypes.VARCHAR, pageSize);
+ page.setStatsCollector(LVLongStringStatsCollector.newInstance());
+ } else {
+ // In previous implementation, other data types such as string, date and timestamp
+ // will be encoded using string page
+ page = ColumnPage.newPage(spec, DataTypes.STRING, pageSize);
+ page.setStatsCollector(LVShortStringStatsCollector.newInstance());
+ }
noDictDimensionPages[i] = page;
}
complexDimensionPages = new ComplexColumnPage[model.getComplexColumnCount()];
@@ -155,16 +164,21 @@ public class TablePage {
dictDimensionPages[i].putData(rowId, keys[i]);
}
- // 2. convert noDictionary columns and complex columns.
+ // 2. convert noDictionary columns and complex columns and varchar columns.
int noDictionaryCount = noDictDimensionPages.length;
int complexColumnCount = complexDimensionPages.length;
if (noDictionaryCount > 0 || complexColumnCount > 0) {
+ TableSpec tableSpec = model.getTableSpec();
byte[][] noDictAndComplex = WriteStepRowUtil.getNoDictAndComplexDimension(row);
for (int i = 0; i < noDictAndComplex.length; i++) {
- if (i < noDictionaryCount) {
+ if (tableSpec.getDimensionSpec(dictDimensionPages.length + i).getSchemaDataType()
+ == DataTypes.VARCHAR) {
+ byte[] valueWithLength = addIntLengthToByteArray(noDictAndComplex[i]);
+ noDictDimensionPages[i].putData(rowId, valueWithLength);
+ } else if (i < noDictionaryCount) {
// noDictionary columns, since it is variable length, we need to prepare each
// element as LV result byte array (first two bytes are the length of the array)
- byte[] valueWithLength = addLengthToByteArray(noDictAndComplex[i]);
+ byte[] valueWithLength = addShortLengthToByteArray(noDictAndComplex[i]);
noDictDimensionPages[i].putData(rowId, valueWithLength);
} else {
// complex columns
@@ -250,7 +264,7 @@ public class TablePage {
}
// Adds length as a short element (first 2 bytes) to the head of the input byte array
- private byte[] addLengthToByteArray(byte[] input) {
+ private byte[] addShortLengthToByteArray(byte[] input) {
if (input.length > Short.MAX_VALUE) {
throw new RuntimeException("input data length " + input.length +
" bytes too long, maximum length supported is " + Short.MAX_VALUE + " bytes");
@@ -262,6 +276,15 @@ public class TablePage {
return output;
}
+ // Adds length as an integer element (first 4 bytes) to the head of the input byte array
+ private byte[] addIntLengthToByteArray(byte[] input) {
+ byte[] output = new byte[input.length + 4];
+ ByteBuffer buffer = ByteBuffer.wrap(output);
+ buffer.putInt(input.length);
+ buffer.put(input, 0, input.length);
+ return output;
+ }
+
void encode() throws KeyGenException, MemoryException, IOException {
// encode dimensions and measure
EncodedColumnPage[] dimensions = encodeAndCompressDimensions();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dc53dee2/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index f921fd5..12c95a9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -223,6 +223,26 @@ public final class CarbonDataProcessorUtil {
.toPrimitive(noDictionaryMapping.toArray(new Boolean[noDictionaryMapping.size()]));
}
+ /**
+ * Prepare the boolean[] that maps whether each dimension is of varchar data type or not.
+ */
+ public static boolean[] getIsVarcharColumnMapping(DataField[] fields) {
+ List<Boolean> isVarcharColumnMapping = new ArrayList<Boolean>();
+ for (DataField field : fields) {
+ // for complex types, we need to break out of the loop
+ if (field.getColumn().isComplex()) {
+ break;
+ }
+
+ if (field.getColumn().isDimension()) {
+ isVarcharColumnMapping.add(
+ field.getColumn().getColumnSchema().getDataType() == DataTypes.VARCHAR);
+ }
+ }
+ return ArrayUtils.toPrimitive(
+ isVarcharColumnMapping.toArray(new Boolean[isVarcharColumnMapping.size()]));
+ }
+
public static boolean[] getNoDictionaryMapping(CarbonColumn[] carbonColumns) {
List<Boolean> noDictionaryMapping = new ArrayList<Boolean>();
for (CarbonColumn column : carbonColumns) {