You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/02/02 06:41:43 UTC

carbondata git commit: [CARBONDATA-2078][CARBONDATA-1516] Add 'if not exists' for creating datamap

Repository: carbondata
Updated Branches:
  refs/heads/master 02eefca15 -> f9606e9d0


[CARBONDATA-2078][CARBONDATA-1516] Add 'if not exists' for creating datamap

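Add support for the 'IF NOT EXISTS' clause in CREATE DATAMAP: when the datamap table already exists, the statement is skipped instead of throwing TableAlreadyExistsException.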

This closes #1861


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f9606e9d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f9606e9d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f9606e9d

Branch: refs/heads/master
Commit: f9606e9d03d55bf57925c2ac176e92553c213d49
Parents: 02eefca
Author: xubo245 <60...@qq.com>
Authored: Thu Jan 25 21:27:27 2018 +0800
Committer: kunal642 <ku...@gmail.com>
Committed: Fri Feb 2 12:08:54 2018 +0530

----------------------------------------------------------------------
 .../preaggregate/TestPreAggCreateCommand.scala  | 55 ++++++++++-
 .../preaggregate/TestPreAggregateLoad.scala     | 96 ++++++++++++++++++-
 .../timeseries/TestTimeSeriesCreateTable.scala  | 85 +++++++++++++----
 .../timeseries/TestTimeseriesDataLoad.scala     | 99 +++++++++++++++++++-
 .../datamap/CarbonCreateDataMapCommand.scala    | 38 ++++++--
 .../CreatePreAggregateTableCommand.scala        |  5 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |  9 +-
 7 files changed, 353 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index f1d7396..0cb1045 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.integration.spark.testsuite.preaggregate
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.CarbonDatasourceHadoopRelation
+import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.hive.CarbonRelation
@@ -321,7 +321,60 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     checkExistence(sql("show tables"), false, "tbl_1_agg2_day","tbl_1_agg2_hour","tbl_1_agg2_month","tbl_1_agg2_year")
   }
 
+  test("test pre agg create table 21: should support 'if not exists'") {
+    try {
+      sql(
+        """
+          | CREATE DATAMAP IF NOT EXISTS agg0 ON TABLE mainTable
+          | USING 'preaggregate'
+          | AS SELECT
+          |   column3,
+          |   sum(column3),
+          |   column5,
+          |   sum(column5)
+          | FROM maintable
+          | GROUP BY column3,column5,column2
+        """.stripMargin)
+
+      sql(
+        """
+          | CREATE DATAMAP IF NOT EXISTS agg0 ON TABLE mainTable
+          | USING 'preaggregate'
+          | AS SELECT
+          |   column3,
+          |   sum(column3),
+          |   column5,
+          |   sum(column5)
+          | FROM maintable
+          | GROUP BY column3,column5,column2
+        """.stripMargin)
+      assert(true)
+    } catch {
+      case _: Exception =>
+        assert(false)
+    }
+    sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
+  }
 
+  test("test pre agg create table 22: don't support 'create datamap if exists'") {
+    val e: Exception = intercept[AnalysisException] {
+      sql(
+        """
+          | CREATE DATAMAP IF EXISTS agg0 ON TABLE mainTable
+          | USING 'preaggregate'
+          | AS SELECT
+          |   column3,
+          |   sum(column3),
+          |   column5,
+          |   sum(column5)
+          | FROM maintable
+          | GROUP BY column3,column5,column2
+        """.stripMargin)
+      assert(true)
+    }
+    assert(e.getMessage.contains("identifier matching regex"))
+    sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
+  }
 
   def getCarbontable(plan: LogicalPlan) : CarbonTable ={
     var carbonTable : CarbonTable = null

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
index 4ebf150..b6b7a17 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateLoad.scala
@@ -21,9 +21,9 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.util.SparkUtil4Test
 import org.scalatest.{BeforeAndAfterAll, Ignore}
-
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 
 class TestPreAggregateLoad extends QueryTest with BeforeAndAfterAll {
 
@@ -310,5 +310,99 @@ test("check load and select for avg double datatype") {
     checkAnswer(sql("select name,avg(salary) from maintbl group by name"), rows)
   }
 
+  test("create datamap with 'if not exists' after load data into mainTable and create datamap") {
+    sql("DROP TABLE IF EXISTS maintable")
+    sql(
+      """
+        | CREATE TABLE maintable(id int, name string, city string, age int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(
+      s"""
+         | create datamap preagg_sum
+         | on table maintable
+         | using 'preaggregate'
+         | as select id,sum(age) from maintable
+         | group by id
+       """.stripMargin)
+
+    sql(
+      s"""
+         | create datamap if not exists preagg_sum
+         | on table maintable
+         | using 'preaggregate'
+         | as select id,sum(age) from maintable
+         | group by id
+       """.stripMargin)
+
+    checkAnswer(sql(s"select * from maintable_preagg_sum"),
+      Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+    sql("drop table if exists maintable")
+  }
+
+  test("create datamap with 'if not exists' after create datamap and load data into mainTable") {
+    sql("DROP TABLE IF EXISTS maintable")
+    sql(
+      """
+        | CREATE TABLE maintable(id int, name string, city string, age int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(
+      s"""
+         | create datamap preagg_sum
+         | on table maintable
+         | using 'preaggregate'
+         | as select id,sum(age) from maintable
+         | group by id
+       """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(
+      s"""
+         | create datamap if not exists preagg_sum
+         | on table maintable
+         | using 'preaggregate'
+         | as select id,sum(age) from maintable
+         | group by id
+       """.stripMargin)
+
+    checkAnswer(sql(s"select * from maintable_preagg_sum"),
+      Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+    sql("drop table if exists maintable")
+  }
+
+  test("create datamap without 'if not exists' after load data into mainTable and create datamap") {
+    sql("DROP TABLE IF EXISTS maintable")
+    sql(
+      """
+        | CREATE TABLE maintable(id int, name string, city string, age int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+    sql(
+      s"""
+         | create datamap preagg_sum
+         | on table maintable
+         | using 'preaggregate'
+         | as select id,sum(age) from maintable
+         | group by id
+       """.stripMargin)
+
+    val e: Exception = intercept[TableAlreadyExistsException] {
+      sql(
+        s"""
+           | create datamap preagg_sum
+           | on table maintable
+           | using 'preaggregate'
+           | as select id,sum(age) from maintable
+           | group by id
+       """.stripMargin)
+    }
+    assert(e.getMessage.contains("already exists in database"))
+    checkAnswer(sql(s"select * from maintable_preagg_sum"),
+      Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+    sql("drop table if exists maintable")
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
index 0ca7cb9..b63fd53 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeSeriesCreateTable.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.integration.spark.testsuite.timeseries
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -81,29 +82,29 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
        """.stripMargin)
   }
 
-  test("test timeseries create table Zero") {
+  test("test timeseries create table 1") {
     checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_second"), true, "maintable_agg0_second")
     sql("drop datamap agg0_second on table mainTable")
   }
 
-  test("test timeseries create table One") {
+  test("test timeseries create table 2") {
     checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_hour"), true, "maintable_agg0_hour")
     sql("drop datamap agg0_hour on table mainTable")
   }
-  test("test timeseries create table two") {
+  test("test timeseries create table 3") {
     checkExistence(sql("DESCRIBE FORMATTED maintable_agg0_day"), true, "maintable_agg0_day")
     sql("drop datamap agg0_day on table mainTable")
   }
-  test("test timeseries create table three") {
+  test("test timeseries create table 4") {
     checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_month"), true, "maintable_agg0_month")
     sql("drop datamap agg0_month on table mainTable")
   }
-  test("test timeseries create table four") {
+  test("test timeseries create table 5") {
     checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0_year"), true, "maintable_agg0_year")
     sql("drop datamap agg0_year on table mainTable")
   }
 
-  test("test timeseries create table five") {
+  test("test timeseries create table 6") {
     intercept[Exception] {
       sql(
         s"""
@@ -118,7 +119,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  test("test timeseries create table Six") {
+  test("test timeseries create table 7") {
     intercept[Exception] {
       sql(
         s"""
@@ -133,7 +134,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  test("test timeseries create table seven") {
+  test("test timeseries create table 8") {
     intercept[Exception] {
       sql(
         s"""
@@ -158,7 +159,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  test("test timeseries create table Eight") {
+  test("test timeseries create table 9") {
     intercept[Exception] {
       sql(
         s"""
@@ -173,7 +174,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  test("test timeseries create table Nine") {
+  test("test timeseries create table 10") {
     intercept[Exception] {
       sql(
         s"""
@@ -188,7 +189,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  test("test timeseries create table: USING") {
+  test("test timeseries create table 11: USING") {
     val e: Exception = intercept[MalformedDataMapCommandException] {
       sql(
         """CREATE DATAMAP agg1 ON TABLE mainTable
@@ -203,7 +204,7 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
     assert(e.getMessage.equals("Unknown data map type abc"))
   }
 
-  test("test timeseries create table: USING and catch MalformedCarbonCommandException") {
+  test("test timeseries create table 12: USING and catch MalformedCarbonCommandException") {
     val e: Exception = intercept[MalformedCarbonCommandException] {
       sql(
         """CREATE DATAMAP agg1 ON TABLE mainTable
@@ -218,7 +219,8 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
     assert(e.getMessage.equals("Unknown data map type abc"))
   }
 
-  test("test timeseries create table: Only one granularity level can be defined 1") {
+  test("test timeseries create table 13: Only one granularity level can be defined 1") {
+    sql("DROP DATAMAP IF EXISTS agg0_second ON TABLE mainTable")
     val e: Exception = intercept[MalformedCarbonCommandException] {
       sql(
         s"""
@@ -238,7 +240,8 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
     assert(e.getMessage.equals("Only one granularity level can be defined"))
   }
 
-  test("test timeseries create table: Only one granularity level can be defined 2") {
+  test("test timeseries create table 14: Only one granularity level can be defined 2") {
+    sql("DROP DATAMAP IF EXISTS agg0_second ON TABLE mainTable")
     val e: Exception = intercept[MalformedDataMapCommandException] {
       sql(
         s"""
@@ -255,7 +258,8 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
     assert(e.getMessage.equals("Only one granularity level can be defined"))
   }
 
-  test("test timeseries create table: Only one granularity level can be defined 3") {
+  test("test timeseries create table 15: Only one granularity level can be defined 3") {
+    sql("DROP DATAMAP IF EXISTS agg0_second ON TABLE mainTable")
     val e: Exception = intercept[MalformedDataMapCommandException] {
       sql(
         s"""
@@ -272,7 +276,8 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
     assert(e.getMessage.equals("Only one granularity level can be defined"))
   }
 
-  test("test timeseries create table: Granularity only support 1") {
+  test("test timeseries create table 16: Granularity only support 1") {
+    sql("DROP DATAMAP IF EXISTS agg0_second ON TABLE mainTable")
     val e = intercept[MalformedDataMapCommandException] {
       sql(
         s"""
@@ -288,7 +293,8 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
     assert(e.getMessage.equals("Granularity only support 1"))
   }
 
-  test("test timeseries create table: Granularity only support 1 and throw Exception") {
+  test("test timeseries create table 17: Granularity only support 1 and throw Exception") {
+    sql("DROP DATAMAP IF EXISTS agg0_second ON TABLE mainTable")
     val e = intercept[MalformedCarbonCommandException] {
       sql(
         s"""
@@ -304,7 +310,8 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
     assert(e.getMessage.equals("Granularity only support 1"))
   }
 
-  test("test timeseries create table: timeSeries should define time granularity") {
+  test("test timeseries create table 18: timeSeries should define time granularity") {
+    sql("DROP DATAMAP IF EXISTS agg0_second ON TABLE mainTable")
     val e = intercept[MalformedDataMapCommandException] {
       sql(
         s"""
@@ -319,6 +326,48 @@ class TestTimeSeriesCreateTable extends QueryTest with BeforeAndAfterAll {
     assert(e.getMessage.equals(s"$timeSeries should define time granularity"))
   }
 
+  test("test timeseries create table 19: should support if not exists") {
+    sql("DROP DATAMAP IF EXISTS agg1 ON TABLE mainTable")
+
+    sql(
+      s"""
+         | CREATE DATAMAP agg1 ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         |   'EVENT_TIME'='dataTime',
+         |   'MONTH_GRANULARITY'='1')
+         | AS SELECT dataTime, SUM(age) FROM mainTable
+         | GROUP BY dataTime
+        """.stripMargin)
+    sql(
+      s"""
+         | CREATE DATAMAP IF NOT EXISTS agg1 ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         |   'EVENT_TIME'='dataTime',
+         |   'MONTH_GRANULARITY'='1')
+         |AS SELECT dataTime, SUM(age) FROM mainTable
+         |GROUP BY dataTime
+        """.stripMargin)
+    checkExistence(sql("SHOW DATAMAP ON TABLE mainTable"), true, "agg1")
+    checkExistence(sql("DESC FORMATTED mainTable_agg1"), true, "maintable_age_sum")
+  }
+
+  test("test timeseries create table 20: don't support 'create datamap if exists'") {
+    val e: Exception = intercept[AnalysisException] {
+      sql(
+        s"""CREATE DATAMAP IF EXISTS agg2 ON TABLE mainTable
+          | USING '$timeSeries'
+          | DMPROPERTIES (
+          |   'EVENT_TIME'='dataTime',
+          |   'MONTH_GRANULARITY'='1')
+          | AS SELECT dataTime, SUM(age) FROM mainTable
+          | GROUP BY dataTime
+        """.stripMargin)
+    }
+    assert(e.getMessage.contains("identifier matching regex"))
+  }
+
   override def afterAll: Unit = {
     sql("DROP TABLE IF EXISTS mainTable")
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
index 8bcdfc9..b43b93b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/timeseries/TestTimeseriesDataLoad.scala
@@ -19,9 +19,10 @@ package org.apache.carbondata.integration.spark.testsuite.timeseries
 import java.sql.Timestamp
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.util.SparkUtil4Test
-import org.scalatest.{BeforeAndAfterAll, Ignore}
+import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
@@ -239,6 +240,102 @@ class TestTimeseriesDataLoad extends QueryTest with BeforeAndAfterAll {
         Row(Timestamp.valueOf("2016-02-23 01:02:50.0"),50)))
   }
 
+  test("create datamap without 'if not exists' after load data into mainTable and create datamap") {
+    sql("drop table if exists mainTable")
+    sql(
+      """
+        | CREATE TABLE mainTable(
+        |   mytime timestamp,
+        |   name string,
+        |   age int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into table mainTable")
+
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_second ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         |   'EVENT_TIME'='mytime',
+         |   'second_granularity'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+        """.stripMargin)
+
+    checkAnswer(sql("select * from maintable_agg0_second"),
+      Seq(Row(Timestamp.valueOf("2016-02-23 01:01:30.0"), 10),
+        Row(Timestamp.valueOf("2016-02-23 01:01:40.0"), 20),
+        Row(Timestamp.valueOf("2016-02-23 01:01:50.0"), 30),
+        Row(Timestamp.valueOf("2016-02-23 01:02:30.0"), 40),
+        Row(Timestamp.valueOf("2016-02-23 01:02:40.0"), 50),
+        Row(Timestamp.valueOf("2016-02-23 01:02:50.0"), 50)))
+    val e: Exception = intercept[TableAlreadyExistsException] {
+      sql(
+        s"""
+           | CREATE DATAMAP agg0_second ON TABLE mainTable
+           | USING '$timeSeries'
+           | DMPROPERTIES (
+           |   'EVENT_TIME'='mytime',
+           |   'second_granularity'='1')
+           | AS SELECT mytime, SUM(age) FROM mainTable
+           | GROUP BY mytime
+        """.stripMargin)
+    }
+    assert(e.getMessage.contains("already exists in database"))
+  }
+
+  test("create datamap with 'if not exists' after load data into mainTable and create datamap") {
+    sql("drop table if exists mainTable")
+    sql(
+      """
+        | CREATE TABLE mainTable(
+        |   mytime timestamp,
+        |   name string,
+        |   age int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/timeseriestest.csv' into table mainTable")
+
+    sql(
+      s"""
+         | CREATE DATAMAP agg0_second ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         |   'EVENT_TIME'='mytime',
+         |   'second_granularity'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+        """.stripMargin)
+
+    checkAnswer(sql("select * from maintable_agg0_second"),
+      Seq(Row(Timestamp.valueOf("2016-02-23 01:01:30.0"), 10),
+        Row(Timestamp.valueOf("2016-02-23 01:01:40.0"), 20),
+        Row(Timestamp.valueOf("2016-02-23 01:01:50.0"), 30),
+        Row(Timestamp.valueOf("2016-02-23 01:02:30.0"), 40),
+        Row(Timestamp.valueOf("2016-02-23 01:02:40.0"), 50),
+        Row(Timestamp.valueOf("2016-02-23 01:02:50.0"), 50)))
+
+    sql(
+      s"""
+         | CREATE DATAMAP IF NOT EXISTS  agg0_second ON TABLE mainTable
+         | USING '$timeSeries'
+         | DMPROPERTIES (
+         |   'EVENT_TIME'='mytime',
+         |   'second_granularity'='1')
+         | AS SELECT mytime, SUM(age) FROM mainTable
+         | GROUP BY mytime
+        """.stripMargin)
+
+    checkAnswer(sql("select * from maintable_agg0_second"),
+      Seq(Row(Timestamp.valueOf("2016-02-23 01:01:30.0"), 10),
+        Row(Timestamp.valueOf("2016-02-23 01:01:40.0"), 20),
+        Row(Timestamp.valueOf("2016-02-23 01:01:50.0"), 30),
+        Row(Timestamp.valueOf("2016-02-23 01:02:30.0"), 40),
+        Row(Timestamp.valueOf("2016-02-23 01:02:40.0"), 50),
+        Row(Timestamp.valueOf("2016-02-23 01:02:50.0"), 50)))
+  }
+
   override def afterAll: Unit = {
     sql("drop table if exists mainTable")
     sql("drop table if exists table_03")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index c4d32b4..da20ac5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -17,6 +17,7 @@
 package org.apache.spark.sql.execution.command.datamap
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.preaaggregate.CreatePreAggregateTableCommand
@@ -35,10 +36,12 @@ case class CarbonCreateDataMapCommand(
     tableIdentifier: TableIdentifier,
     dmClassName: String,
     dmproperties: Map[String, String],
-    queryString: Option[String])
+    queryString: Option[String],
+    ifNotExistsSet: Boolean = false)
   extends AtomicRunnableCommand {
 
   var createPreAggregateTableCommands: CreatePreAggregateTableCommand = _
+  var tableIsExists: Boolean = false
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     // since streaming segment does not support building index and pre-aggregate yet,
@@ -49,10 +52,22 @@ case class CarbonCreateDataMapCommand(
       throw new MalformedCarbonCommandException("Streaming table does not support creating datamap")
     }
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+    val dbName = tableIdentifier.database.getOrElse("default")
+    val tableName = tableIdentifier.table + "_" + dataMapName
 
-    if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
+    if (sparkSession.sessionState.catalog.listTables(dbName)
+      .exists(_.table.equalsIgnoreCase(tableName))) {
+      LOGGER.audit(
+        s"Table creation with Database name [$dbName] and Table name [$tableName] failed. " +
+          s"Table [$tableName] already exists under database [$dbName]")
+      tableIsExists = true
+      if (!ifNotExistsSet) {
+        throw new TableAlreadyExistsException(dbName, tableName)
+      }
+    } else if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
       dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
       TimeSeriesUtil.validateTimeSeriesGranularity(dmproperties, dmClassName)
+
       createPreAggregateTableCommands = if (dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
         val details = TimeSeriesUtil
           .getTimeSeriesGranularityDetails(dmproperties, dmClassName)
@@ -62,15 +77,16 @@ case class CarbonCreateDataMapCommand(
           dmClassName,
           updatedDmProperties,
           queryString.get,
-          Some(details._1))
+          Some(details._1),
+          ifNotExistsSet = ifNotExistsSet)
       } else {
         CreatePreAggregateTableCommand(
           dataMapName,
           tableIdentifier,
           dmClassName,
           dmproperties,
-          queryString.get
-        )
+          queryString.get,
+          ifNotExistsSet = ifNotExistsSet)
       }
       createPreAggregateTableCommands.processMetadata(sparkSession)
     } else {
@@ -83,7 +99,11 @@ case class CarbonCreateDataMapCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
       dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
-      createPreAggregateTableCommands.processData(sparkSession)
+      if (!tableIsExists) {
+        createPreAggregateTableCommands.processData(sparkSession)
+      } else {
+        Seq.empty
+      }
     } else {
       throw new MalformedDataMapCommandException("Unknown data map type " + dmClassName)
     }
@@ -92,7 +112,11 @@ case class CarbonCreateDataMapCommand(
   override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
     if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
       dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
-      createPreAggregateTableCommands.undoMetadata(sparkSession, exception)
+      if (!tableIsExists) {
+        createPreAggregateTableCommands.undoMetadata(sparkSession, exception)
+      } else {
+        Seq.empty
+      }
     } else {
       throw new MalformedDataMapCommandException("Unknown data map type " + dmClassName)
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 3de75c2..31a3403 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -49,7 +49,8 @@ case class CreatePreAggregateTableCommand(
     dmClassName: String,
     dmProperties: Map[String, String],
     queryString: String,
-    timeSeriesFunction: Option[String] = None)
+    timeSeriesFunction: Option[String] = None,
+    ifNotExistsSet: Boolean = false)
   extends AtomicRunnableCommand {
 
   var parentTable: CarbonTable = _
@@ -86,7 +87,7 @@ case class CreatePreAggregateTableCommand(
         parentTableIdentifier.database)
     // prepare table model of the collected tokens
     val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(
-      ifNotExistPresent = false,
+      ifNotExistPresent = ifNotExistsSet,
       new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
       tableIdentifier.table.toLowerCase,
       fields,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f9606e9d/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 4045478..7addd26 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -142,17 +142,18 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
 
   /**
    * The syntax of datamap creation is as follows.
-   * CREATE DATAMAP datamapName ON TABLE tableName USING 'DataMapClassName'
+   * CREATE DATAMAP [IF NOT EXISTS] datamapName ON TABLE tableName USING 'DataMapClassName'
    * DMPROPERTIES('KEY'='VALUE') AS SELECT COUNT(COL1) FROM tableName
    */
   protected lazy val createDataMap: Parser[LogicalPlan] =
-    CREATE ~> DATAMAP ~> ident ~ (ON ~ TABLE) ~  (ident <~ ".").? ~ ident ~
+    CREATE ~> DATAMAP ~> opt(IF ~> NOT ~> EXISTS) ~ ident ~
+    (ON ~ TABLE) ~  (ident <~ ".").? ~ ident ~
     (USING ~> stringLit) ~ (DMPROPERTIES ~> "(" ~> repsep(loadOptions, ",") <~ ")").? ~
     (AS ~> restInput).? <~ opt(";") ^^ {
-      case dmname ~ ontable ~ dbName ~ tableName ~ className ~ dmprops ~ query =>
+      case ifnotexists ~ dmname ~ ontable ~ dbName ~ tableName ~ className ~ dmprops ~ query =>
         val map = dmprops.getOrElse(List[(String, String)]()).toMap[String, String]
         CarbonCreateDataMapCommand(
-          dmname, TableIdentifier(tableName, dbName), className, map, query)
+          dmname, TableIdentifier(tableName, dbName), className, map, query, ifnotexists.isDefined)
     }
 
   /**