Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/24 13:26:29 UTC
[33/50] [abbrv] carbondata git commit: [CARBONDATA-1924][PARTITION] Restrict streaming on partitioned tables and support PARTITION syntax in the LOAD TABLE command.
This closes #1699
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/577a8b0d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/577a8b0d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/577a8b0d
Branch: refs/heads/fgdatamap
Commit: 577a8b0d59d760a35626779fe45dd4d91c6b3328
Parents: a89587e
Author: ravipesala <ra...@gmail.com>
Authored: Tue Dec 19 20:01:30 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Dec 21 18:46:50 2017 +0530
----------------------------------------------------------------------
.../StandardPartitionTableLoadingTestCase.scala | 57 ++++++++++++++++++++
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 6 +++
.../management/CarbonLoadDataCommand.scala | 10 +++-
.../sql/parser/CarbonSpark2SqlParser.scala | 22 +++++---
.../spark/sql/parser/CarbonSparkSqlParser.scala | 3 ++
5 files changed, 89 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/577a8b0d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 11e95d8..b7010e5 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -17,6 +17,7 @@
package org.apache.carbondata.spark.testsuite.standardpartition
import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{AnalysisException, Row}
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -200,6 +201,59 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
validateDataFiles("default_singlepasspartitionone", "0", 1)
}
+ test("data loading for partition table for one static partition column with load syntax") {
+ sql(
+ """
+ | CREATE TABLE loadstaticpartitionone (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitionone PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ checkAnswer(sql("select distinct empno from loadstaticpartitionone"), Seq(Row(1)))
+ }
+
+ test("overwrite partition table for one static partition column with load syntax") {
+ sql(
+ """
+ | CREATE TABLE loadstaticpartitiononeoverwrite (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ val rows = sql("select count(*) from loadstaticpartitiononeoverwrite").collect()
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE loadstaticpartitiononeoverwrite PARTITION(empno='1') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+ checkAnswer(sql("select count(*) from loadstaticpartitiononeoverwrite"), rows)
+ }
+
+ test("Restrict streaming on partitioned table") {
+ intercept[AnalysisException] {
+ sql(
+ """
+ | CREATE TABLE streamingpartitionedtable (empname String, designation String, doj
+ | Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (empno int)
+ | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('streaming'='true')
+ """.stripMargin)
+ }
+ }
+
override def afterAll = {
dropTable
@@ -215,6 +269,9 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
sql("drop table if exists insertpartitionthree")
sql("drop table if exists staticpartitionone")
sql("drop table if exists singlepasspartitionone")
+ sql("drop table if exists loadstaticpartitionone")
+ sql("drop table if exists loadstaticpartitiononeoverwrite")
+ sql("drop table if exists streamingpartitionedtable")
}
}
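A note on the overwrite test above: three successive appends into the same static
partition multiply the row count, and the final LOAD ... OVERWRITE replaces that
partition's contents, so the count drops back to a single load's rows. A minimal
spark-shell style sketch of the semantics being asserted (the CarbonData-enabled
SparkSession `spark` and the `csvPath` variable are assumed, not part of the test):

    // After an initial load into empno='1', remember the row count.
    val one = spark.sql("select count(*) from loadstaticpartitiononeoverwrite").collect()
    spark.sql(s"LOAD DATA LOCAL INPATH '$csvPath' INTO TABLE " +
      "loadstaticpartitiononeoverwrite PARTITION(empno='1')")            // appends
    spark.sql(s"LOAD DATA LOCAL INPATH '$csvPath' OVERWRITE INTO TABLE " +
      "loadstaticpartitiononeoverwrite PARTITION(empno='1')")            // replaces
    // count(*) equals `one` again: OVERWRITE replaced the empno='1' partition
    // instead of appending to it.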
http://git-wip-us.apache.org/repos/asf/carbondata/blob/577a8b0d/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 6e9b36c..129e6b3 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -978,6 +978,12 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
case _ => ("", "")
}
+ protected lazy val partitions: Parser[(String, String)] =
+ (ident <~ "=") ~ stringLit ^^ {
+ case opt ~ optvalue => (opt.trim, optvalue)
+ case _ => ("", "")
+ }
+
protected lazy val valueOptions: Parser[(Int, Int)] =
(numericLit <~ ",") ~ numericLit ^^ {
case opt ~ optvalue => (opt.toInt, optvalue.toInt)
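The new `partitions` production mirrors the neighbouring `loadOptions` parser:
`(ident <~ "=") ~ stringLit` keeps the identifier, drops the "=", and pairs it
with the quoted value (the trailing `case _` arm is unreachable, since a `~`
result always matches the first pattern; it is presumably kept for symmetry with
`loadOptions`). A self-contained sketch of the same combinator shape, runnable
with the scala-parser-combinators module on the classpath (`PartitionSpecSketch`
and `singleQuoted` are illustrative names, not Carbon code):

    import scala.util.parsing.combinator.JavaTokenParsers

    object PartitionSpecSketch extends JavaTokenParsers {
      // Stands in for Carbon's stringLit token: a single-quoted literal.
      private lazy val singleQuoted: Parser[String] =
        "'" ~> "[^']*".r <~ "'"

      // <~ keeps the left result and drops the "="; ^^ maps the parse result.
      private lazy val pair: Parser[(String, String)] =
        (ident <~ "=") ~ singleQuoted ^^ { case key ~ value => (key.trim, value) }

      // repsep handles the comma-separated list inside PARTITION( ... ).
      lazy val partitionClause: Parser[Map[String, String]] =
        "PARTITION" ~> "(" ~> repsep(pair, ",") <~ ")" ^^ (_.toMap)

      def main(args: Array[String]): Unit =
        // on success, prints a parse result holding Map(empno -> 1, dept -> sales)
        println(parseAll(partitionClause, "PARTITION(empno='1', dept='sales')"))
    }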
http://git-wip-us.apache.org/repos/asf/carbondata/blob/577a8b0d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 7492951..f96c0a7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -558,8 +558,14 @@ case class CarbonLoadDataCommand(
}
// Only select the required columns
- Project(relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get),
- LogicalRDD(attributes, rdd)(sparkSession))
+ val output = if (partition.nonEmpty) {
+ relation.output.map{ attr =>
+ attributes.find(_.name.equalsIgnoreCase(attr.name)).get
+ }.filter(attr => partition.get(attr.name).isEmpty)
+ } else {
+ relation.output.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
+ }
+ Project(output, LogicalRDD(attributes, rdd)(sparkSession))
}
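The Project change above drops statically-valued partition columns from the
projected output: with LOAD ... PARTITION(empno='1'), empno's value comes from
the clause, not from the input rows, so it must not be selected from the parsed
RDD. A stand-alone sketch of that filtering rule on plain column names (object
and method names are illustrative; the real code matches Spark attributes with
equalsIgnoreCase):

    object StaticPartitionProjection {
      // Keep only the columns whose values must come from the input file;
      // columns pinned by the static PARTITION(...) spec are excluded.
      def projectColumns(tableColumns: Seq[String],
          staticSpec: Map[String, String]): Seq[String] =
        if (staticSpec.nonEmpty) {
          val pinned = staticSpec.keySet.map(_.toLowerCase)
          tableColumns.filterNot(c => pinned.contains(c.toLowerCase))
        } else {
          tableColumns
        }

      def main(args: Array[String]): Unit = {
        val cols = Seq("empname", "designation", "salary", "empno")
        // PARTITION(empno='1') pins empno, so it is not read from the CSV:
        println(projectColumns(cols, Map("empno" -> "1")))
        // prints: List(empname, designation, salary)
      }
    }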
val convertRelation = relation match {
case l: LogicalRelation =>
http://git-wip-us.apache.org/repos/asf/carbondata/blob/577a8b0d/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index dad0e3e..5d00a0c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -351,8 +351,9 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
protected lazy val loadDataNew: Parser[LogicalPlan] =
LOAD ~> DATA ~> opt(LOCAL) ~> INPATH ~> stringLit ~ opt(OVERWRITE) ~
(INTO ~> TABLE ~> (ident <~ ".").? ~ ident) ~
+ (PARTITION ~>"("~> repsep(partitions, ",") <~ ")").? ~
(OPTIONS ~> "(" ~> repsep(loadOptions, ",") <~ ")").? <~ opt(";") ^^ {
- case filePath ~ isOverwrite ~ table ~ optionsList =>
+ case filePath ~ isOverwrite ~ table ~ partitions ~ optionsList =>
val (databaseNameOp, tableName) = table match {
case databaseName ~ tableName => (databaseName, tableName.toLowerCase())
}
@@ -360,13 +361,20 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
validateOptions(optionsList)
}
val optionsMap = optionsList.getOrElse(List.empty[(String, String)]).toMap
+ val partitionSpec = partitions.getOrElse(List.empty[(String, String)]).toMap
CarbonLoadDataCommand(
- convertDbNameToLowerCase(databaseNameOp),
- tableName,
- filePath,
- Seq(),
- optionsMap,
- isOverwrite.isDefined)
+ databaseNameOp = convertDbNameToLowerCase(databaseNameOp),
+ tableName = tableName,
+ factPathFromUser = filePath,
+ dimFilesPath = Seq(),
+ options = optionsMap,
+ isOverwriteTable = isOverwrite.isDefined,
+ inputSqlString = null,
+ dataFrame = None,
+ updateModel = None,
+ tableInfoOp = None,
+ internalOptions = Map.empty,
+ partition = partitionSpec.map { case (key, value) => (key, Some(value))})
}
protected lazy val deleteLoadsByID: Parser[LogicalPlan] =
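Two details worth noting in the hunk above: the optional partition clause is
parsed between the table name and OPTIONS, and each parsed value is wrapped in
Some(...) to fit CarbonLoadDataCommand's Map[String, Option[String]] partition
parameter, where a present value marks the column as statically specified. The
statement shape the grammar now accepts, sketched informally (bracketed parts
optional, names illustrative):

    LOAD DATA [LOCAL] INPATH '<path>' [OVERWRITE] INTO TABLE [db.]tbl
      [PARTITION (col1='v1', col2='v2', ...)]
      [OPTIONS('DELIMITER'=',', 'QUOTECHAR'='"', ...)]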
http://git-wip-us.apache.org/repos/asf/carbondata/blob/577a8b0d/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 3597208..211e0ef 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -233,6 +233,9 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
case _ =>
// ignore this case
}
+ if (partitionFields.nonEmpty && options.isStreaming) {
+ operationNotAllowed("Streaming is not allowed on partitioned table", partitionColumns)
+ }
// validate tblProperties
val bucketFields = parser.getBucketFields(tableProperties, fields, options)
// prepare table model of the collected tokens
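Finally, the guard added to the helper builder fails CREATE TABLE early when a
partitioned table also sets the 'streaming' table property; operationNotAllowed
surfaces this as an AnalysisException, which is exactly what the new test
intercepts. A minimal stand-alone sketch of the same check (type and object
names are illustrative, not the Spark/Carbon API):

    object StreamingPartitionGuard {
      final case class TableOptions(isStreaming: Boolean)

      // Mirrors the guard above: partition columns and streaming ingest are
      // mutually exclusive, so reject the combination during CREATE TABLE.
      def validate(partitionFields: Seq[String], options: TableOptions): Unit =
        if (partitionFields.nonEmpty && options.isStreaming) {
          // operationNotAllowed raises an AnalysisException in Spark; a plain
          // exception stands in for it here.
          throw new IllegalArgumentException(
            "Streaming is not allowed on partitioned table")
        }

      def main(args: Array[String]): Unit = {
        validate(Seq.empty, TableOptions(isStreaming = true))     // ok
        validate(Seq("empno"), TableOptions(isStreaming = false)) // ok
        validate(Seq("empno"), TableOptions(isStreaming = true))  // throws
      }
    }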