Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/13 09:24:05 UTC
carbondata git commit: [CARBONDATA-1709][DataFrame] Support sort_columns option in dataframe writer
Repository: carbondata
Updated Branches:
refs/heads/master 63afc00f6 -> 9c038543e
[CARBONDATA-1709][DataFrame] Support sort_columns option in dataframe writer
This PR adds support for the SORT_COLUMNS option in the DataFrame writer.
This closes #1496
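
With this change, sort columns can be set directly through the writer options. A minimal sketch of the usage, mirroring the tests added below (the DataFrame "df" and the table name are illustrative placeholders, not part of the change itself):

    // Sketch based on the tests in this commit; "df" and "my_table"
    // are placeholders.
    import org.apache.spark.sql.{DataFrame, SaveMode}

    def writeWithSortColumns(df: DataFrame): Unit = {
      df.write
        .format("carbondata")
        .option("tableName", "my_table")
        // comma-separated column list; an empty string means no sort columns
        .option("sort_columns", "c1")
        .mode(SaveMode.Overwrite)
        .save()
    }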
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9c038543
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9c038543
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9c038543
Branch: refs/heads/master
Commit: 9c038543e2ce68a220ad9967acf59bd8f23b6ae0
Parents: 63afc00
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Tue Nov 14 20:23:35 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Dec 13 17:23:48 2017 +0800
----------------------------------------------------------------------
.../testsuite/dataload/TestLoadDataFrame.scala | 63 +++++++++++++++++++-
.../apache/carbondata/spark/CarbonOption.scala | 2 +
.../spark/sql/CarbonDataFrameWriter.scala | 1 +
3 files changed, 63 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c038543/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index 57c5204..574eb91 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -22,7 +22,7 @@ import java.math.BigDecimal
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
+import org.apache.spark.sql.{AnalysisException, DataFrame, DataFrameWriter, Row, SaveMode}
import org.scalatest.BeforeAndAfterAll
class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
@@ -73,7 +73,9 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE IF EXISTS carbon8")
sql("DROP TABLE IF EXISTS carbon9")
sql("DROP TABLE IF EXISTS carbon10")
-
+ sql("DROP TABLE IF EXISTS df_write_sort_column_not_specified")
+ sql("DROP TABLE IF EXISTS df_write_specify_sort_column")
+ sql("DROP TABLE IF EXISTS df_write_empty_sort_column")
}
@@ -236,13 +238,68 @@ test("test the boolean data type"){
sql("select count(*) from carbon10 where c3 > 500"), Row(500)
)
sql("drop table carbon10")
- assert(! new File(path).exists())
+ assert(!new File(path).exists())
assert(intercept[AnalysisException](
sql("select count(*) from carbon10 where c3 > 500"))
.message
.contains("not found"))
}
+ private def getSortColumnValue(tableName: String): Array[String] = {
+ val desc = sql(s"desc formatted $tableName")
+ val sortColumnRow = desc.collect.find(r =>
+ r(0).asInstanceOf[String].trim.equalsIgnoreCase("SORT_COLUMNS")
+ )
+ assert(sortColumnRow.isDefined)
+ sortColumnRow.get.get(1).asInstanceOf[String].split(",")
+ .map(_.trim.toLowerCase).filter(_.length > 0)
+ }
+
+ private def getDefaultWriter(tableName: String): DataFrameWriter[Row] = {
+ df2.write
+ .format("carbondata")
+ .option("tableName", tableName)
+ .option("tempCSV", "false")
+ .option("single_pass", "false")
+ .option("table_blocksize", "256")
+ .option("compress", "false")
+ .mode(SaveMode.Overwrite)
+ }
+
+ test("test load dataframe with sort_columns not specified," +
+ " by default all string columns will be sort_columns") {
+ // all string column will be sort_columns by default
+ getDefaultWriter("df_write_sort_column_not_specified").save()
+ checkAnswer(
+ sql("select count(*) from df_write_sort_column_not_specified where c3 > 500"), Row(500)
+ )
+
+ val sortColumnValue = getSortColumnValue("df_write_sort_column_not_specified")
+ assert(sortColumnValue.sameElements(Array("c1", "c2")))
+ }
+
+ test("test load dataframe with sort_columns specified") {
+ // only specify c1 as sort_columns
+ getDefaultWriter("df_write_specify_sort_column").option("sort_columns", "c1").save()
+ checkAnswer(
+ sql("select count(*) from df_write_specify_sort_column where c3 > 500"), Row(500)
+ )
+
+ val sortColumnValue = getSortColumnValue("df_write_specify_sort_column")
+ assert(sortColumnValue.sameElements(Array("c1")))
+ }
+
+ test("test load dataframe with sort_columns specified empty") {
+ // specify empty sort_column
+ getDefaultWriter("df_write_empty_sort_column").option("sort_columns", "").save()
+ checkAnswer(
+ sql("select count(*) from df_write_empty_sort_column where c3 > 500"), Row(500)
+ )
+
+ val sortColumnValue = getSortColumnValue("df_write_empty_sort_column")
+ assert(sortColumnValue.isEmpty)
+ }
+
override def afterAll {
dropTable
}
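
The new tests read the effective SORT_COLUMNS back from DESC FORMATTED via the getSortColumnValue helper above. A quick manual check along the same lines (a sketch; the table is one created by these tests, and the expected value here is "c1"):

    // Show only the SORT_COLUMNS row of DESC FORMATTED.
    sql("desc formatted df_write_specify_sort_column")
      .filter(r => r.getString(0).trim.equalsIgnoreCase("SORT_COLUMNS"))
      .show(truncate = false)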
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c038543/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index 594ea0e..bcdad26 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -42,6 +42,8 @@ class CarbonOption(options: Map[String, String]) {
def singlePass: Boolean = options.getOrElse("single_pass", "false").toBoolean
+ def sortColumns: Option[String] = options.get("sort_columns")
+
def dictionaryInclude: Option[String] = options.get("dictionary_include")
def dictionaryExclude: Option[String] = options.get("dictionary_exclude")
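
The new accessor only surfaces the raw writer option; parsing and validation of the column list happen later, at table-creation time. A minimal sketch of its behavior, assuming the constructor shown in the hunk header above:

    // CarbonOption takes the writer's option map (see class signature above).
    val opts = new CarbonOption(Map("sort_columns" -> "c1,c2"))
    assert(opts.sortColumns == Some("c1,c2"))
    // An absent option yields None, so no SORT_COLUMNS property is emitted.
    assert(new CarbonOption(Map.empty[String, String]).sortColumns.isEmpty)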
http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c038543/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index ca371e1..d50f0b8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -168,6 +168,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
s"${ field.name } ${ convertToCarbonType(field.dataType) }"
}
val property = Map(
+ "SORT_COLUMNS" -> options.sortColumns,
"DICTIONARY_INCLUDE" -> options.dictionaryInclude,
"DICTIONARY_EXCLUDE" -> options.dictionaryExclude,
"TABLE_BLOCKSIZE" -> options.tableBlockSize