You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/06/21 07:01:51 UTC

carbondata git commit: [CARBONDATA-2513][32K] Support write long string from dataframe

Repository: carbondata
Updated Branches:
  refs/heads/master 218a8deb6 -> 55f4bc6c8


[CARBONDATA-2513][32K] Support write long string from dataframe

support write long string from dataframe

This closes #2382


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/55f4bc6c
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/55f4bc6c
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/55f4bc6c

Branch: refs/heads/master
Commit: 55f4bc6c89f637b162b414033512901e9bd8a745
Parents: 218a8de
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Wed Jun 20 19:01:24 2018 +0800
Committer: kumarvishal09 <ku...@gmail.com>
Committed: Thu Jun 21 12:31:21 2018 +0530

----------------------------------------------------------------------
 .../VarcharDataTypesBasicTestCase.scala         | 32 +++++++++++++++++++-
 .../apache/carbondata/spark/CarbonOption.scala  |  2 ++
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 15 +++++++--
 .../carbondata/spark/util/CarbonScalaUtil.scala |  3 +-
 .../spark/sql/CarbonDataFrameWriter.scala       |  1 +
 .../streaming/parser/FieldConverter.scala       | 11 +++++--
 6 files changed, 57 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/55f4bc6c/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
index 9ea3f1f..9798178 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/longstring/VarcharDataTypesBasicTestCase.scala
@@ -20,8 +20,9 @@ package org.apache.carbondata.spark.testsuite.longstring
 import java.io.{File, PrintWriter}
 
 import org.apache.commons.lang3.RandomStringUtils
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{DataFrame, Row, SaveMode}
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -36,6 +37,7 @@ class VarcharDataTypesBasicTestCase extends QueryTest with BeforeAndAfterEach wi
   private val inputFile_2g_column_page = s"$inputDir$fileName_2g_column_page"
   private val lineNum = 1000
   private var content: Content = _
+  private var longStringDF: DataFrame = _
   private var originMemorySize = CarbonProperties.getInstance().getProperty(
     CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB,
     CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB_DEFAULT)
@@ -257,6 +259,34 @@ class VarcharDataTypesBasicTestCase extends QueryTest with BeforeAndAfterEach wi
     // since after exception wrapper, we cannot get the root cause directly
   }
 
+  private def prepareDF(): Unit = {
+    val schema = StructType(
+      StructField("id", IntegerType, nullable = true) ::
+      StructField("name", StringType, nullable = true) ::
+      StructField("description", StringType, nullable = true) ::
+      StructField("address", StringType, nullable = true) ::
+      StructField("note", StringType, nullable = true) :: Nil
+    )
+    longStringDF = sqlContext.sparkSession.read
+      .schema(schema)
+      .csv(inputFile)
+  }
+
+  test("write from dataframe with long string datatype") {
+    prepareDF()
+    // write spark dataframe to carbondata with `long_string_columns` property
+    longStringDF.write
+      .format("carbondata")
+      .option("tableName", longStringTable)
+      .option("single_pass", "false")
+      .option("sort_columns", "name")
+      .option("long_string_columns", "description, note")
+      .mode(SaveMode.Overwrite)
+      .save()
+
+    checkQuery()
+  }
+
   // will create 2 long string columns
   private def createFile(filePath: String, line: Int = 10000, start: Int = 0,
       varcharLen: Int = Short.MaxValue + 1000): Content = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55f4bc6c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index a48e63d..5f23f77 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -48,6 +48,8 @@ class CarbonOption(options: Map[String, String]) {
 
   def dictionaryExclude: Option[String] = options.get("dictionary_exclude")
 
+  def longStringColumns: Option[String] = options.get("long_string_columns")
+
   def tableBlockSize: Option[String] = options.get("table_blocksize")
 
   def bucketNumber: Int = options.getOrElse("bucketnumber", "0").toInt

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55f4bc6c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 4bfdd3b..5ed39fa 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -41,6 +41,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.common.logging.impl.StandardLogService
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
+import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -447,6 +448,10 @@ class NewRddIterator(rddIter: Iterator[Row],
   private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
   private val serializationNullFormat =
     carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+  import scala.collection.JavaConverters._
+  private val isVarcharTypeMapping =
+    carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCreateOrderColumn(
+      carbonLoadModel.getTableName).asScala.map(_.getDataType == DataTypes.VARCHAR)
   def hasNext: Boolean = rddIter.hasNext
 
   def next: Array[AnyRef] = {
@@ -454,7 +459,8 @@ class NewRddIterator(rddIter: Iterator[Row],
     val columns = new Array[AnyRef](row.length)
     for (i <- 0 until columns.length) {
       columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
-        delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat)
+        delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat,
+        isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i))
     }
     columns
   }
@@ -491,6 +497,10 @@ class LazyRddIterator(serializer: SerializerInstance,
   private val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
   private val serializationNullFormat =
     carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+  import scala.collection.JavaConverters._
+  private val isVarcharTypeMapping =
+    carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCreateOrderColumn(
+      carbonLoadModel.getTableName).asScala.map(_.getDataType == DataTypes.VARCHAR)
 
   private var rddIter: Iterator[Row] = null
   private var uninitialized = true
@@ -514,7 +524,8 @@ class LazyRddIterator(serializer: SerializerInstance,
     val columns = new Array[AnyRef](row.length)
     for (i <- 0 until columns.length) {
       columns(i) = CarbonScalaUtil.getString(row.get(i), serializationNullFormat,
-        delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat)
+        delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat,
+        isVarcharType = i < isVarcharTypeMapping.size && isVarcharTypeMapping(i))
     }
     columns
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55f4bc6c/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 3e94a66..44d3cca 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -127,9 +127,10 @@ object CarbonScalaUtil {
       delimiterLevel2: String,
       timeStampFormat: SimpleDateFormat,
       dateFormat: SimpleDateFormat,
+      isVarcharType: Boolean = false,
       level: Int = 1): String = {
     FieldConverter.objectToString(value, serializationNullFormat, delimiterLevel1,
-      delimiterLevel2, timeStampFormat, dateFormat, level)
+      delimiterLevel2, timeStampFormat, dateFormat, isVarcharType = isVarcharType, level)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55f4bc6c/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 67817c0..c81622e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -86,6 +86,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       "SORT_COLUMNS" -> options.sortColumns,
       "DICTIONARY_INCLUDE" -> options.dictionaryInclude,
       "DICTIONARY_EXCLUDE" -> options.dictionaryExclude,
+      "LONG_STRING_COLUMNS" -> options.longStringColumns,
       "TABLE_BLOCKSIZE" -> options.tableBlockSize,
       "STREAMING" -> Option(options.isStreaming.toString)
     ).filter(_._2.isDefined)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55f4bc6c/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
index 8661417..e167d46 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/parser/FieldConverter.scala
@@ -32,6 +32,7 @@ object FieldConverter {
    * @param delimiterLevel2 level 2 delimiter for complex type
    * @param timeStampFormat timestamp format
    * @param dateFormat date format
+   * @param isVarcharType whether it is varchar type. A varchar type has no string length limit
    * @param level level for recursive call
    */
   def objectToString(
@@ -41,12 +42,14 @@ object FieldConverter {
       delimiterLevel2: String,
       timeStampFormat: SimpleDateFormat,
       dateFormat: SimpleDateFormat,
+      isVarcharType: Boolean = false,
       level: Int = 1): String = {
     if (value == null) {
       serializationNullFormat
     } else {
       value match {
-        case s: String => if (s.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
+        case s: String => if (!isVarcharType &&
+                              s.length > CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT) {
           throw new Exception("Dataload failed, String length cannot exceed " +
                               CarbonCommonConstants.MAX_CHARS_PER_COLUMN_DEFAULT + " characters")
         } else {
@@ -71,7 +74,8 @@ object FieldConverter {
           val builder = new StringBuilder()
           s.foreach { x =>
             builder.append(objectToString(x, serializationNullFormat, delimiterLevel1,
-              delimiterLevel2, timeStampFormat, dateFormat, level + 1)).append(delimiter)
+              delimiterLevel2, timeStampFormat, dateFormat, isVarcharType, level + 1))
+              .append(delimiter)
           }
           builder.substring(0, builder.length - delimiter.length())
         case m: scala.collection.Map[Any, Any] =>
@@ -85,7 +89,8 @@ object FieldConverter {
           val builder = new StringBuilder()
           for (i <- 0 until r.length) {
             builder.append(objectToString(r(i), serializationNullFormat, delimiterLevel1,
-              delimiterLevel2, timeStampFormat, dateFormat, level + 1)).append(delimiter)
+              delimiterLevel2, timeStampFormat, dateFormat, isVarcharType, level + 1))
+              .append(delimiter)
           }
           builder.substring(0, builder.length - delimiter.length())
         case other => other.toString