You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/25 12:54:40 UTC
[06/10] carbondata git commit: [CARBONDATA-2189] Add DataMapProvider developer interface
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
deleted file mode 100644
index b2ab977..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.datamap
-
-import java.io.{File, FilenameFilter}
-
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.common.exceptions.MetadataProcessException
-import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException}
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
-
-class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
-
- val testData = s"$resourcesPath/sample.csv"
-
- override def beforeAll {
- sql("drop table if exists datamaptest")
- sql("drop table if exists datamapshowtest")
- sql("drop table if exists uniqdata")
- sql("create table datamaptest (a string, b string, c string) stored by 'carbondata'")
- }
-
- val newClass = "org.apache.spark.sql.CarbonSource"
-
- test("test datamap create: don't support using non-exist class") {
- intercept[MetadataProcessException] {
- sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'")
- }
- }
-
- test("test datamap create with dmproperties: don't support using non-exist class") {
- intercept[MetadataProcessException] {
- sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
- }
- }
-
- test("test datamap create with existing name: don't support using non-exist class") {
- intercept[MetadataProcessException] {
- sql(
- s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
- }
- }
-
- test("test datamap create with preagg") {
- sql("drop datamap if exists datamap3 on table datamaptest")
- sql(
- "create datamap datamap3 on table datamaptest using 'preaggregate' dmproperties('key'='value') as select count(a) from datamaptest")
- val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
- assert(table != null)
- val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
- assert(dataMapSchemaList.size() == 1)
- assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap3"))
- assert(dataMapSchemaList.get(0).getProperties.get("key").equals("value"))
- assert(dataMapSchemaList.get(0).getChildSchema.getTableName.equals("datamaptest_datamap3"))
- }
-
- test("check hivemetastore after drop datamap") {
- try {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
- "true")
- sql("drop table if exists hiveMetaStoreTable")
- sql("create table hiveMetaStoreTable (a string, b string, c string) stored by 'carbondata'")
-
- sql(
- "create datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable using 'preaggregate' dmproperties('key'='value') as select count(a) from hiveMetaStoreTable")
- checkExistence(sql("show datamap on table hiveMetaStoreTable"), true, "datamap_hiveMetaStoreTable")
-
- sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable")
- checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable")
-
- } finally {
- sql("drop table hiveMetaStoreTable")
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
- CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
- }
- }
-
- test("drop the table having pre-aggregate") {
- try {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
- "true")
- sql("drop table if exists hiveMetaStoreTable_1")
- sql("create table hiveMetaStoreTable_1 (a string, b string, c string) stored by 'carbondata'")
-
- sql(
- "create datamap datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1 using 'preaggregate' dmproperties('key'='value') as select count(a) from hiveMetaStoreTable_1")
-
- checkExistence(sql("show datamap on table hiveMetaStoreTable_1"),
- true,
- "datamap_hiveMetaStoreTable_1")
-
- sql("drop table hiveMetaStoreTable_1")
-
- checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1")
- } finally {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
- CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
- }
- }
-
- test("test datamap create with preagg with duplicate name") {
- sql(
- s"""
- | CREATE DATAMAP datamap10 ON TABLE datamaptest
- | USING 'preaggregate'
- | DMPROPERTIES('key'='value')
- | AS SELECT COUNT(a) FROM datamaptest
- """.stripMargin)
- intercept[MalformedDataMapCommandException] {
- sql(
- s"""
- | CREATE DATAMAP datamap10 ON TABLE datamaptest
- | USING 'preaggregate'
- | DMPROPERTIES('key'='value')
- | AS SELECT COUNT(a) FROM datamaptest
- """.stripMargin)
- }
- val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
- assert(table != null)
- val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
- assert(dataMapSchemaList.size() == 2)
- }
-
- test("test drop non-exist datamap") {
- intercept[NoSuchDataMapException] {
- sql("drop datamap nonexist on table datamaptest")
- }
- val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
- assert(table != null)
- val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
- assert(dataMapSchemaList.size() == 2)
- }
-
- test("test show datamap without preaggregate: don't support using non-exist class") {
- intercept[MetadataProcessException] {
- sql("drop table if exists datamapshowtest")
- sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
- sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')")
- sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')")
- checkExistence(sql("SHOW DATAMAP ON TABLE datamapshowtest"), true, "datamap1", "datamap2", "(NA)", newClass)
- }
- }
-
- test("test show datamap with preaggregate: don't support using non-exist class") {
- intercept[MetadataProcessException] {
- sql("drop table if exists datamapshowtest")
- sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
- sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
- sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')")
- val frame = sql("show datamap on table datamapshowtest")
- assert(frame.collect().length == 2)
- checkExistence(frame, true, "datamap1", "datamap2", "(NA)", newClass, "default.datamapshowtest_datamap1")
- }
- }
-
- test("test show datamap with no datamap") {
- sql("drop table if exists datamapshowtest")
- sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
- assert(sql("show datamap on table datamapshowtest").collect().length == 0)
- }
-
- test("test show datamap after dropping datamap: don't support using non-exist class") {
- intercept[MetadataProcessException] {
- sql("drop table if exists datamapshowtest")
- sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
- sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
- sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' DMPROPERTIES('key'='value')")
- sql("drop datamap datamap1 on table datamapshowtest")
- val frame = sql("show datamap on table datamapshowtest")
- assert(frame.collect().length == 1)
- checkExistence(frame, true, "datamap2", "(NA)", newClass)
- }
- }
-
- test("test if preaggregate load is successfull for hivemetastore") {
- try {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
- sql("DROP TABLE IF EXISTS maintable")
- sql(
- """
- | CREATE TABLE maintable(id int, name string, city string, age int)
- | STORED BY 'org.apache.carbondata.format'
- """.stripMargin)
- sql(
- s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id"""
-
- .stripMargin)
- sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
- checkAnswer(sql(s"select * from maintable_preagg_sum"),
- Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
- } finally {
- CarbonProperties.getInstance()
- .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
- CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
- }
- }
-
- test("test preaggregate load for decimal column for hivemetastore") {
- CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
- sql("CREATE TABLE uniqdata(CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format'")
- sql("insert into uniqdata select 9000,'CUST_NAME_00000','ACTIVE_EMUI_VERSION_00000','1970-01-01 01:00:03','1970-01-01 02:00:03',123372036854,-223372036854,12345678901.1234000000,22345678901.1234000000,11234567489.7976000000,-11234567489.7976000000,1")
- sql("create datamap uniqdata_agg on table uniqdata using 'preaggregate' as select min(DECIMAL_COLUMN1) from uniqdata group by DECIMAL_COLUMN1")
- checkAnswer(sql("select * from uniqdata_uniqdata_agg"), Seq(Row(12345678901.1234000000, 12345678901.1234000000)))
- sql("drop datamap if exists uniqdata_agg on table uniqdata")
- }
-
- test("create pre-agg table with path") {
- sql("drop table if exists main_preagg")
- sql("drop table if exists main ")
- val warehouse = s"$metastoredb/warehouse"
- val path = warehouse + "/" + System.nanoTime + "_preAggTestPath"
- sql(
- s"""
- | create table main(
- | year int,
- | month int,
- | name string,
- | salary int)
- | stored by 'carbondata'
- | tblproperties('sort_columns'='month,year,name')
- """.stripMargin)
- sql("insert into main select 10,11,'amy',12")
- sql("insert into main select 10,11,'amy',14")
- sql(
- s"""
- | create datamap preagg
- | on table main
- | using 'preaggregate'
- | dmproperties ('path'='$path')
- | as select name,avg(salary)
- | from main
- | group by name
- """.stripMargin)
- assertResult(true)(new File(path).exists())
- assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, "0")}")
- .list(new FilenameFilter {
- override def accept(dir: File, name: String): Boolean = {
- name.contains(CarbonCommonConstants.FACT_FILE_EXT)
- }
- }).length > 0)
- checkAnswer(sql("select name,avg(salary) from main group by name"), Row("amy", 13.0))
- checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2))
- sql("drop datamap preagg on table main")
- assertResult(false)(new File(path).exists())
- sql("drop table main")
- }
-
- override def afterAll {
- sql("DROP TABLE IF EXISTS maintable")
- sql("drop table if exists uniqdata")
- CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
- CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
- sql("drop table if exists datamaptest")
- sql("drop table if exists datamapshowtest")
- }
-}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala
new file mode 100644
index 0000000..a6ffa9a
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestIndexDataMapCommand.scala
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datamap
+
+import java.io.{File, FilenameFilter}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.MetadataProcessException
+import org.apache.carbondata.common.exceptions.sql.{MalformedDataMapCommandException, NoSuchDataMapException}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+/** Tests CREATE / DROP / SHOW DATAMAP for preaggregate datamaps and rejection of unsupported datamap provider classes. */
+class TestIndexDataMapCommand extends QueryTest with BeforeAndAfterAll {
+
+ val testData = s"$resourcesPath/sample.csv"
+
+ override def beforeAll {
+ sql("drop table if exists datamaptest")
+ sql("drop table if exists datamapshowtest")
+ sql("drop table if exists uniqdata")
+ sql("create table datamaptest (a string, b string, c string) stored by 'carbondata'")
+ }
+
+ val newClass = "org.apache.spark.sql.CarbonSource"
+
+ test("test datamap create: don't support using non-exist class") {
+ intercept[MetadataProcessException] {
+ sql(s"CREATE DATAMAP datamap1 ON TABLE datamaptest USING '$newClass'")
+ }
+ }
+
+ test("test datamap create with dmproperties: don't support using non-exist class") {
+ intercept[MetadataProcessException] {
+ sql(s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
+ }
+ }
+
+ test("test datamap create with existing name: don't support using non-exist class") {
+ intercept[MetadataProcessException] {
+ sql(
+ s"CREATE DATAMAP datamap2 ON TABLE datamaptest USING '$newClass' DMPROPERTIES('key'='value')")
+ }
+ }
+
+ test("test datamap create with preagg") {
+ sql("drop datamap if exists datamap3 on table datamaptest")
+ sql(
+ "create datamap datamap3 on table datamaptest using 'preaggregate' as select count(a) from datamaptest")
+ val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
+ assert(table != null)
+ val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+ assert(dataMapSchemaList.size() == 1)
+ assert(dataMapSchemaList.get(0).getDataMapName.equals("datamap3"))
+ assert(dataMapSchemaList.get(0).getChildSchema.getTableName.equals("datamaptest_datamap3"))
+ }
+
+ test("check hivemetastore after drop datamap") {
+ try {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+ "true")
+ sql("drop table if exists hiveMetaStoreTable")
+ sql("create table hiveMetaStoreTable (a string, b string, c string) stored by 'carbondata'")
+
+ sql(
+ "create datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable using 'preaggregate' as select count(a) from hiveMetaStoreTable")
+ checkExistence(sql("show datamap on table hiveMetaStoreTable"), true, "datamap_hiveMetaStoreTable")
+
+ sql("drop datamap datamap_hiveMetaStoreTable on table hiveMetaStoreTable")
+ checkExistence(sql("show datamap on table hiveMetaStoreTable"), false, "datamap_hiveMetaStoreTable")
+ } finally {
+ sql("drop table hiveMetaStoreTable")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+ CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+ }
+ }
+
+ test("drop the table having pre-aggregate") {
+ try {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+ "true")
+ sql("drop table if exists hiveMetaStoreTable_1")
+ sql("create table hiveMetaStoreTable_1 (a string, b string, c string) stored by 'carbondata'")
+
+ sql(
+ "create datamap datamap_hiveMetaStoreTable_1 on table hiveMetaStoreTable_1 using 'preaggregate' as select count(a) from hiveMetaStoreTable_1")
+
+ checkExistence(sql("show datamap on table hiveMetaStoreTable_1"),
+ true,
+ "datamap_hiveMetaStoreTable_1")
+
+ sql("drop table hiveMetaStoreTable_1")
+ checkExistence(sql("show tables"), false, "datamap_hiveMetaStoreTable_1")
+ } finally {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+ CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+ }
+ }
+
+ test("test datamap create with preagg with duplicate name") {
+ sql(
+ s"""
+ | CREATE DATAMAP datamap10 ON TABLE datamaptest
+ | USING 'preaggregate'
+ | AS SELECT COUNT(a) FROM datamaptest
+ """.stripMargin)
+ intercept[MalformedDataMapCommandException] {
+ sql(
+ s"""
+ | CREATE DATAMAP datamap10 ON TABLE datamaptest
+ | USING 'preaggregate'
+ | AS SELECT COUNT(a) FROM datamaptest
+ """.stripMargin)
+ }
+ val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
+ assert(table != null)
+ val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+ assert(dataMapSchemaList.size() == 2)
+ }
+
+ test("test drop non-exist datamap") {
+ intercept[NoSuchDataMapException] {
+ sql("drop datamap nonexist on table datamaptest")
+ }
+ val table = CarbonMetadata.getInstance().getCarbonTable("default", "datamaptest")
+ assert(table != null)
+ val dataMapSchemaList = table.getTableInfo.getDataMapSchemaList
+ assert(dataMapSchemaList.size() == 2)
+ }
+
+ test("test show datamap without preaggregate: don't support using non-exist class") {
+ intercept[MetadataProcessException] {
+ sql("drop table if exists datamapshowtest")
+ sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+ sql(s"CREATE DATAMAP datamap1 ON TABLE datamapshowtest USING '$newClass' ")
+ sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ")
+ checkExistence(sql("SHOW DATAMAP ON TABLE datamapshowtest"), true, "datamap1", "datamap2", "(NA)", newClass)
+ }
+ }
+
+ test("test show datamap with preaggregate: don't support using non-exist class") {
+ intercept[MetadataProcessException] {
+ sql("drop table if exists datamapshowtest")
+ sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+ sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
+ sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ")
+ val frame = sql("show datamap on table datamapshowtest")
+ assert(frame.collect().length == 2)
+ checkExistence(frame, true, "datamap1", "datamap2", "(NA)", newClass, "default.datamapshowtest_datamap1")
+ }
+ }
+
+ test("test show datamap with no datamap") {
+ sql("drop table if exists datamapshowtest")
+ sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+ assert(sql("show datamap on table datamapshowtest").collect().length == 0)
+ }
+
+ test("test show datamap after dropping datamap: don't support using non-exist class") {
+ intercept[MetadataProcessException] {
+ sql("drop table if exists datamapshowtest")
+ sql("create table datamapshowtest (a string, b string, c string) stored by 'carbondata'")
+ sql("create datamap datamap1 on table datamapshowtest using 'preaggregate' as select count(a) from datamapshowtest")
+ sql(s"CREATE DATAMAP datamap2 ON TABLE datamapshowtest USING '$newClass' ")
+ sql("drop datamap datamap1 on table datamapshowtest")
+ val frame = sql("show datamap on table datamapshowtest")
+ assert(frame.collect().length == 1)
+ checkExistence(frame, true, "datamap2", "(NA)", newClass)
+ }
+ }
+
+ test("test if preaggregate load is successfull for hivemetastore") {
+ try {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
+ sql("DROP TABLE IF EXISTS maintable")
+ sql(
+ """
+ | CREATE TABLE maintable(id int, name string, city string, age int)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ sql("create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id")
+ sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+ checkAnswer(sql(s"select * from maintable_preagg_sum"),
+ Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+ } finally {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+ CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+ }
+ }
+
+ test("test preaggregate load for decimal column for hivemetastore") {
+ try {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
+ sql("CREATE TABLE uniqdata(CUST_ID int,CUST_NAME String,ACTIVE_EMUI_VERSION string,DOB timestamp,DOJ timestamp, BIGINT_COLUMN1 bigint,BIGINT_COLUMN2 bigint,DECIMAL_COLUMN1 decimal(30,10),DECIMAL_COLUMN2 decimal(36,10),Double_COLUMN1 double, Double_COLUMN2 double,INTEGER_COLUMN1 int) STORED BY 'org.apache.carbondata.format'")
+ sql("insert into uniqdata select 9000,'CUST_NAME_00000','ACTIVE_EMUI_VERSION_00000','1970-01-01 01:00:03','1970-01-01 02:00:03',123372036854,-223372036854,12345678901.1234000000,22345678901.1234000000,11234567489.7976000000,-11234567489.7976000000,1")
+ sql("create datamap uniqdata_agg on table uniqdata using 'preaggregate' as select min(DECIMAL_COLUMN1) from uniqdata group by DECIMAL_COLUMN1")
+ checkAnswer(sql("select * from uniqdata_uniqdata_agg"), Seq(Row(12345678901.1234000000, 12345678901.1234000000)))
+ sql("drop datamap if exists uniqdata_agg on table uniqdata")
+ } finally { // restore the default so later tests do not inherit the hive metastore setting
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+ }
+ }
+
+ test("create pre-agg table with path") {
+ sql("drop table if exists main_preagg")
+ sql("drop table if exists main ")
+ val warehouse = s"$metastoredb/warehouse"
+ val path = warehouse + "/" + System.nanoTime + "_preAggTestPath"
+ sql(
+ s"""
+ | create table main(
+ | year int,
+ | month int,
+ | name string,
+ | salary int)
+ | stored by 'carbondata'
+ | tblproperties('sort_columns'='month,year,name')
+ """.stripMargin)
+ sql("insert into main select 10,11,'amy',12")
+ sql("insert into main select 10,11,'amy',14")
+ sql(
+ s"""
+ | create datamap preagg
+ | on table main
+ | using 'preaggregate'
+ | dmproperties ('path'='$path')
+ | as select name,avg(salary)
+ | from main
+ | group by name
+ """.stripMargin)
+ assertResult(true)(new File(path).exists())
+ assertResult(true)(new File(s"${CarbonTablePath.getSegmentPath(path, "0")}")
+ .list(new FilenameFilter {
+ override def accept(dir: File, name: String): Boolean = {
+ name.contains(CarbonCommonConstants.FACT_FILE_EXT)
+ }
+ }).length > 0)
+ checkAnswer(sql("select name,avg(salary) from main group by name"), Row("amy", 13.0))
+ checkAnswer(sql("select * from main_preagg"), Row("amy", 26, 2))
+ sql("drop datamap preagg on table main")
+ assertResult(false)(new File(path).exists())
+ sql("drop table main")
+ }
+
+ override def afterAll {
+ sql("DROP TABLE IF EXISTS maintable")
+ sql("drop table if exists uniqdata")
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+ CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+ sql("drop table if exists datamaptest")
+ sql("drop table if exists datamapshowtest")
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
index 84b59e6..248441f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
@@ -28,8 +28,8 @@ import org.apache.spark.sql.{DataFrame, SaveMode}
import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
-import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap}
+import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainIndexDataMapFactory, AbstractCoarseGrainIndexDataMap}
+import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter
import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
import org.apache.carbondata.core.datastore.page.ColumnPage
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
@@ -47,7 +47,12 @@ class InsertOverwriteConcurrentTest extends QueryTest with BeforeAndAfterAll wit
buildTestData()
// register hook to the table to sleep, thus the other command will be executed
- sql(s"create datamap test on table orders using '${classOf[WaitingDataMap].getName}' as select count(a) from hiveMetaStoreTable_1")
+ sql(
+ s"""
+ | create datamap test on table orders
+ | using '${classOf[WaitingIndexDataMap].getName}'
+ | as select count(a) from hiveMetaStoreTable_1
+ """.stripMargin)
}
private def buildTestData(): Unit = {
@@ -101,7 +106,7 @@ class InsertOverwriteConcurrentTest extends QueryTest with BeforeAndAfterAll wit
)
while (!Global.overwriteRunning && count < 1000) {
Thread.sleep(10)
- // to avoid dead loop in case WaitingDataMap is not invoked
+ // to avoid dead loop in case WaitingIndexDataMap is not invoked
count += 1
}
future
@@ -162,7 +167,7 @@ object Global {
var overwriteRunning = false
}
-class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory {
+class WaitingIndexDataMap() extends AbstractCoarseGrainIndexDataMapFactory {
override def init(identifier: AbsoluteTableIdentifier, dataMapSchema: DataMapSchema): Unit = { }
@@ -172,9 +177,9 @@ class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory {
override def clear(): Unit = {}
- override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ???
+ override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainIndexDataMap] = ???
- override def getDataMaps(segmentId: String): util.List[AbstractCoarseGrainDataMap] = ???
+ override def getDataMaps(segmentId: String): util.List[AbstractCoarseGrainIndexDataMap] = ???
override def createWriter(segmentId: String, writerPath: String): AbstractDataMapWriter = {
new AbstractDataMapWriter(null, segmentId, writerPath) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
new file mode 100644
index 0000000..2b3a306
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapManager.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap;
+
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+
+import static org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.PREAGGREGATE;
+import static org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES;
+
+public class DataMapManager {
+
+ /** Created eagerly: the constructor is empty, so get() needs no locking. */
+ private static final DataMapManager INSTANCE = new DataMapManager();
+
+ private DataMapManager() { }
+
+ /** Returns the single shared DataMapManager instance. */
+ public static DataMapManager get() {
+ return INSTANCE;
+ }
+
+ /**
+ * Returns a new DataMapProvider for the given schema, chosen by its class name:
+ * PREAGGREGATE / TIMESERIES (compared case-insensitively) get their dedicated
+ * providers; any other class name is handled by IndexDataMapProvider.
+ */
+ public DataMapProvider getDataMapProvider(DataMapSchema dataMapSchema) {
+ String className = dataMapSchema.getClassName();
+ if (className.equalsIgnoreCase(PREAGGREGATE.toString())) {
+ return new PreAggregateDataMapProvider();
+ } else if (className.equalsIgnoreCase(TIMESERIES.toString())) {
+ return new TimeseriesDataMapProvider();
+ } else {
+ return new IndexDataMapProvider();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java
new file mode 100644
index 0000000..0cf0d04
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProperty.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+
+/**
+ * Keys of the dmproperties that may be given when creating a DataMap.
+ */
+@InterfaceAudience.Internal
+public class DataMapProperty {
+
+  /** Property key for the location where the datamap is stored. */
+  public static final String PATH = "path";
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
new file mode 100644
index 0000000..a71e0d8
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/DataMapProvider.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.annotations.InterfaceStability;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.processing.exception.DataLoadingException;
+
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A DataMap is an accelerator for certain types of queries. Developers can add new DataMap
+ * implementations to improve query performance.
+ *
+ * <p>Currently two types of DataMap are supported:
+ * <ol>
+ *   <li> MVDataMap: materialized-view type of DataMap that accelerates OLAP-style queries,
+ *   like SPJG queries (select, predicate, join, group by) </li>
+ *   <li> IndexDataMap: index type of DataMap that accelerates filter queries </li>
+ * </ol>
+ *
+ * <p>
+ * In the following command <br>
+ * {@code CREATE DATAMAP dm ON TABLE main USING 'provider'}, <br>
+ * the <b>provider</b> string can be a short name or the class name of the DataMap
+ * implementation.
+ *
+ * <br>Currently CarbonData supports the following providers:
+ * <ol>
+ *   <li> preaggregate: a type of MVDataMap that pre-aggregates a single table </li>
+ *   <li> timeseries: a type of MVDataMap that pre-aggregates based on the time dimension
+ *   of the table </li>
+ *   <li> the class name of an
+ *   {@link org.apache.carbondata.core.datamap.dev.IndexDataMapFactory} implementation:
+ *   developers can add a new type of IndexDataMap by providing an implementation of
+ *   {@link org.apache.carbondata.core.datamap.dev.IndexDataMapFactory} </li>
+ * </ol>
+ *
+ * @since 1.4.0
+ */
+@InterfaceAudience.Developer("DataMap")
+@InterfaceStability.Unstable
+public interface DataMapProvider {
+
+  /**
+   * Initialize a datamap's metadata.
+   * This is called when a user creates a datamap, for example
+   * "CREATE DATAMAP dm ON TABLE mainTable".
+   * Implementations should initialize the metadata for the datamap, like creating a table.
+   *
+   * @param mainTable the table the datamap is created on
+   * @param dataMapSchema schema of the datamap being created
+   * @param ctasSqlStatement the query given with the CREATE DATAMAP command; may be null
+   *                         when the command has no query
+   * @param sparkSession the current spark session
+   * @throws MalformedDataMapCommandException if the datamap definition (for example its
+   *         provider or properties) is invalid
+   */
+  void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
+      SparkSession sparkSession) throws MalformedDataMapCommandException;
+
+  /**
+   * Initialize a datamap's data.
+   * This is called when a user creates a datamap, for example
+   * "CREATE DATAMAP dm ON TABLE mainTable".
+   * Implementations should initialize the data for the datamap, like creating data folders.
+   *
+   * @param mainTable the table the datamap is created on
+   * @param sparkSession the current spark session
+   */
+  void initData(CarbonTable mainTable, SparkSession sparkSession);
+
+  /**
+   * Opposite operation of {@link #initMeta(CarbonTable, DataMapSchema, String, SparkSession)}.
+   * This is called when a user drops a datamap, for example
+   * "DROP DATAMAP dm ON TABLE mainTable".
+   * Implementations should clean all metadata of the datamap.
+   *
+   * @param mainTable the table the datamap belongs to
+   * @param dataMapSchema schema of the datamap being removed
+   * @param sparkSession the current spark session
+   */
+  void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, SparkSession sparkSession);
+
+  /**
+   * Opposite operation of {@link #initData(CarbonTable, SparkSession)}.
+   * This is called when a user drops a datamap, for example
+   * "DROP DATAMAP dm ON TABLE mainTable".
+   * Implementations should clean all data of the datamap.
+   *
+   * @param mainTable the table the datamap belongs to
+   * @param dataMapSchema schema of the datamap being removed
+   * @param sparkSession the current spark session
+   */
+  void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema, SparkSession sparkSession);
+
+  /**
+   * Rebuild the datamap by loading all existing data from mainTable.
+   * This is called to refresh the datamap:
+   * <ol>
+   *   <li> after datamap creation, if {@code autoRefreshDataMap} is set to true </li>
+   *   <li> when a user manually triggers the refresh datamap command </li>
+   * </ol>
+   *
+   * @param mainTable the table whose data is loaded into the datamap
+   * @param sparkSession the current spark session
+   * @throws DataLoadingException if loading data into the datamap fails
+   */
+  void rebuild(CarbonTable mainTable, SparkSession sparkSession) throws DataLoadingException;
+
+  /**
+   * Build the datamap incrementally by loading the data of the specified segments.
+   * This is called when a user manually triggers a datamap refresh.
+   *
+   * @param mainTable the table whose data is loaded into the datamap
+   * @param segmentIds ids of the segments whose data should be loaded
+   * @param sparkSession the current spark session
+   * @throws DataLoadingException if loading data into the datamap fails
+   */
+  void incrementalBuild(CarbonTable mainTable, String[] segmentIds, SparkSession sparkSession)
+      throws DataLoadingException;
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
new file mode 100644
index 0000000..e11e522
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/IndexDataMapProvider.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.MetadataProcessException;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.core.datamap.DataMapRegistry;
+import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.dev.IndexDataMapFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.format.TableInfo;
+
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil;
+
+/**
+ * DataMapProvider for index type datamaps. The provider name given in CREATE DATAMAP is
+ * resolved to an {@link IndexDataMapFactory}, first as a fully qualified class name and,
+ * if no such class exists, as a short name looked up in {@link DataMapRegistry}.
+ */
+@InterfaceAudience.Internal
+public class IndexDataMapProvider implements DataMapProvider {
+
+  /**
+   * TableInfo returned by {@code PreAggregateUtil.updateMainTable} in {@link #initMeta}.
+   * Presumably this is the main table's schema before the datamap was added, judging by the
+   * name (confirm). It stays null until initMeta has completed on this instance.
+   */
+  private TableInfo originalTableInfo;
+
+  /**
+   * Instantiates the IndexDataMapFactory named by the schema, registers it for the main
+   * table in {@link DataMapStoreManager}, and saves the datamap schema into the main
+   * table's schema.
+   */
+  @Override
+  public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
+      SparkSession sparkSession) throws MalformedDataMapCommandException {
+    IndexDataMapFactory dataMapFactory = createIndexDataMapFactory(dataMapSchema);
+    DataMapStoreManager.getInstance().registerDataMap(
+        mainTable.getAbsoluteTableIdentifier(), dataMapSchema, dataMapFactory);
+    originalTableInfo = PreAggregateUtil.updateMainTable(mainTable, dataMapSchema, sparkSession);
+  }
+
+  @Override
+  public void initData(CarbonTable mainTable, SparkSession sparkSession) {
+    // Nothing is needed to do by default
+  }
+
+  /**
+   * Passes {@link #originalTableInfo} to {@code PreAggregateUtil.updateSchemaInfo},
+   * presumably restoring the main table schema saved by initMeta.
+   *
+   * <p>Does nothing when initMeta has not completed on this instance. The drop datamap
+   * command obtains a fresh provider and calls this method directly, after it has already
+   * removed the datamap from the main table schema itself. In that case there is nothing to
+   * restore, and passing a null TableInfo on would be wrong.
+   */
+  @Override
+  public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema,
+      SparkSession sparkSession) {
+    if (originalTableInfo != null) {
+      PreAggregateUtil.updateSchemaInfo(mainTable, originalTableInfo, sparkSession);
+    }
+  }
+
+  /**
+   * Removes this datamap from {@link DataMapStoreManager} for the main table.
+   */
+  @Override
+  public void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema,
+      SparkSession sparkSession) {
+    DataMapStoreManager.getInstance().clearDataMap(
+        mainTable.getAbsoluteTableIdentifier(), dataMapSchema.getDataMapName());
+  }
+
+  @Override
+  public void rebuild(CarbonTable mainTable, SparkSession sparkSession) {
+    // Nothing is needed to do by default
+  }
+
+  /** Incremental build is not supported for index datamaps. */
+  @Override
+  public void incrementalBuild(CarbonTable mainTable, String[] segmentIds,
+      SparkSession sparkSession) {
+    throw new UnsupportedOperationException();
+  }
+
+  /**
+   * Instantiates the IndexDataMapFactory for the schema's provider name: first as a class
+   * name, then, if no such class can be found, as a short name.
+   *
+   * @throws MalformedDataMapCommandException if the name resolves to no known datamap
+   * @throws MetadataProcessException if the class exists but cannot be instantiated as an
+   *         IndexDataMapFactory
+   */
+  private IndexDataMapFactory createIndexDataMapFactory(DataMapSchema dataMapSchema)
+      throws MalformedDataMapCommandException {
+    String providerName = dataMapSchema.getClassName();
+    try {
+      // try to create the factory by taking providerName as a class name;
+      // asSubclass rejects classes that are not IndexDataMapFactory before instantiating them
+      return Class.forName(providerName).asSubclass(IndexDataMapFactory.class).newInstance();
+    } catch (ClassNotFoundException e) {
+      // not a loadable class: try providerName as a short name
+      return getDataMapFactoryByShortName(providerName);
+    } catch (Throwable e) {
+      throw new MetadataProcessException(
+          "failed to create DataMapProvider '" + providerName + "'", e);
+    }
+  }
+
+  /**
+   * Looks up the class registered for a short provider name in {@link DataMapRegistry} and
+   * instantiates it.
+   *
+   * @throws MalformedDataMapCommandException if no class is registered for the name, or the
+   *         registered class cannot be found
+   * @throws MetadataProcessException if the registered class cannot be instantiated as an
+   *         IndexDataMapFactory
+   */
+  private IndexDataMapFactory getDataMapFactoryByShortName(String providerName)
+      throws MalformedDataMapCommandException {
+    String className = DataMapRegistry.getDataMapClassName(providerName);
+    if (className == null) {
+      throw new MalformedDataMapCommandException(
+          "DataMap '" + providerName + "' not found");
+    }
+    try {
+      // load the class registered for the short name, not the short name itself
+      return Class.forName(className).asSubclass(IndexDataMapFactory.class).newInstance();
+    } catch (ClassNotFoundException ex) {
+      throw new MalformedDataMapCommandException(
+          "DataMap '" + providerName + "' not found", ex);
+    } catch (Throwable ex) {
+      throw new MetadataProcessException(
+          "failed to create DataMap '" + providerName + "'", ex);
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
new file mode 100644
index 0000000..ac38347
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/PreAggregateDataMapProvider.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateTableHelper;
+import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand;
+import scala.Some;
+
+@InterfaceAudience.Internal
+public class PreAggregateDataMapProvider implements DataMapProvider {
+ protected PreAggregateTableHelper helper;
+ protected CarbonDropTableCommand dropTableCommand;
+
+ @Override
+ public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
+ SparkSession sparkSession) throws MalformedDataMapCommandException {
+ validateDmProperty(dataMapSchema);
+ helper = new PreAggregateTableHelper(
+ mainTable, dataMapSchema.getDataMapName(), dataMapSchema.getClassName(),
+ dataMapSchema.getProperties(), ctasSqlStatement, null);
+ helper.initMeta(sparkSession);
+ }
+
+ private void validateDmProperty(DataMapSchema dataMapSchema)
+ throws MalformedDataMapCommandException {
+ if (!dataMapSchema.getProperties().isEmpty()) {
+ if (dataMapSchema.getProperties().size() > 1 ||
+ !dataMapSchema.getProperties().containsKey(DataMapProperty.PATH)) {
+ throw new MalformedDataMapCommandException(
+ "Only 'path' dmproperty is allowed for this datamap");
+ }
+ }
+ }
+
+ @Override
+ public void initData(CarbonTable mainTable, SparkSession sparkSession) {
+ // Nothing is needed to do by default
+ }
+
+ @Override
+ public void freeMeta(CarbonTable mainTable, DataMapSchema dataMapSchema,
+ SparkSession sparkSession) {
+ dropTableCommand = new CarbonDropTableCommand(
+ true,
+ new Some<>(dataMapSchema.getRelationIdentifier().getDatabaseName()),
+ dataMapSchema.getRelationIdentifier().getTableName(),
+ true);
+ dropTableCommand.processMetadata(sparkSession);
+ }
+
+ @Override
+ public void freeData(CarbonTable mainTable, DataMapSchema dataMapSchema,
+ SparkSession sparkSession) {
+ if (dropTableCommand != null) {
+ dropTableCommand.processData(sparkSession);
+ }
+ }
+
+ @Override
+ public void rebuild(CarbonTable mainTable, SparkSession sparkSession) {
+ if (helper != null) {
+ helper.initData(sparkSession);
+ }
+ }
+
+ @Override
+ public void incrementalBuild(CarbonTable mainTable, String[] segmentIds,
+ SparkSession sparkSession) {
+ throw new UnsupportedOperationException();
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java b/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java
new file mode 100644
index 0000000..510839d
--- /dev/null
+++ b/integration/spark2/src/main/java/org/apache/carbondata/datamap/TimeseriesDataMapProvider.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.datamap;
+
+import java.util.Map;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateTableHelper;
+import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil;
+import scala.Tuple2;
+
+/**
+ * DataMapProvider for timeseries datamaps: a pre-aggregate datamap whose aggregation uses a
+ * time granularity taken from the datamap properties.
+ */
+@InterfaceAudience.Internal
+public class TimeseriesDataMapProvider extends PreAggregateDataMapProvider {
+
+  /**
+   * Validates the granularity property, then initializes the pre-aggregate helper. The helper
+   * receives the remaining properties and the granularity detail ({@code details._1()}).
+   *
+   * <p>The granularity entry is removed from a copy of the properties rather than from the
+   * schema's own map. This leaves the caller's DataMapSchema unchanged, and it also works if
+   * that map is unmodifiable.
+   */
+  @Override
+  public void initMeta(CarbonTable mainTable, DataMapSchema dataMapSchema, String ctasSqlStatement,
+      SparkSession sparkSession) {
+    Map<String, String> dmProperties = dataMapSchema.getProperties();
+    String dmProviderName = dataMapSchema.getClassName();
+    TimeSeriesUtil.validateTimeSeriesGranularity(dmProperties, dmProviderName);
+    Tuple2<String, String> details =
+        TimeSeriesUtil.getTimeSeriesGranularityDetails(dmProperties, dmProviderName);
+    // copy before removing so the schema passed in by the caller is not mutated
+    Map<String, String> childProperties = new java.util.HashMap<>(dmProperties);
+    childProperties.remove(details._1());
+    helper = new PreAggregateTableHelper(
+        mainTable, dataMapSchema.getDataMapName(), dmProviderName,
+        childProperties, ctasSqlStatement, details._1());
+    helper.initMeta(sparkSession);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 870b1f3..5fb3d56 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -93,7 +93,7 @@ class CarbonEnv {
properties.addProperty(CarbonCommonConstants.STORE_LOCATION, storePath)
}
LOGGER.info(s"carbon env initial: $storePath")
- // trigger event for CarbonEnv init
+ // trigger event for CarbonEnv create
val operationContext = new OperationContext
val carbonEnvInitPreEvent: CarbonEnvInitPreEvent =
CarbonEnvInitPreEvent(sparkSession, storePath)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 3d3f83b..3f22955 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -21,16 +21,11 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil}
-import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
-import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.carbondata.common.exceptions.MetadataProcessException
import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider._
import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.datamap.{DataMapManager, DataMapProvider}
/**
* Below command class will be used to create datamap on table
@@ -44,64 +39,30 @@ case class CarbonCreateDataMapCommand(
queryString: Option[String])
extends AtomicRunnableCommand {
- var createPreAggregateTableCommands: CreatePreAggregateTableCommand = _
+ private var dataMapProvider: DataMapProvider = _
+ private var mainTable: CarbonTable = _
+ private var dataMapSchema: DataMapSchema = _
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
// since streaming segment does not support building index and pre-aggregate yet,
// so streaming table does not support create datamap
- val carbonTable =
- CarbonEnv.getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession)
- if (carbonTable.isStreamingTable) {
+ mainTable =
+ CarbonEnv.getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession)
+ if (mainTable.isStreamingTable) {
throw new MalformedCarbonCommandException("Streaming table does not support creating datamap")
}
- validateDataMapName(carbonTable)
+ validateDataMapName(mainTable)
+ dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
+ dataMapSchema.setProperties(new java.util.HashMap[String, String](dmproperties.asJava))
+ dataMapProvider = DataMapManager.get().getDataMapProvider(dataMapSchema)
+ dataMapProvider.initMeta(mainTable, dataMapSchema, queryString.orNull, sparkSession)
- if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
- dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
- TimeSeriesUtil.validateTimeSeriesGranularity(dmproperties, dmClassName)
- createPreAggregateTableCommands = if (dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
- val details = TimeSeriesUtil
- .getTimeSeriesGranularityDetails(dmproperties, dmClassName)
- val updatedDmProperties = dmproperties - details._1
- CreatePreAggregateTableCommand(dataMapName,
- tableIdentifier,
- dmClassName,
- updatedDmProperties,
- queryString.get,
- Some(details._1))
- } else {
- CreatePreAggregateTableCommand(
- dataMapName,
- tableIdentifier,
- dmClassName,
- dmproperties,
- queryString.get
- )
- }
- try {
- createPreAggregateTableCommands.processMetadata(sparkSession)
- } catch {
- case e: Throwable => throw new MetadataProcessException(s"Failed to create datamap " +
- s"'$dataMapName'", e)
- }
- } else {
- val dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
- dataMapSchema.setProperties(new java.util.HashMap[String, String](dmproperties.asJava))
- val dbName = CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession)
- val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore.lookupRelation(
- Some(dbName),
- tableIdentifier.table)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
- DataMapStoreManager.getInstance().createAndRegisterDataMap(
- carbonTable.getAbsoluteTableIdentifier, dataMapSchema)
- // Save DataMapSchema in the schema file of main table
- PreAggregateUtil.updateMainTable(carbonTable, dataMapSchema, sparkSession)
- }
val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
LOGGER.audit(s"DataMap $dataMapName successfully added to Table ${tableIdentifier.table}")
Seq.empty
}
- private def validateDataMapName(carbonTable: CarbonTable) = {
+ private def validateDataMapName(carbonTable: CarbonTable): Unit = {
val existingDataMaps = carbonTable.getTableInfo.getDataMapSchemaList
existingDataMaps.asScala.foreach { dataMapSchema =>
if (dataMapSchema.getDataMapName.equalsIgnoreCase(dataMapName)) {
@@ -111,18 +72,19 @@ case class CarbonCreateDataMapCommand(
}
override def processData(sparkSession: SparkSession): Seq[Row] = {
- if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
- dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
- createPreAggregateTableCommands.processData(sparkSession)
- } else {
- Seq.empty
+ if (dataMapProvider != null) {
+ dataMapProvider.initData(mainTable, sparkSession)
+ if (mainTable.isAutoRefreshDataMap) {
+ dataMapProvider.rebuild(mainTable, sparkSession)
+ }
}
+ Seq.empty
}
override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
- if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
- dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
- createPreAggregateTableCommands.undoMetadata(sparkSession, exception)
+ if (dataMapProvider != null) {
+ dataMapProvider.freeMeta(mainTable, dataMapSchema, sparkSession)
+ Seq.empty
} else {
throw new MalformedDataMapCommandException("Unknown data map type " + dmClassName)
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index f410f52..4cfc6b4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -25,16 +25,15 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.execution.command.AtomicRunnableCommand
import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
-import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
import org.apache.carbondata.common.exceptions.MetadataProcessException
import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, NoSuchDataMapException}
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.core.datamap.DataMapStoreManager
import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.datamap.{DataMapManager, DataMapProvider}
import org.apache.carbondata.events._
/**
@@ -51,7 +50,9 @@ case class CarbonDropDataMapCommand(
tableName: String)
extends AtomicRunnableCommand {
- var commandToRun: CarbonDropTableCommand = _
+ private var dataMapProvider: DataMapProvider = _
+ private var mainTable: CarbonTable = _
+ private var dataMapSchema: DataMapSchema = _
override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -76,44 +77,42 @@ case class CarbonDropDataMapCommand(
throw new MetadataProcessException(s"Dropping datamap $dataMapName failed", ex)
}
if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) {
- val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
+ mainTable = carbonTable.get
+ val dataMapSchemaOp = mainTable.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
find(_._1.getDataMapName.equalsIgnoreCase(dataMapName))
- if (dataMapSchema.isDefined) {
+ if (dataMapSchemaOp.isDefined) {
+ dataMapSchema = dataMapSchemaOp.get._1
val operationContext = new OperationContext
val dropDataMapPreEvent =
DropDataMapPreEvent(
- Some(dataMapSchema.get._1),
+ Some(dataMapSchema),
ifExistsSet,
sparkSession)
OperationListenerBus.getInstance.fireEvent(dropDataMapPreEvent, operationContext)
- carbonTable.get.getTableInfo.getDataMapSchemaList.remove(dataMapSchema.get._2)
+ mainTable.getTableInfo.getDataMapSchemaList.remove(dataMapSchemaOp.get._2)
val schemaConverter = new ThriftWrapperSchemaConverterImpl
PreAggregateUtil.updateSchemaInfo(
- carbonTable.get,
+ mainTable,
schemaConverter.fromWrapperToExternalTableInfo(
- carbonTable.get.getTableInfo,
+ mainTable.getTableInfo,
dbName,
tableName))(sparkSession)
- commandToRun = CarbonDropTableCommand(
- ifExistsSet = true,
- Some(dataMapSchema.get._1.getRelationIdentifier.getDatabaseName),
- dataMapSchema.get._1.getRelationIdentifier.getTableName,
- dropChildTable = true
- )
- commandToRun.processMetadata(sparkSession)
+ dataMapProvider = DataMapManager.get.getDataMapProvider(dataMapSchema)
+ dataMapProvider.freeMeta(mainTable, dataMapSchema, sparkSession)
+
// fires the event after dropping datamap from main table schema
val dropDataMapPostEvent =
DropDataMapPostEvent(
- Some(dataMapSchema.get._1),
+ Some(dataMapSchema),
ifExistsSet,
sparkSession)
OperationListenerBus.getInstance.fireEvent(dropDataMapPostEvent, operationContext)
} else if (!ifExistsSet) {
throw new NoSuchDataMapException(dataMapName, tableName)
}
- } else if ((carbonTable.isDefined &&
- carbonTable.get.getTableInfo.getDataMapSchemaList.size() == 0)) {
+ } else if (carbonTable.isDefined &&
+ carbonTable.get.getTableInfo.getDataMapSchemaList.size() == 0) {
if (!ifExistsSet) {
throw new NoSuchDataMapException(dataMapName, tableName)
}
@@ -140,10 +139,8 @@ case class CarbonDropDataMapCommand(
override def processData(sparkSession: SparkSession): Seq[Row] = {
// delete the table folder
- if (commandToRun != null) {
- DataMapStoreManager.getInstance().clearDataMap(
- commandToRun.carbonTable.getAbsoluteTableIdentifier, dataMapName)
- commandToRun.processData(sparkSession)
+ if (dataMapProvider != null) {
+ dataMapProvider.freeData(mainTable, dataMapSchema, sparkSession)
}
Seq.empty
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index b53c609..ecf6f99 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -97,7 +97,7 @@ case class CarbonAlterTableDropPartitionCommand(
partitionInfo.dropPartition(partitionIndex)
// read TableInfo
- val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession)
+ val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
dbName, tableName, tablePath)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 84779cc..732178c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -89,7 +89,7 @@ case class CarbonAlterTableSplitPartitionCommand(
updatePartitionInfo(partitionInfo, partitionIds)
// read TableInfo
- val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)(sparkSession)
+ val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTable)
val schemaConverter = new ThriftWrapperSchemaConverterImpl()
val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(tableInfo,
dbName, tableName, tablePath)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2117c077/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
deleted file mode 100644
index 6d4822b..0000000
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ /dev/null
@@ -1,218 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.command.preaaggregate
-
-import scala.collection.JavaConverters._
-import scala.collection.mutable
-
-import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand
-import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
-import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
-import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
-import org.apache.spark.sql.parser.CarbonSpark2SqlParser
-
-import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-
-/**
- * Creates a pre-aggregate (child) table for a datamap and registers the child
- * table's schema on the parent (main) table.
- *
- * The command is meant to either fully succeed or leave nothing behind: if any of
- * the steps below fails, undoMetadata drops the datamap again.
- *   1. creating the pre-aggregate table;
- *   2. updating the main table with the child table information.
- *
- * @param dataMapName           datamap name; the child table is named
- *                              `<parent table name>_<dataMapName>`
- * @param parentTableIdentifier identifier of the main (parent) table
- * @param dmClassName           datamap class name (not referenced by this command)
- * @param dmProperties          datamap properties; copied into the child table properties and
- *                              into the child schema. A "path" entry overrides the child
- *                              table path.
- * @param queryString           select query that defines the pre-aggregate table
- * @param timeSeriesFunction    time-series function; defined only for time-series datamaps
- */
-case class CreatePreAggregateTableCommand(
-    dataMapName: String,
-    parentTableIdentifier: TableIdentifier,
-    dmClassName: String,
-    dmProperties: Map[String, String],
-    queryString: String,
-    timeSeriesFunction: Option[String] = None)
-  extends AtomicRunnableCommand {
-
-  // Both are initialised by processMetadata and consumed by processData.
-  var parentTable: CarbonTable = _
-  var loadCommand: CarbonLoadDataCommand = _
-
-  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
-    val df = sparkSession.sql(updatedQuery)
-    val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
-      df.logicalPlan, queryString)
-    val fields = fieldRelationMap.keySet.toSeq
-    val tableProperties = mutable.Map[String, String]()
-    tableProperties ++= dmProperties
-
-    parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
-    if (!parentTable.getTableName.equalsIgnoreCase(parentTableIdentifier.table)) {
-      throw new MalformedDataMapCommandException(
-        "Parent table name is different in select and create")
-    }
-    // Sort the child table on its non-aggregated columns, following the order of the
-    // parent table's sort columns. The aggregate check runs first so that the column
-    // relation list is only inspected for non-aggregated fields.
-    val childSortColumns = parentTable.getSortColumns(parentTable.getTableName).asScala
-      .flatMap { parentColumn =>
-        fields.filter { field =>
-          val relation = fieldRelationMap(field)
-          relation.aggregateFunction.isEmpty &&
-            parentColumn.equals(relation.columnTableRelationList.get(0).parentColumnName)
-        }.map(_.column)
-      }
-    tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, childSortColumns.mkString(","))
-    // The child table inherits the parent's sort scope (or the default) and block size.
-    val parentTableProperties = parentTable.getTableInfo.getFactTable.getTableProperties
-    tableProperties.put("sort_scope",
-      parentTableProperties.getOrDefault("sort_scope",
-        CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT))
-    tableProperties.put(CarbonCommonConstants.TABLE_BLOCKSIZE,
-      parentTable.getBlockSizeInMB.toString)
-    val tableIdentifier =
-      TableIdentifier(parentTableIdentifier.table + "_" + dataMapName,
-        parentTableIdentifier.database)
-    // prepare table model of the collected tokens
-    val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(
-      ifNotExistPresent = false,
-      new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
-      tableIdentifier.table.toLowerCase,
-      fields,
-      Seq(),
-      tableProperties,
-      None,
-      isAlterFlow = false,
-      None)
-
-    // For a time-series datamap, validate the event time column (it must exist in the
-    // select) before the time column select is updated with the time-series function.
-    timeSeriesFunction.foreach { function =>
-      TimeSeriesUtil.validateTimeSeriesEventTime(dmProperties, parentTable)
-      // read only after validation so a missing property is reported by the validator
-      val eventTimeColumn = dmProperties(TimeSeriesUtil.TIMESERIES_EVENTTIME)
-      TimeSeriesUtil.validateEventTimeColumnExitsInSelect(fieldRelationMap, eventTimeColumn)
-      TimeSeriesUtil.updateTimeColumnSelect(fieldRelationMap, eventTimeColumn, function)
-    }
-    // Store the relation to the parent table in the child table; it can be used while
-    // dropping the pre-aggregate table, because the parent table also gets updated.
-    tableModel.parentTable = Some(parentTable)
-    tableModel.dataMapRelation = Some(fieldRelationMap)
-    val tablePath = dmProperties.getOrElse("path",
-      CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession))
-    CarbonCreateTableCommand(TableNewProcessor(tableModel),
-      tableModel.ifNotExistsSet, Some(tablePath)).run(sparkSession)
-
-    val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
-    val tableInfo = table.getTableInfo
-    // Child schema object which is registered on the parent table to describe this datamap.
-    val childSchema = tableInfo.getFactTable.buildChildSchema(
-      dataMapName,
-      CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA,
-      tableInfo.getDatabaseName,
-      queryString,
-      "AGGREGATION")
-    dmProperties.foreach { case (key, value) => childSchema.getProperties.put(key, value) }
-
-    // updating the parent table about child table
-    PreAggregateUtil.updateMainTable(
-      parentTable,
-      childSchema,
-      sparkSession)
-    // After updating the parent carbon table with data map entry extract the latest table object
-    // to be used in further create process.
-    parentTable = CarbonEnv.getCarbonTable(parentTableIdentifier.database,
-      parentTableIdentifier.table)(sparkSession)
-    val dataMapSchema = findAggregationDataMapSchema()
-    val updatedLoadQuery = if (timeSeriesFunction.isDefined) {
-      PreAggregateUtil.createTimeSeriesSelectQueryFromMain(dataMapSchema.getChildSchema,
-        parentTable.getTableName,
-        parentTable.getDatabaseName)
-    } else {
-      queryString
-    }
-    val dataFrame = sparkSession.sql(new CarbonSpark2SqlParser().addPreAggLoadFunction(
-      updatedLoadQuery)).drop("preAggLoad")
-    loadCommand = PreAggregateUtil.createLoadCommandForChild(
-      dataMapSchema.getChildSchema.getListOfColumns,
-      tableIdentifier,
-      dataFrame,
-      false,
-      sparkSession = sparkSession)
-    loadCommand.processMetadata(sparkSession)
-    Seq.empty
-  }
-
-  override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
-    // drop child table and undo the change in table info of main table
-    CarbonDropDataMapCommand(
-      dataMapName,
-      ifExistsSet = true,
-      parentTableIdentifier.database,
-      parentTableIdentifier.table).run(sparkSession)
-    Seq.empty
-  }
-
-  override def processData(sparkSession: SparkSession): Seq[Row] = {
-    // Load the child table only if the PARENT table already has segments, so the load
-    // details read here are the parent table's.
-    SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false)
-    val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
-    val insertInProgress = loadAvailable.exists { load =>
-      load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
-        load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
-    }
-    if (insertInProgress) {
-      throw new UnsupportedOperationException(
-        "Cannot create pre-aggregate table when insert is in progress on main table")
-    } else if (loadAvailable.nonEmpty) {
-      // Passing segmentToLoad as * because we want to load all the segments into the
-      // pre-aggregate table even if the user has set some segments on the parent table.
-      loadCommand.dataFrame = Some(PreAggregateUtil
-        .getDataFrame(sparkSession, loadCommand.logicalPlan.get))
-      PreAggregateUtil.startDataLoadForDataMap(
-        parentTable,
-        segmentToLoad = "*",
-        validateSegments = true,
-        sparkSession,
-        loadCommand)
-    }
-    Seq.empty
-  }
-
-  /**
-   * Finds this command's datamap schema (matched case-insensitively by dataMapName)
-   * in the current parentTable.
-   *
-   * @throws NoSuchElementException if the parent table has no datamap with that name
-   */
-  private def findAggregationDataMapSchema(): AggregationDataMapSchema = {
-    parentTable.getTableInfo.getDataMapSchemaList.asScala
-      .find(_.getDataMapName.equalsIgnoreCase(dataMapName))
-      .getOrElse(throw new NoSuchElementException(
-        s"Datamap $dataMapName not found on table ${parentTable.getTableName}"))
-      .asInstanceOf[AggregationDataMapSchema]
-  }
-}
-
-