You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/06/21 07:01:51 UTC
carbondata git commit: [CARBONDATA-2513][32K] Support writing long
strings from a DataFrame
Repository: carbondata
Updated Branches:
refs/heads/master 218a8deb6 -> 55f4bc6c8
[CARBONDATA-2513][32K] Support write long string from dataframe
Adds support for writing long string (varchar) columns when loading data from a Spark DataFrame, via the `long_string_columns` write option.
This closes #2382
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/55f4bc6c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/55f4bc6c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/55f4bc6c
Branch: refs/heads/master
Commit: 55f4bc6c89f637b162b414033512901e9bd8a745
Parents: 218a8de
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Wed Jun 20 19:01:24 2018 +0800
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Thu Jun 21 12:31:21 2018 +0530
----------------------------------------------------------------------
.../VarcharDataTypesBasicTestCase.scala | 32 +++++++++++++++++++-
.../apache/carbondata/spark/CarbonOption.scala | 2 ++
.../spark/rdd/NewCarbonDataLoadRDD.scala | 15 +++++++--
.../carbondata/spark/util/CarbonScalaUtil.scala | 3 +-
.../spark/sql/CarbonDataFrameWriter.scala | 1 +
.../streaming/parser/FieldConverter.scala | 11 +++++--
6 files changed, 57 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55f4bc6c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
index 9ea3f1f..9798178 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
@@ -20,8 +20,9 @@ package org.apache.carbondata.spark.testsuite.longstring
import java.io.{File, PrintWriter}
import org.apache.commons.lang3.RandomStringUtils
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{DataFrame, Row, SaveMode}
import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -36,6 +37,7 @@ class VarcharDataTypesBasicTestCase extends QueryTest with BeforeAndAfterEach wi
private val inputFile_2g_column_page = s"$inputDir$fileName_2g_column_page"
private val lineNum = 1000
private var content: Content = _
+ private var longStringDF: DataFrame = _
private var originMemorySize = CarbonProperties.getInstance().getProperty(
CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT)
@@ -257,6 +259,34 @@ class VarcharDataTypesBasicTestCase extends QueryTest with BeforeAndAfterEach wi
// since after exception wrapper, we cannot get the root cause directly
}
+ private def prepareDF(): Unit = {
+ val schema = StructType(
+ StructField("id", IntegerType, nullable = true) ::
+ StructField("name", StringType, nullable = true) ::
+ StructField("description", StringType, nullable = true) ::
+ StructField("address", StringType, nullable = true) ::
+ StructField("note", StringType, nullable = true) :: Nil
+ )
+ longStringDF = sqlContext.sparkSession.read
+ .schema(schema)
+ .csv(inputFile)
+ }
+
+ test("write from dataframe with long string datatype") {
+ prepareDF()
+ // write spark dataframe to carbondata with `long_string_columns` property
+ longStringDF.write
+ .format("carbondata")
+ .option("tableName", longStringTable)
+ .option("single_pass", "false")
+ .option("sort_columns", "name")
+ .option("long_string_columns", "description, note")
+ .mode(SaveMode.Overwrite)
+ .save()
+
+ checkQuery()
+ }
+
// will create 2 long string columns
private def createFile(filePath: String, line: Int = 10000, start: Int = 0,
varcharLen: Int = Short.MaxValue + 1000): Content = {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55f4bc6c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index a48e63d..5f23f77 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -48,6 +48,8 @@ class CarbonOption(options: Map[String, String]) {
def dictionaryExclude: Option[String] = options.get("dictionary_exclude")
+ def longStringColumns: Option[String] = options.get("long_string_columns")
+
def tableBlockSize: Option[String] = options.get("table_blocksize")
def bucketNumber: Int = options.getOrElse("bucketnumber", "0").toInt
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55f4bc6c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 4bfdd3b..5ed39fa 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -41,6 +41,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.common.logging.impl.StandardLogService
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.compression.CompressorFactory
+import org.apache.carbondata.core.metadata.datatype.DataTypes
import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -447,6 +448,10 @@ class NewRddIterator(rddIter: Iterator[Row],
private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
private val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+ import scala.collection.JavaConverters._
+ private val isVarcharTypeMapping =
+ carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCreateOrderColumn(
+ carbonLoadModel.getTableName).asScala.map(_.getDataType == DataTypes.VARCHAR)
def hasNext: Boolean = rddIter.hasNext
def next: Array[AnyRef] = {
@@ -454,7 +459,8 @@ class NewRddIterator(rddIter: Iterator[Row],
val columns = new Array[AnyRef](row.length)
for (i <- 0 until columns.length) {
columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
- delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat)
+ delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat,
+ isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i))
}
columns
}
@@ -491,6 +497,10 @@ class LazyRddIterator(serializer: SerializerInstance,
private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
private val serializationNullFormat =
carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+ import scala.collection.JavaConverters._
+ private val isVarcharTypeMapping =
+ carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCreateOrderColumn(
+ carbonLoadModel.getTableName).asScala.map(_.getDataType == DataTypes.VARCHAR)
private var rddIter: Iterator[Row] = null
private var uninitialized = true
@@ -514,7 +524,8 @@ class LazyRddIterator(serializer: SerializerInstance,
val columns = new Array[AnyRef](row.length)
for (i <- 0 until columns.length) {
columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
- delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat)
+ delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat,
+ isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i))
}
columns
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55f4bc6c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 3e94a66..44d3cca 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -127,9 +127,10 @@ object CarbonScalaUtil {
delimiterLevel2: String,
timeStampFormat: SimpleDateFormat,
dateFormat: SimpleDateFormat,
+ isVarcharType: Boolean = false,
level: Int = 1): String = {
FieldConverter.objectToString(value, serializationNullFormat, delimiterLevel1,
- delimiterLevel2, timeStampFormat, dateFormat, level)
+ delimiterLevel2, timeStampFormat, dateFormat, isVarcharType = isVarcharType, level)
}
/**
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55f4bc6c/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 67817c0..c81622e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -86,6 +86,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
"SORT_COLUMNS" -> options.sortColumns,
"DICTIONARY_INCLUDE" -> options.dictionaryInclude,
"DICTIONARY_EXCLUDE" -> options.dictionaryExclude,
+ "LONG_STRING_COLUMNS" -> options.longStringColumns,
"TABLE_BLOCKSIZE" -> options.tableBlockSize,
"STREAMING" -> Option(options.isStreaming.toString)
).filter(_._2.isDefined)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55f4bc6c/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
index 8661417..e167d46 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
@@ -32,6 +32,7 @@ object FieldConverter {
* @param delimiterLevel2 level 2 delimiter for complex type
* @param timeStampFormat timestamp format
* @param dateFormat date format
+ * @param isVarcharType whether it is varchar type. A varchar type has no string length limit
* @param level level for recursive call
*/
def objectToString(
@@ -41,12 +42,14 @@ object FieldConverter {
delimiterLevel2: String,
timeStampFormat: SimpleDateFormat,
dateFormat: SimpleDateFormat,
+ isVarcharType: Boolean = false,
level: Int = 1): String = {
if (value == null) {
serializationNullFormat
} else {
value match {
- case s: String => if (s.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
+ case s: String => if (!isVarcharType &&
+ s.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
throw new Exception("Dataload failed, String length cannot exceed " +
CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " characters")
} else {
@@ -71,7 +74,8 @@ object FieldConverter {
val builder = new StringBuilder()
s.foreach { x =>
builder.append(objectToString(x, serializationNullFormat, delimiterLevel1,
- delimiterLevel2, timeStampFormat, dateFormat, level + 1)).append(delimiter)
+ delimiterLevel2, timeStampFormat, dateFormat, isVarcharType, level + 1))
+ .append(delimiter)
}
builder.substring(0, builder.length - delimiter.length())
case m: scala.collection.Map[Any, Any] =>
@@ -85,7 +89,8 @@ object FieldConverter {
val builder = new StringBuilder()
for (i <- 0 until r.length) {
builder.append(objectToString(r(i), serializationNullFormat, delimiterLevel1,
- delimiterLevel2, timeStampFormat, dateFormat, level + 1)).append(delimiter)
+ delimiterLevel2, timeStampFormat, dateFormat, isVarcharType, level + 1))
+ .append(delimiter)
}
builder.substring(0, builder.length - delimiter.length())
case other => other.toString