Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/11/25 15:13:18 UTC

[carbondata] branch master updated: [CARBONDATA-4057] Support Complex DataType when Save DataFrame with MODE.OVERWRITE

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 32c2306  [CARBONDATA-4057] Support Complex DataType when Save DataFrame with MODE.OVERWRITE
32c2306 is described below

commit 32c2306fd954584991d7e1b883581a1f475e6957
Author: haomarch <ma...@126.com>
AuthorDate: Tue Nov 24 17:59:56 2020 +0800

    [CARBONDATA-4057] Support Complex DataType when Save DataFrame with MODE.OVERWRITE
    
    Why is this PR needed?
    Currently, when a DataFrame is saved with MODE.OVERWRITE, a CREATE TABLE is triggered,
    but complex types are not supported, which weakens the DataFrame save functionality
    in the carbondata format.
    
    What changes were proposed in this PR?
    Add converters for ARRAY/MAP/STRUCT in CarbonDataFrameWriter.convertToCarbonType.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4021
---
 .../apache/spark/sql/CarbonDataFrameWriter.scala   |  3 ++
 .../complexType/TestAllComplexDataType.scala       | 58 +++++++++++++++++++++-
 2 files changed, 60 insertions(+), 1 deletion(-)
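
A minimal usage sketch of the scenario this commit enables, assuming a running
SparkSession with the carbondata integration on the classpath (the app, table,
and column names below are hypothetical):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("complex-overwrite").getOrCreate()
    import spark.implicits._

    // Columns: int, array<string>, map<string,string>
    val df = Seq((1, Seq("a", "b"), Map("k" -> "v"))).toDF("id", "tags", "props")

    // Before this change, the implicit CREATE TABLE raised an analysis
    // exception ("unsupported type: ...") for the complex columns.
    df.write
      .format("carbondata")
      .option("tableName", "complex_overwrite_demo")  // hypothetical table name
      .mode("overwrite")
      .save()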

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index d07dbb4..a0a871a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -74,6 +74,9 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       case decimal: DecimalType => s"decimal(${decimal.precision}, ${decimal.scale})"
       case BooleanType => CarbonType.BOOLEAN.getName
       case BinaryType => CarbonType.BINARY.getName
+      case _: ArrayType => sparkType.simpleString
+      case _: StructType => sparkType.simpleString
+      case _: MapType => sparkType.simpleString
       case other => CarbonException.analysisException(s"unsupported type: $other")
     }
   }
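
Why returning sparkType.simpleString works here: Spark renders complex types in
the same angle-bracket form that CarbonData DDL uses for complex columns, and
the rendering composes for nested types. A quick illustration runnable against
plain Spark (no CarbonData needed):

    import org.apache.spark.sql.types._

    ArrayType(StringType).simpleString                           // "array<string>"
    MapType(StringType, IntegerType).simpleString                // "map<string,int>"
    StructType(Seq(StructField("a", StringType))).simpleString   // "struct<a:string>"
    ArrayType(MapType(StringType, StringType)).simpleString      // "array<map<string,string>>"
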
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAllComplexDataType.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAllComplexDataType.scala
index 041e7ab..d2f4ea1 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAllComplexDataType.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/complexType/TestAllComplexDataType.scala
@@ -16,8 +16,13 @@
  */
 package org.apache.carbondata.integration.spark.testsuite.complexType
 
-import org.apache.spark.sql.DataFrame
+import java.sql.{Date, Timestamp}
+
+import scala.collection.mutable.WrappedArray
+
+import org.apache.spark.sql.{DataFrame, Row}
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, DateType, Decimal, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, StructField, StructType, TimestampType, VarcharType}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -716,5 +721,56 @@ class TestAllComplexDataType extends QueryTest with BeforeAndAfterAll {
     insertData("fileformatTable")
     checkResults()
   }
+
+  test("test when dataframe save with complex datatype") {
+    dropTable("dataframe_complex_carbondata")
+    dropTable("dataframe_complex_parquet")
+
+    val structureData = Seq(
+      Row(
+        "id1", 1, Short.MinValue, 1L, 1.0f, 1.0d, BigDecimal(832.23),
+        "binary".getBytes,
+        true,
+        Timestamp.valueOf("2017-01-01 00:00:00.0"),
+        Date.valueOf("1990-01-01"),
+        WrappedArray.make(Array("1")), Map("1"-> "1"),
+        WrappedArray.make(Array(Map("1"-> "1"))),
+        Row("x")
+      )
+    )
+
+    import scala.collection.JavaConverters._
+    val schemas = Seq(
+        StructField("c1", StringType, nullable = false),
+        StructField("c2", IntegerType, nullable = false),
+        StructField("c3", ShortType, nullable = false),
+        StructField("c4", LongType, nullable = false),
+        StructField("c5", FloatType, nullable = false),
+        StructField("c6", DoubleType, nullable = false),
+        StructField("c7", DecimalType(18, 2), nullable = false),
+        StructField("c8", BinaryType, nullable = false),
+        StructField("c9", BooleanType, nullable = false),
+        StructField("c10", TimestampType, nullable = false),
+        StructField("c11", DateType, nullable = false),
+        StructField("c12", ArrayType(StringType), false),
+        StructField("c13", MapType(StringType, StringType), false),
+        StructField("c14", ArrayType(MapType(StringType, StringType)), false),
+        StructField("c15", StructType(List(StructField("c15_1", StringType, nullable = false)).asJava), false)
+      )
+
+    val df = sqlContext.sparkSession.createDataFrame(sqlContext.sparkContext.parallelize(structureData),
+      StructType(schemas))
+    df.write.option("dbName", "default")
+      .option("tableName", "dataframe_complex_carbondata")
+      .format("carbondata")
+      .mode("overwrite")
+      .save()
+
+    checkAnswer(
+      sql("select * from dataframe_complex_carbondata"),
+      structureData
+    )
+    dropTable("dataframe_complex_carbondata")
+  }
 }
 // scalastyle:on lineLength
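
A hedged follow-up sketch, reusing the table name from the test above and
assuming an active SparkSession named spark, to eyeball the round trip after
the overwrite save (run before the final dropTable):

    spark.sql("DESCRIBE dataframe_complex_carbondata").show(truncate = false)
    spark.sql("SELECT c12, c13, c14, c15 FROM dataframe_complex_carbondata")
      .show(truncate = false)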