You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/13 09:24:05 UTC

carbondata git commit: [CARBONDATA-1709][DataFrame] Support sort_columns option in dataframe writer

Repository: carbondata
Updated Branches:
  refs/heads/master 63afc00f6 -> 9c038543e


[CARBONDATA-1709][DataFrame] Support sort_columns option in dataframe writer

This PR adds SORT_COLUMNS option support in dataframe writer

This closes #1496


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9c038543
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9c038543
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9c038543

Branch: refs/heads/master
Commit: 9c038543e2ce68a220ad9967acf59bd8f23b6ae0
Parents: 63afc00
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Tue Nov 14 20:23:35 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Dec 13 17:23:48 2017 +0800

----------------------------------------------------------------------
 .../testsuite/dataload/TestLoadDataFrame.scala  | 63 +++++++++++++++++++-
 .../apache/carbondata/spark/CarbonOption.scala  |  2 +
 .../spark/sql/CarbonDataFrameWriter.scala       |  1 +
 3 files changed, 63 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c038543/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index 57c5204..574eb91 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -22,7 +22,7 @@ import java.math.BigDecimal
 
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
+import org.apache.spark.sql.{AnalysisException, DataFrame, DataFrameWriter, Row, SaveMode}
 import org.scalatest.BeforeAndAfterAll
 
 class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
@@ -73,7 +73,9 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE IF EXISTS carbon8")
     sql("DROP TABLE IF EXISTS carbon9")
     sql("DROP TABLE IF EXISTS carbon10")
-
+    sql("DROP TABLE IF EXISTS df_write_sort_column_not_specified")
+    sql("DROP TABLE IF EXISTS df_write_specify_sort_column")
+    sql("DROP TABLE IF EXISTS df_write_empty_sort_column")
   }
 
 
@@ -236,13 +238,68 @@ test("test the boolean data type"){
       sql("select count(*) from carbon10 where c3 > 500"), Row(500)
     )
     sql("drop table carbon10")
-    assert(! new File(path).exists())
+    assert(!new File(path).exists())
     assert(intercept[AnalysisException](
       sql("select count(*) from carbon10 where c3 > 500"))
       .message
       .contains("not found"))
   }
 
+  private def getSortColumnValue(tableName: String): Array[String] = {
+    val desc = sql(s"desc formatted $tableName")
+    val sortColumnRow = desc.collect.find(r =>
+      r(0).asInstanceOf[String].trim.equalsIgnoreCase("SORT_COLUMNS")
+    )
+    assert(sortColumnRow.isDefined)
+    sortColumnRow.get.get(1).asInstanceOf[String].split(",")
+      .map(_.trim.toLowerCase).filter(_.length > 0)
+  }
+
+  private def getDefaultWriter(tableName: String): DataFrameWriter[Row] = {
+    df2.write
+      .format("carbondata")
+      .option("tableName", tableName)
+      .option("tempCSV", "false")
+      .option("single_pass", "false")
+      .option("table_blocksize", "256")
+      .option("compress", "false")
+      .mode(SaveMode.Overwrite)
+  }
+
+  test("test load dataframe with sort_columns not specified," +
+       " by default all string columns will be sort_columns") {
+    // all string column will be sort_columns by default
+    getDefaultWriter("df_write_sort_column_not_specified").save()
+    checkAnswer(
+      sql("select count(*) from df_write_sort_column_not_specified where c3 > 500"), Row(500)
+    )
+
+    val sortColumnValue = getSortColumnValue("df_write_sort_column_not_specified")
+    assert(sortColumnValue.sameElements(Array("c1", "c2")))
+  }
+
+  test("test load dataframe with sort_columns specified") {
+    // only specify c1 as sort_columns
+    getDefaultWriter("df_write_specify_sort_column").option("sort_columns", "c1").save()
+    checkAnswer(
+      sql("select count(*) from df_write_specify_sort_column where c3 > 500"), Row(500)
+    )
+
+    val sortColumnValue = getSortColumnValue("df_write_specify_sort_column")
+    assert(sortColumnValue.sameElements(Array("c1")))
+  }
+
+  test("test load dataframe with sort_columns specified empty") {
+    // specify empty sort_column
+    getDefaultWriter("df_write_empty_sort_column").option("sort_columns", "").save()
+    checkAnswer(
+      sql("select count(*) from df_write_empty_sort_column where c3 > 500"), Row(500)
+    )
+
+    val sortColumnValue = getSortColumnValue("df_write_empty_sort_column")
+    assert(sortColumnValue.isEmpty)
+  }
+
   override def afterAll {
     dropTable
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c038543/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index 594ea0e..bcdad26 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -42,6 +42,8 @@ class CarbonOption(options: Map[String, String]) {
 
   def singlePass: Boolean = options.getOrElse("single_pass", "false").toBoolean
 
+  def sortColumns: Option[String] = options.get("sort_columns")
+
   def dictionaryInclude: Option[String] = options.get("dictionary_include")
 
   def dictionaryExclude: Option[String] = options.get("dictionary_exclude")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9c038543/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index ca371e1..d50f0b8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -168,6 +168,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       s"${ field.name } ${ convertToCarbonType(field.dataType) }"
     }
     val property = Map(
+      "SORT_COLUMNS" -> options.sortColumns,
       "DICTIONARY_INCLUDE" -> options.dictionaryInclude,
       "DICTIONARY_EXCLUDE" -> options.dictionaryExclude,
       "TABLE_BLOCKSIZE" -> options.tableBlockSize