You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/03 07:29:26 UTC
carbondata git commit: [CARBONDATA-2110] Deprecate 'tempCSV' option of
DataFrame load
Repository: carbondata
Updated Branches:
refs/heads/master 6c097cbf3 -> da129d527
[CARBONDATA-2110] Deprecate 'tempCSV' option of DataFrame load
Deprecate the 'tempCSV' option of DataFrame load: a temporary CSV file will no longer be generated on HDFS, regardless of the value of 'tempCSV'.
This closes #1916
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/da129d52
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/da129d52
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/da129d52
Branch: refs/heads/master
Commit: da129d5277babe498fa5686fe53d01433d112bab
Parents: 6c097cb
Author: qiuchenjian <80...@qq.com>
Authored: Sat Feb 3 00:14:07 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Sat Feb 3 15:29:08 2018 +0800
----------------------------------------------------------------------
.../testsuite/dataload/TestLoadDataFrame.scala | 19 ++++
.../spark/sql/CarbonDataFrameWriter.scala | 98 +-------------------
2 files changed, 20 insertions(+), 97 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da129d52/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index 6f03493..693c145 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -29,6 +29,7 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
var df: DataFrame = _
var dataFrame: DataFrame = _
var df2: DataFrame = _
+ var df3: DataFrame = _
var booldf:DataFrame = _
@@ -52,6 +53,10 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
.map(x => ("key_" + x, "str_" + x, x, x * 2, x * 3))
.toDF("c1", "c2", "c3", "c4", "c5")
+ df3 = sqlContext.sparkContext.parallelize(1 to 3)
+ .map(x => (x.toString + "te,s\nt", x))
+ .toDF("c1", "c2")
+
val boolrdd = sqlContext.sparkContext.parallelize(
Row("anubhav",true) ::
Row("prince",false) :: Nil)
@@ -74,6 +79,7 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS carbon9")
sql("DROP TABLE IF EXISTS carbon10")
sql("DROP TABLE IF EXISTS carbon11")
+ sql("DROP TABLE IF EXISTS carbon12")
sql("DROP TABLE IF EXISTS df_write_sort_column_not_specified")
sql("DROP TABLE IF EXISTS df_write_specify_sort_column")
sql("DROP TABLE IF EXISTS df_write_empty_sort_column")
@@ -261,6 +267,19 @@ test("test the boolean data type"){
val isStreaming: String = descResult.collect().find(row=>row(0).asInstanceOf[String].trim.equalsIgnoreCase("streaming")).get.get(1).asInstanceOf[String]
assert(isStreaming.contains("true"))
}
+
+ test("test datasource table with specified char") {
+
+ df3.write
+ .format("carbondata")
+ .option("tableName", "carbon12")
+ .option("tempCSV", "true")
+ .mode(SaveMode.Overwrite)
+ .save()
+ checkAnswer(
+ sql("select count(*) from carbon12"), Row(3)
+ )
+ }
private def getSortColumnValue(tableName: String): Array[String] = {
val desc = sql(s"desc formatted $tableName")
val sortColumnRow = desc.collect.find(r =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/da129d52/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 2b06375..2be89b1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -17,16 +17,12 @@
package org.apache.spark.sql
-import org.apache.hadoop.fs.Path
-import org.apache.hadoop.io.compress.GzipCodec
import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
import org.apache.spark.sql.types._
import org.apache.spark.sql.util.CarbonException
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonType}
-import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.spark.CarbonOption
class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
@@ -46,90 +42,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
private def writeToCarbonFile(parameters: Map[String, String] = Map()): Unit = {
val options = new CarbonOption(parameters)
- if (options.tempCSV) {
- loadTempCSV(options)
- } else {
- loadDataFrame(options)
- }
+ loadDataFrame(options)
}
-
- /**
- * Firstly, saving DataFrame to CSV files
- * Secondly, load CSV files
- * @param options
- */
- private def loadTempCSV(options: CarbonOption): Unit = {
- // temporary solution: write to csv file, then load the csv into carbon
- val storePath = CarbonProperties.getStorePath
- val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
- .append("tempCSV")
- .append(CarbonCommonConstants.UNDERSCORE)
- .append(CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession))
- .append(CarbonCommonConstants.UNDERSCORE)
- .append(options.tableName)
- .append(CarbonCommonConstants.UNDERSCORE)
- .append(System.nanoTime())
- .toString
- writeToTempCSVFile(tempCSVFolder, options)
-
- val tempCSVPath = new Path(tempCSVFolder)
- val fs = tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration)
-
- def countSize(): Double = {
- var size: Double = 0
- val itor = fs.listFiles(tempCSVPath, true)
- while (itor.hasNext) {
- val f = itor.next()
- if (f.getPath.getName.startsWith("part-")) {
- size += f.getLen
- }
- }
- size
- }
-
- LOGGER.info(s"temporary CSV file size: ${countSize / 1024 / 1024} MB")
-
- try {
- sqlContext.sql(makeLoadString(tempCSVFolder, options))
- } finally {
- fs.delete(tempCSVPath, true)
- }
- }
-
- private def writeToTempCSVFile(tempCSVFolder: String, options: CarbonOption): Unit = {
- val strRDD = dataFrame.rdd.mapPartitions { case iter =>
- new Iterator[String] {
- override def hasNext = iter.hasNext
-
- def convertToCSVString(seq: Seq[Any]): String = {
- val build = new java.lang.StringBuilder()
- if (seq.head != null) {
- build.append(seq.head.toString)
- }
- val itemIter = seq.tail.iterator
- while (itemIter.hasNext) {
- build.append(CarbonCommonConstants.COMMA)
- val value = itemIter.next()
- if (value != null) {
- build.append(value.toString)
- }
- }
- build.toString
- }
-
- override def next: String = {
- convertToCSVString(iter.next.toSeq)
- }
- }
- }
-
- if (options.compress) {
- strRDD.saveAsTextFile(tempCSVFolder, classOf[GzipCodec])
- } else {
- strRDD.saveAsTextFile(tempCSVFolder)
- }
- }
-
/**
* Loading DataFrame directly without saving DataFrame to CSV files.
* @param options
@@ -189,14 +103,4 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
""".stripMargin
}
- private def makeLoadString(csvFolder: String, options: CarbonOption): String = {
- val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
- s"""
- | LOAD DATA INPATH '$csvFolder'
- | INTO TABLE $dbName.${options.tableName}
- | OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}',
- | 'SINGLE_PASS' = '${options.singlePass}')
- """.stripMargin
- }
-
}