You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/09/22 07:22:26 UTC
[1/2] incubator-carbondata git commit: delete success file
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 0be0f3083 -> 699d0a050
delete success file
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/a03b4600
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/a03b4600
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/a03b4600
Branch: refs/heads/master
Commit: a03b460094ada6c9547542e6bfce51bd3e33ec8b
Parents: 0be0f30
Author: jackylk <ja...@huawei.com>
Authored: Wed Sep 21 11:37:02 2016 -0700
Committer: chenliang613 <ch...@apache.org>
Committed: Thu Sep 22 00:18:45 2016 -0700
----------------------------------------------------------------------
.../examples/DataFrameAPIExample.scala | 1 +
.../spark/implicit/DataFrameFuncs.scala | 59 +++++++++++---------
2 files changed, 33 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a03b4600/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala b/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
index d2ee959..31750ac 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/DataFrameAPIExample.scala
@@ -39,6 +39,7 @@ object DataFrameAPIExample {
df.write
.format("carbondata")
.option("tableName", "carbon1")
+ .option("compress", "true")
.mode(SaveMode.Overwrite)
.save()
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a03b4600/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
index d5185cc..44594eb 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/implicit/DataFrameFuncs.scala
@@ -18,16 +18,17 @@
package org.apache.carbondata.spark
import org.apache.hadoop.fs.Path
-import org.apache.spark.sql.{CarbonContext, DataFrame, SaveMode}
+import org.apache.spark.Logging
+import org.apache.spark.sql.{CarbonContext, DataFrame, DataFrameWriter, SaveMode}
import org.apache.spark.sql.types._
import org.apache.carbondata.core.carbon.metadata.datatype.{DataType => CarbonType}
-class DataFrameFuncs(dataFrame: DataFrame) {
+class DataFrameFuncs(dataFrame: DataFrame) extends Logging {
/**
- * Saves DataFrame as CarbonData files.
- */
+ * Saves DataFrame as CarbonData files.
+ */
def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = {
// To avoid derby problem, dataframe needs to be written and read using CarbonContext
require(dataFrame.sqlContext.isInstanceOf[CarbonContext],
@@ -36,42 +37,47 @@ class DataFrameFuncs(dataFrame: DataFrame) {
val options = new CarbonOption(parameters)
val tableName = options.tableName
- val dbName = options.dbName
// temporary solution: write to csv file, then load the csv into carbon
val tempCSVFolder = s"./tempCSV"
- dataFrame.write
- .format(csvPackage)
- .option("header", "true")
- .mode(SaveMode.Overwrite)
- .save(tempCSVFolder)
+ var writer: DataFrameWriter =
+ dataFrame.write
+ .format(csvPackage)
+ .option("header", "false")
+ .mode(SaveMode.Overwrite)
+
+ if (options.compress.equals("true")) {
+ writer = writer.option("codec", "gzip")
+ }
- val sqlContext = dataFrame.sqlContext
- val tempCSVPath = new Path(tempCSVFolder)
- val fs = tempCSVPath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
+ writer.save(tempCSVFolder)
- try {
- sqlContext.sql(makeCreateTableString(dataFrame.schema, options))
+ val cc = CarbonContext.getInstance(dataFrame.sqlContext.sparkContext)
+ val tempCSVPath = new Path(tempCSVFolder)
+ val fs = tempCSVPath.getFileSystem(dataFrame.sqlContext.sparkContext.hadoopConfiguration)
- // add 'csv' as file extension to all generated part file
+ def countSize(): Double = {
+ var size: Double = 0
val itor = fs.listFiles(tempCSVPath, true)
while (itor.hasNext) {
val f = itor.next()
if (f.getPath.getName.startsWith("part-")) {
- val newPath = s"${ f.getPath.getParent }/${ f.getPath.getName }.csv"
- if (!fs.rename(f.getPath, new Path(newPath))) {
- sqlContext.sql(s"DROP TABLE IF EXISTS $dbName.$tableName")
- throw new RuntimeException("File system rename failed when loading data into carbon")
- }
+ size += f.getLen
}
}
- sqlContext.sql(makeLoadString(tempCSVFolder, options))
+ size
+ }
+
+ try {
+ cc.sql(makeCreateTableString(dataFrame.schema, options))
+ logInfo(s"temporary CSV file size: ${countSize() / 1024 / 1024} MB")
+ cc.sql(makeLoadString(tableName, tempCSVFolder))
} finally {
fs.delete(tempCSVPath, true)
}
}
- private def csvPackage: String = "com.databricks.spark.csv"
+ private def csvPackage: String = "com.databricks.spark.csv.newapi"
private def convertToCarbonType(sparkType: DataType): String = {
sparkType match {
@@ -101,12 +107,11 @@ class DataFrameFuncs(dataFrame: DataFrame) {
"""
}
- private def makeLoadString(csvFolder: String, option: CarbonOption): String = {
- val tableName = option.tableName
- val dbName = option.dbName
+ private def makeLoadString(tableName: String, csvFolder: String): String = {
s"""
LOAD DATA INPATH '$csvFolder'
- INTO TABLE $dbName.$tableName
+ INTO TABLE $tableName
+ OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}')
"""
}
[2/2] incubator-carbondata git commit: [CARBONDATA-265] Improve
Dataframe write to CarbonData file from CSV file This closes #187
Posted by ch...@apache.org.
[CARBONDATA-265] Improve Dataframe write to CarbonData file from CSV file This closes #187
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/699d0a05
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/699d0a05
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/699d0a05
Branch: refs/heads/master
Commit: 699d0a0505a31d440663c32cef82dbe5623c5d45
Parents: 0be0f30 a03b460
Author: chenliang613 <ch...@apache.org>
Authored: Thu Sep 22 00:22:01 2016 -0700
Committer: chenliang613 <ch...@apache.org>
Committed: Thu Sep 22 00:22:01 2016 -0700
----------------------------------------------------------------------
.../examples/DataFrameAPIExample.scala | 1 +
.../spark/implicit/DataFrameFuncs.scala | 59 +++++++++++---------
2 files changed, 33 insertions(+), 27 deletions(-)
----------------------------------------------------------------------