You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/02/02 06:41:43 UTC
carbondata git commit: [CARBONDATA-2078][CARBONDATA-1516] Add 'if not
exists' for creating datamap
Repository: carbondata
Updated Branches:
refs/heads/master 02eefca15 -> f9606e9d0
[CARBONDATA-2078][CARBONDATA-1516] Add 'if not exists' for creating datamap
Add support for the 'IF NOT EXISTS' clause when creating a datamap
This closes #1861
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f9606e9d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f9606e9d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f9606e9d
Branch: refs/heads/master
Commit: f9606e9d03d55bf57925c2ac176e92553c213d49
Parents: 02eefca
Author: xubo245 <60...@qq.com>
Authored: Thu Jan 25 21:27:27 2018 +0800
Committer: kunal642 <ku...@gmail.com>
Committed: Fri Feb 2 12:08:54 2018 +0530
----------------------------------------------------------------------
.../preaggregate/TestPreAggCreateCommand.scala | 55 ++++++++++-
.../preaggregate/TestPreAggregateLoad.scala | 96 ++++++++++++++++++-
.../timeseries/TestTimeSeriesCreateTable.scala | 85 +++++++++++++----
.../timeseries/TestTimeseriesDataLoad.scala | 99 +++++++++++++++++++-
.../datamap/CarbonCreateDataMapCommand.scala | 38 ++++++--
.../CreatePreAggregateTableCommand.scala | 5 +-
.../sql/parser/CarbonSpark2SqlParser.scala | 9 +-
7 files changed, 353 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index f1d7396..0cb1045 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.integration.spark.testsuite.preaggregate
import scala.collection.JavaConverters._
-import org.apache.spark.sql.CarbonDatasourceHadoopRelation
+import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.CarbonRelation
@@ -321,7 +321,60 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
checkExistence(sql("show tables"), false, "tbl_1_agg2_day","tbl_1_agg2_hour","tbl_1_agg2_month","tbl_1_agg2_year")
}
+ test("test pre agg create table 21: should support 'if not exists'") {
+ try {
+ sql(
+ """
+ | CREATE DATAMAP IF NOT EXISTS agg0 ON TABLE mainTable
+ | USING 'preaggregate'
+ | AS SELECT
+ | column3,
+ | sum(column3),
+ | column5,
+ | sum(column5)
+ | FROM maintable
+ | GROUP BY column3,column5,column2
+ """.stripMargin)
+
+ sql(
+ """
+ | CREATE DATAMAP IF NOT EXISTS agg0 ON TABLE mainTable
+ | USING 'preaggregate'
+ | AS SELECT
+ | column3,
+ | sum(column3),
+ | column5,
+ | sum(column5)
+ | FROM maintable
+ | GROUP BY column3,column5,column2
+ """.stripMargin)
+ assert(true)
+ } catch {
+ case _: Exception =>
+ assert(false)
+ }
+ sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
+ }
+ test("test pre agg create table 22: don't support 'create datamap if exists'") {
+ val e: Exception = intercept[AnalysisException] {
+ sql(
+ """
+ | CREATE DATAMAP IF EXISTS agg0 ON TABLE mainTable
+ | USING 'preaggregate'
+ | AS SELECT
+ | column3,
+ | sum(column3),
+ | column5,
+ | sum(column5)
+ | FROM maintable
+ | GROUP BY column3,column5,column2
+ """.stripMargin)
+ assert(true)
+ }
+ assert(e.getMessage.contains("identifier matching regex"))
+ sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
+ }
def getCarbontable(plan: LogicalPlan) : CarbonTable ={
var carbonTable : CarbonTable = null
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
index 4ebf150..b6b7a17 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
@@ -21,9 +21,9 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.util.SparkUtil4Test
import org.scalatest.{BeforeAndAfterAll, Ignore}
-
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll {
@@ -310,5 +310,99 @@ test("check load and select for avg double datatype") {
checkAnswer(sql("select name,avg(salary) from maintbl group by name"), rows)
}
+ test("create datamap with 'if not exists' after load data into mainTable and create datamap") {
+ sql("DROP TABLE IF EXISTS maintable")
+ sql(
+ """
+ | CREATE TABLE maintable(id int, name string, city string, age int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(
+ s"""
+ | create datamap preagg_sum
+ | on table maintable
+ | using 'preaggregate'
+ | as select id,sum(age) from maintable
+ | group by id
+ """.stripMargin)
+
+ sql(
+ s"""
+ | create datamap if not exists preagg_sum
+ | on table maintable
+ | using 'preaggregate'
+ | as select id,sum(age) from maintable
+ | group by id
+ """.stripMargin)
+
+ checkAnswer(sql(s"select * from maintable_preagg_sum"),
+ Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+ sql("drop table if exists maintable")
+ }
+
+ test("create datamap with 'if not exists' after create datamap and load data into mainTable") {
+ sql("DROP TABLE IF EXISTS maintable")
+ sql(
+ """
+ | CREATE TABLE maintable(id int, name string, city string, age int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+
+ sql(
+ s"""
+ | create datamap preagg_sum
+ | on table maintable
+ | using 'preaggregate'
+ | as select id,sum(age) from maintable
+ | group by id
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(
+ s"""
+ | create datamap if not exists preagg_sum
+ | on table maintable
+ | using 'preaggregate'
+ | as select id,sum(age) from maintable
+ | group by id
+ """.stripMargin)
+
+ checkAnswer(sql(s"select * from maintable_preagg_sum"),
+ Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+ sql("drop table if exists maintable")
+ }
+
+ test("create datamap without 'if not exists' after load data into mainTable and create datamap") {
+ sql("DROP TABLE IF EXISTS maintable")
+ sql(
+ """
+ | CREATE TABLE maintable(id int, name string, city string, age int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ sql(
+ s"""
+ | create datamap preagg_sum
+ | on table maintable
+ | using 'preaggregate'
+ | as select id,sum(age) from maintable
+ | group by id
+ """.stripMargin)
+
+ val e: Exception = intercept[TableAlreadyExistsException] {
+ sql(
+ s"""
+ | create datamap preagg_sum
+ | on table maintable
+ | using 'preaggregate'
+ | as select id,sum(age) from maintable
+ | group by id
+ """.stripMargin)
+ }
+ assert(e.getMessage.contains("already exists in database"))
+ checkAnswer(sql(s"select * from maintable_preagg_sum"),
+ Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+ sql("drop table if exists maintable")
+ }
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
index 0ca7cb9..b63fd53 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
@@ -16,6 +16,7 @@
*/
package org.apache.carbondata.integration.spark.testsuite.timeseries
+import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.test.util.QueryTest
import org.scalatest.BeforeAndAfterAll
@@ -81,29 +82,29 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
""".stripMargin)
}
- test("test timeseries create table Zero") {
+ test("test timeseries create table 1") {
checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_second"), true, "maintable_agg0_second")
sql("drop datamap agg0_second on table mainTable")
}
- test("test timeseries create table One") {
+ test("test timeseries create table 2") {
checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_hour"), true, "maintable_agg0_hour")
sql("drop datamap agg0_hour on table mainTable")
}
- test("test timeseries create table two") {
+ test("test timeseries create table 3") {
checkExistence(sql("DESCRIBE FORMATTED maintable_agg0_day"), true, "maintable_agg0_day")
sql("drop datamap agg0_day on table mainTable")
}
- test("test timeseries create table three") {
+ test("test timeseries create table 4") {
checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_month"), true, "maintable_agg0_month")
sql("drop datamap agg0_month on table mainTable")
}
- test("test timeseries create table four") {
+ test("test timeseries create table 5") {
checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_year"), true, "maintable_agg0_year")
sql("drop datamap agg0_year on table mainTable")
}
- test("test timeseries create table five") {
+ test("test timeseries create table 6") {
intercept[Exception] {
sql(
s"""
@@ -118,7 +119,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
}
}
- test("test timeseries create table Six") {
+ test("test timeseries create table 7") {
intercept[Exception] {
sql(
s"""
@@ -133,7 +134,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
}
}
- test("test timeseries create table seven") {
+ test("test timeseries create table 8") {
intercept[Exception] {
sql(
s"""
@@ -158,7 +159,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
}
}
- test("test timeseries create table Eight") {
+ test("test timeseries create table 9") {
intercept[Exception] {
sql(
s"""
@@ -173,7 +174,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
}
}
- test("test timeseries create table Nine") {
+ test("test timeseries create table 10") {
intercept[Exception] {
sql(
s"""
@@ -188,7 +189,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
}
}
- test("test timeseries create table: USING") {
+ test("test timeseries create table 11: USING") {
val e: Exception = intercept[MalformedDataMapCommandException] {
sql(
"""CREATE DATAMAP agg1 ON TABLE mainTable
@@ -203,7 +204,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
assert(e.getMessage.equals("Unknown data map type abc"))
}
- test("test timeseries create table: USING and catch MalformedCarbonCommandException") {
+ test("test timeseries create table 12: USING and catch MalformedCarbonCommandException") {
val e: Exception = intercept[MalformedCarbonCommandException] {
sql(
"""CREATE DATAMAP agg1 ON TABLE mainTable
@@ -218,7 +219,8 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
assert(e.getMessage.equals("Unknown data map type abc"))
}
- test("test timeseries create table: Only one granularity level can be defined 1") {
+ test("test timeseries create table 13: Only one granularity level can be defined 1") {
+ sql("DROP DATAMAP IF EXISTS agg0_second ON TABLE mainTable")
val e: Exception = intercept[MalformedCarbonCommandException] {
sql(
s"""
@@ -238,7 +240,8 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
assert(e.getMessage.equals("Only one granularity level can be defined"))
}
- test("test timeseries create table: Only one granularity level can be defined 2") {
+ test("test timeseries create table 14: Only one granularity level can be defined 2") {
+ sql("DROP DATAMAP IF EXISTS agg0_second ON TABLE mainTable")
val e: Exception = intercept[MalformedDataMapCommandException] {
sql(
s"""
@@ -255,7 +258,8 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
assert(e.getMessage.equals("Only one granularity level can be defined"))
}
- test("test timeseries create table: Only one granularity level can be defined 3") {
+ test("test timeseries create table 15: Only one granularity level can be defined 3") {
+ sql("DROP DATAMAP IF EXISTS agg0_second ON TABLE mainTable")
val e: Exception = intercept[MalformedDataMapCommandException] {
sql(
s"""
@@ -272,7 +276,8 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
assert(e.getMessage.equals("Only one granularity level can be defined"))
}
- test("test timeseries create table: Granularity only support 1") {
+ test("test timeseries create table 16: Granularity only support 1") {
+ sql("DROP DATAMAP IF EXISTS agg0_second ON TABLE mainTable")
val e = intercept[MalformedDataMapCommandException] {
sql(
s"""
@@ -288,7 +293,8 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
assert(e.getMessage.equals("Granularity only support 1"))
}
- test("test timeseries create table: Granularity only support 1 and throw Exception") {
+ test("test timeseries create table 17: Granularity only support 1 and throw Exception") {
+ sql("DROP DATAMAP IF EXISTS agg0_second ON TABLE mainTable")
val e = intercept[MalformedCarbonCommandException] {
sql(
s"""
@@ -304,7 +310,8 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
assert(e.getMessage.equals("Granularity only support 1"))
}
- test("test timeseries create table: timeSeries should define time granularity") {
+ test("test timeseries create table 18: timeSeries should define time granularity") {
+ sql("DROP DATAMAP IF EXISTS agg0_second ON TABLE mainTable")
val e = intercept[MalformedDataMapCommandException] {
sql(
s"""
@@ -319,6 +326,48 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
assert(e.getMessage.equals(s"$timeSeries should define time granularity"))
}
+ test("test timeseries create table 19: should support if not exists") {
+ sql("DROP DATAMAP IF EXISTS agg1 ON TABLE mainTable")
+
+ sql(
+ s"""
+ | CREATE DATAMAP agg1 ON TABLE mainTable
+ | USING '$timeSeries'
+ | DMPROPERTIES (
+ | 'EVENT_TIME'='dataTime',
+ | 'MONTH_GRANULARITY'='1')
+ | AS SELECT dataTime, SUM(age) FROM mainTable
+ | GROUP BY dataTime
+ """.stripMargin)
+ sql(
+ s"""
+ | CREATE DATAMAP IF NOT EXISTS agg1 ON TABLE mainTable
+ | USING '$timeSeries'
+ | DMPROPERTIES (
+ | 'EVENT_TIME'='dataTime',
+ | 'MONTH_GRANULARITY'='1')
+ |AS SELECT dataTime, SUM(age) FROM mainTable
+ |GROUP BY dataTime
+ """.stripMargin)
+ checkExistence(sql("SHOW DATAMAP ON TABLE mainTable"), true, "agg1")
+ checkExistence(sql("DESC FORMATTED mainTable_agg1"), true, "maintable_age_sum")
+ }
+
+ test("test timeseries create table 20: don't support 'create datamap if exists'") {
+ val e: Exception = intercept[AnalysisException] {
+ sql(
+ s"""CREATE DATAMAP IF EXISTS agg2 ON TABLE mainTable
+ | USING '$timeSeries'
+ | DMPROPERTIES (
+ | 'EVENT_TIME'='dataTime',
+ | 'MONTH_GRANULARITY'='1')
+ | AS SELECT dataTime, SUM(age) FROM mainTable
+ | GROUP BY dataTime
+ """.stripMargin)
+ }
+ assert(e.getMessage.contains("identifier matching regex"))
+ }
+
override def afterAll: Unit = {
sql("DROP TABLE IF EXISTS mainTable")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
index 8bcdfc9..b43b93b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
@@ -19,9 +19,10 @@ package org.apache.carbondata.integration.spark.testsuite.timeseries
import java.sql.Timestamp
import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.test.util.QueryTest
import org.apache.spark.util.SparkUtil4Test
-import org.scalatest.{BeforeAndAfterAll, Ignore}
+import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
@@ -239,6 +240,102 @@ class TestTimeseriesDataLoad extends QueryTest with BeforeAndAfterAll {
Row(Timestamp.valueOf("2016-02-23 01:02:50.0"),50)))
}
+ test("create datamap without 'if not exists' after load data into mainTable and create datamap") {
+ sql("drop table if exists mainTable")
+ sql(
+ """
+ | CREATE TABLE mainTable(
+ | mytime timestamp,
+ | name string,
+ | age int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into table mainTable")
+
+ sql(
+ s"""
+ | CREATE DATAMAP agg0_second ON TABLE mainTable
+ | USING '$timeSeries'
+ | DMPROPERTIES (
+ | 'EVENT_TIME'='mytime',
+ | 'second_granularity'='1')
+ | AS SELECT mytime, SUM(age) FROM mainTable
+ | GROUP BY mytime
+ """.stripMargin)
+
+ checkAnswer(sql("select * from maintable_agg0_second"),
+ Seq(Row(Timestamp.valueOf("2016-02-23 01:01:30.0"), 10),
+ Row(Timestamp.valueOf("2016-02-23 01:01:40.0"), 20),
+ Row(Timestamp.valueOf("2016-02-23 01:01:50.0"), 30),
+ Row(Timestamp.valueOf("2016-02-23 01:02:30.0"), 40),
+ Row(Timestamp.valueOf("2016-02-23 01:02:40.0"), 50),
+ Row(Timestamp.valueOf("2016-02-23 01:02:50.0"), 50)))
+ val e: Exception = intercept[TableAlreadyExistsException] {
+ sql(
+ s"""
+ | CREATE DATAMAP agg0_second ON TABLE mainTable
+ | USING '$timeSeries'
+ | DMPROPERTIES (
+ | 'EVENT_TIME'='mytime',
+ | 'second_granularity'='1')
+ | AS SELECT mytime, SUM(age) FROM mainTable
+ | GROUP BY mytime
+ """.stripMargin)
+ }
+ assert(e.getMessage.contains("already exists in database"))
+ }
+
+ test("create datamap with 'if not exists' after load data into mainTable and create datamap") {
+ sql("drop table if exists mainTable")
+ sql(
+ """
+ | CREATE TABLE mainTable(
+ | mytime timestamp,
+ | name string,
+ | age int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into table mainTable")
+
+ sql(
+ s"""
+ | CREATE DATAMAP agg0_second ON TABLE mainTable
+ | USING '$timeSeries'
+ | DMPROPERTIES (
+ | 'EVENT_TIME'='mytime',
+ | 'second_granularity'='1')
+ | AS SELECT mytime, SUM(age) FROM mainTable
+ | GROUP BY mytime
+ """.stripMargin)
+
+ checkAnswer(sql("select * from maintable_agg0_second"),
+ Seq(Row(Timestamp.valueOf("2016-02-23 01:01:30.0"), 10),
+ Row(Timestamp.valueOf("2016-02-23 01:01:40.0"), 20),
+ Row(Timestamp.valueOf("2016-02-23 01:01:50.0"), 30),
+ Row(Timestamp.valueOf("2016-02-23 01:02:30.0"), 40),
+ Row(Timestamp.valueOf("2016-02-23 01:02:40.0"), 50),
+ Row(Timestamp.valueOf("2016-02-23 01:02:50.0"), 50)))
+
+ sql(
+ s"""
+ | CREATE DATAMAP IF NOT EXISTS agg0_second ON TABLE mainTable
+ | USING '$timeSeries'
+ | DMPROPERTIES (
+ | 'EVENT_TIME'='mytime',
+ | 'second_granularity'='1')
+ | AS SELECT mytime, SUM(age) FROM mainTable
+ | GROUP BY mytime
+ """.stripMargin)
+
+ checkAnswer(sql("select * from maintable_agg0_second"),
+ Seq(Row(Timestamp.valueOf("2016-02-23 01:01:30.0"), 10),
+ Row(Timestamp.valueOf("2016-02-23 01:01:40.0"), 20),
+ Row(Timestamp.valueOf("2016-02-23 01:01:50.0"), 30),
+ Row(Timestamp.valueOf("2016-02-23 01:02:30.0"), 40),
+ Row(Timestamp.valueOf("2016-02-23 01:02:40.0"), 50),
+ Row(Timestamp.valueOf("2016-02-23 01:02:50.0"), 50)))
+ }
+
override def afterAll: Unit = {
sql("drop table if exists mainTable")
sql("drop table if exists table_03")
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index c4d32b4..da20ac5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.execution.command.datamap
import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.preaaggregate.CreatePreAggregateTableCommand
@@ -35,10 +36,12 @@ case class CarbonCreateDataMapCommand(
tableIdentifier: TableIdentifier,
dmClassName: String,
dmproperties: Map[String, String],
- queryString: Option[String])
+ queryString: Option[String],
+ ifNotExistsSet: Boolean = false)
extends AtomicRunnableCommand {
var createPreAggregateTableCommands: CreatePreAggregateTableCommand = _
+ var tableIsExists: Boolean = false
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
// since streaming segment does not support building index and pre-aggregate yet,
@@ -49,10 +52,22 @@ case class CarbonCreateDataMapCommand(
throw new MalformedCarbonCommandException("Streaming table does not support creating datamap")
}
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ val dbName = tableIdentifier.database.getOrElse("default")
+ val tableName = tableIdentifier.table + "_" + dataMapName
- if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
+ if (sparkSession.sessionState.catalog.listTables(dbName)
+ .exists(_.table.equalsIgnoreCase(tableName))) {
+ LOGGER.audit(
+ s"Table creation with Database name [$dbName] and Table name [$tableName] failed. " +
+ s"Table [$tableName] already exists under database [$dbName]")
+ tableIsExists = true
+ if (!ifNotExistsSet) {
+ throw new TableAlreadyExistsException(dbName, tableName)
+ }
+ } else if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
TimeSeriesUtil.validateTimeSeriesGranularity(dmproperties, dmClassName)
+
createPreAggregateTableCommands = if (dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
val details = TimeSeriesUtil
.getTimeSeriesGranularityDetails(dmproperties, dmClassName)
@@ -62,15 +77,16 @@ case class CarbonCreateDataMapCommand(
dmClassName,
updatedDmProperties,
queryString.get,
- Some(details._1))
+ Some(details._1),
+ ifNotExistsSet = ifNotExistsSet)
} else {
CreatePreAggregateTableCommand(
dataMapName,
tableIdentifier,
dmClassName,
dmproperties,
- queryString.get
- )
+ queryString.get,
+ ifNotExistsSet = ifNotExistsSet)
}
createPreAggregateTableCommands.processMetadata(sparkSession)
} else {
@@ -83,7 +99,11 @@ case class CarbonCreateDataMapCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
- createPreAggregateTableCommands.processData(sparkSession)
+ if (!tableIsExists) {
+ createPreAggregateTableCommands.processData(sparkSession)
+ } else {
+ Seq.empty
+ }
} else {
throw new MalformedDataMapCommandException("Unknown data map type " + dmClassName)
}
@@ -92,7 +112,11 @@ case class CarbonCreateDataMapCommand(
override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
- createPreAggregateTableCommands.undoMetadata(sparkSession, exception)
+ if (!tableIsExists) {
+ createPreAggregateTableCommands.undoMetadata(sparkSession, exception)
+ } else {
+ Seq.empty
+ }
} else {
throw new MalformedDataMapCommandException("Unknown data map type " + dmClassName)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 3de75c2..31a3403 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -49,7 +49,8 @@ case class CreatePreAggregateTableCommand(
dmClassName: String,
dmProperties: Map[String, String],
queryString: String,
- timeSeriesFunction: Option[String] = None)
+ timeSeriesFunction: Option[String] = None,
+ ifNotExistsSet: Boolean = false)
extends AtomicRunnableCommand {
var parentTable: CarbonTable = _
@@ -86,7 +87,7 @@ case class CreatePreAggregateTableCommand(
parentTableIdentifier.database)
// prepare table model of the collected tokens
val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(
- ifNotExistPresent = false,
+ ifNotExistPresent = ifNotExistsSet,
new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
tableIdentifier.table.toLowerCase,
fields,
http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 4045478..7addd26 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -142,17 +142,18 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
/**
* The syntax of datamap creation is as follows.
- * CREATE DATAMAP datamapName ON TABLE tableName USING 'DataMapClassName'
+ * CREATE DATAMAP IF NOT EXISTS datamapName ON TABLE tableName USING 'DataMapClassName'
* DMPROPERTIES('KEY'='VALUE') AS SELECT COUNT(COL1) FROM tableName
*/
protected lazy val createDataMap: Parser[LogicalPlan] =
- CREATE ~> DATAMAP ~> ident ~ (ON ~ TABLE) ~ (ident <~ ".").? ~ ident ~
+ CREATE ~> DATAMAP ~> opt(IF ~> NOT ~> EXISTS) ~ ident ~
+ (ON ~ TABLE) ~ (ident <~ ".").? ~ ident ~
(USING ~> stringLit) ~ (DMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
(AS ~> restInput).? <~ opt(";") ^^ {
- case dmname ~ ontable ~ dbName ~ tableName ~ className ~ dmprops ~ query =>
+ case ifnotexists ~ dmname ~ ontable ~ dbName ~ tableName ~ className ~ dmprops ~ query =>
val map = dmprops.getOrElse(List[(String, String)]()).toMap[String, String]
CarbonCreateDataMapCommand(
- dmname, TableIdentifier(tableName, dbName), className, map, query)
+ dmname, TableIdentifier(tableName, dbName), className, map, query, ifnotexists.isDefined)
}
/**