Posted to issues@carbondata.apache.org by ravipesala <gi...@git.apache.org> on 2017/11/10 11:03:21 UTC
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
GitHub user ravipesala opened a pull request:
https://github.com/apache/carbondata/pull/1481
[CARBONDATA-1576] Added create datamap parser and saved to schema file
Be sure to complete the following checklist to help us incorporate
your contribution quickly and easily:
Users can create a datamap using the following syntax:
```
CREATE DATAMAP agg_sales
ON TABLE sales
USING "org.apache.carbondata.datamap.AggregateDataMapHandler"
DMPROPERTIES (
'KEY'='VALUE'
) AS
SELECT order_time, count(user_id) FROM sales GROUP BY order_time
```
In the above syntax, the `DMPROPERTIES` clause and the `AS` query are optional.
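A minimal sketch of how the statement above breaks down into its parts. It assumes a regex-based matcher, not the combinator parser the PR adds to `CarbonSpark2SqlParser`, and `CreateDataMapStmt` and `CreateDataMapSyntax` are hypothetical names used only for illustration. It shows which clauses are mandatory and which are optional. It does not cover every SQL edge case; for example, a `)` inside a property value would break it.

```scala
// Hypothetical model of a parsed CREATE DATAMAP statement; not the CarbonData AST.
final case class CreateDataMapStmt(
    name: String,
    table: String,
    className: String,
    properties: Map[String, String],
    query: Option[String])

object CreateDataMapSyntax {
  // (?is): case-insensitive, and '.' also matches newlines so clauses may span lines.
  // DMPROPERTIES (...) and AS <query> are both optional groups.
  private val Stmt =
    ("""(?is)\s*CREATE\s+DATAMAP\s+(\w+)\s+ON\s+TABLE\s+([\w.]+)\s+USING\s+["']([^"']+)["']""" +
      """(?:\s+DMPROPERTIES\s*\((.*?)\))?(?:\s+AS\s+(.+?))?\s*""").r

  // One 'key'='value' pair inside DMPROPERTIES.
  private val Prop = """'([^']*)'\s*=\s*'([^']*)'""".r

  def parse(sql: String): Option[CreateDataMapStmt] = sql match {
    // The whole string must match; unmatched optional groups come back as null.
    case Stmt(name, table, cls, props, query) =>
      val propMap = Option(props).toList
        .flatMap(p => Prop.findAllMatchIn(p).map(m => m.group(1) -> m.group(2)))
        .toMap
      Some(CreateDataMapStmt(name, table, cls, propMap, Option(query).map(_.trim)))
    case _ => None
  }
}
```

A statement without `DMPROPERTIES` or `AS`, such as `create datamap dm1 on table t1 using 'preaggregate'`, produces an empty property map and `None` for the query.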
- [X] Any interfaces changed?
NO
- [X] Any backward compatibility impacted?
NO
- [X] Document update required?
Yes, the new syntax needs to be added to the documentation.
- [X] Testing done
Test cases have been added.
- [X] For large changes, please consider breaking it into sub-tasks under an umbrella JIRA.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/ravipesala/incubator-carbondata datamap-parser1
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/carbondata/pull/1481.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1481
----
commit 75faa7755fa3c493ea7cf9882187dca9425373b4
Author: ravipesala <ra...@gmail.com>
Date: 2017-11-10T10:48:17Z
Added create datamap parser and saved to schema file
----
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by xubo245 <gi...@git.apache.org>.
Github user xubo245 commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r164042794
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala ---
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.datamap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+
+/**
+ * Below command class will be used to create datamap on table
+ * and updating the parent table about the datamap information
+ *
+ * @param queryString
+ */
+case class CarbonCreateDataMapCommand(
+ dataMapName: String,
+ tableIdentifier: TableIdentifier,
+ dmClassName: String,
+ dmproperties: Map[String, String],
+ queryString: Option[String])
+ extends RunnableCommand with SchemaProcessCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ processSchema(sparkSession)
+ }
+
+ override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ if (dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") ||
+ dmClassName.equalsIgnoreCase("preaggregate")) {
+ CreatePreAggregateTableCommand(dataMapName,
+ tableIdentifier,
+ dmClassName,
+ dmproperties,
+ queryString.get).run(sparkSession)
+ } else {
--- End diff --
Why do this? Does Carbon support `create datamap ... using 'abc.class'` now?
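For context, the quoted `processSchema` sends a request to `CreatePreAggregateTableCommand` in two cases: when `dmClassName` exactly equals `org.apache.carbondata.datamap.AggregateDataMapHandler`, or when it equals `preaggregate` ignoring case. The hunk cuts off before the `else` branch. This branch also calls `queryString.get`, so a pre-aggregate datamap declared without an `AS` query would fail with a `NoSuchElementException`. Below is a minimal sketch of the same routing that reports both cases as errors instead. `DataMapDispatch`, `DataMapProvider` and `PreAggregate` are hypothetical names. Rejecting unknown providers is an assumption, because the PR's `else` branch is not shown.

```scala
// Hypothetical provider model; not a CarbonData type.
sealed trait DataMapProvider
case object PreAggregate extends DataMapProvider

object DataMapDispatch {
  private val PreAggHandler = "org.apache.carbondata.datamap.AggregateDataMapHandler"

  // Mirrors the two conditions in the quoted processSchema: an exact match on the
  // handler class name, or a case-insensitive match on the short name "preaggregate".
  def resolve(dmClassName: String, query: Option[String]): Either[String, DataMapProvider] =
    if (dmClassName == PreAggHandler || dmClassName.equalsIgnoreCase("preaggregate")) {
      // The quoted code calls queryString.get here; this sketch reports a missing query instead.
      if (query.exists(_.trim.nonEmpty)) Right(PreAggregate)
      else Left(s"Datamap provider '$dmClassName' requires an AS query")
    } else {
      // Assumption: unknown providers are rejected (the PR's else branch is not shown).
      Left(s"Unsupported datamap provider: $dmClassName")
    }
}
```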
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150491412
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
@@ -121,6 +126,15 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
AlterTableCompactionCommand(altertablemodel)
}
+ protected lazy val createDataMap: Parser[LogicalPlan] =
--- End diff --
ok
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150487119
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
@@ -259,7 +260,7 @@ object PreAggregateUtil {
precision = precision,
scale = scale,
rawSchema = rawSchema), dataMapField)
- } else {
+} else {
--- End diff --
wrong indentation
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1481
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1607/
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1481
SDV Build Fail, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1617/
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1481
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1053/
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150490665
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
@@ -34,105 +32,74 @@ import org.apache.carbondata.core.util.CarbonUtil
* 1. failed to create pre aggregate table.
* 2. failed to update main table
*
- * @param cm
- * @param dataFrame
- * @param createDSTable
* @param queryString
*/
case class CreatePreAggregateTableCommand(
- cm: TableModel,
- dataFrame: DataFrame,
- createDSTable: Boolean = true,
- queryString: String,
- fieldRelationMap: scala.collection.mutable.LinkedHashMap[Field, DataMapField])
+ dataMapName: String,
+ parentTableIdentifier: TableIdentifier,
+ dmClassName: String,
+ dmproperties: Map[String, String],
+ queryString: String)
extends RunnableCommand with SchemaProcessCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
processSchema(sparkSession)
}
override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val storePath = CarbonEnv.getInstance(sparkSession).storePath
- CarbonEnv.getInstance(sparkSession).carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables(storePath)
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- cm.databaseName = GetDB.getDatabaseName(cm.databaseNameOp, sparkSession)
- val tbName = cm.tableName
- val dbName = cm.databaseName
- LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
+ val df = sparkSession.sql(queryString)
+ val fieldRelationMap = PreAggregateUtil
+ .validateActualSelectPlanAndGetAttrubites(df.logicalPlan, queryString)
+ val fields = fieldRelationMap.keySet.toSeq
+ val tableProperties = mutable.Map[String, String]()
+ dmproperties.foreach(t => tableProperties.put(t._1, t._2))
+ val tableIdentifier = TableIdentifier(dataMapName, parentTableIdentifier.database)
+ // prepare table model of the collected tokens
+ val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(false,
+ new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
+ tableIdentifier.table.toLowerCase,
+ fields,
+ Seq(),
+ tableProperties,
+ None,
+ false,
+ None)
+
// getting the parent table
- val parentTable = PreAggregateUtil.getParentCarbonTable(dataFrame.logicalPlan)
+ val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
// getting the table name
val parentTableName = parentTable.getFactTableName
// getting the db name of parent table
val parentDbName = parentTable.getDatabaseName
+
+ assert(parentTableName.equalsIgnoreCase(parentTableIdentifier.table))
// updating the relation identifier, this will be stored in child table
// which can be used during dropping of pre-aggreate table as parent table will
// also get updated
- cm.parentTable = Some(parentTable)
- cm.dataMapRelation = Some(fieldRelationMap)
- val tableInfo: TableInfo = TableNewProcessor(cm)
- // Add validation for sort scope when create table
- val sortScope = tableInfo.getFactTable.getTableProperties
- .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
- if (!CarbonUtil.isValidSortOption(sortScope)) {
- throw new InvalidConfigurationException(
- s"Passing invalid SORT_SCOPE '$sortScope', valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT'," +
- s" 'LOCAL_SORT' and 'GLOBAL_SORT' ")
- }
-
- if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
- sys.error("No Dimensions found. Table should have at least one dimesnion !")
- }
-
- if (sparkSession.sessionState.catalog.listTables(dbName)
- .exists(_.table.equalsIgnoreCase(tbName))) {
- if (!cm.ifNotExistsSet) {
- LOGGER.audit(
- s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
- s"Table [$tbName] already exists under database [$dbName]")
- sys.error(s"Table [$tbName] already exists under database [$dbName]")
- }
- } else {
- val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName)
- // Add Database to catalog and persist
- val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val tablePath = tableIdentifier.getTablePath
- val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath)
- if (createDSTable) {
- try {
- val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
- cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f)
- cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
- sparkSession.sql(
- s"""CREATE TABLE $dbName.$tbName
- |(${ fields.map(f => f.rawSchema).mkString(",") })
- |USING org.apache.spark.sql.CarbonSource""".stripMargin +
- s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
- s""""$tablePath"$carbonSchemaString) """)
- // child schema object which will be updated on parent table about the
- val childSchema = tableInfo.getFactTable
- .buildChildSchema("", tableInfo.getDatabaseName, queryString, "AGGREGATION")
- // upadting the parent table about child table
- PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
- val loadAvailable = PreAggregateUtil
- .checkMainTableLoad(parentTable)
- if (loadAvailable) {
- sparkSession.sql(s"insert into ${ cm.databaseName }.${ cm.tableName } $queryString")
- }
- } catch {
- case e: Exception =>
- val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
- // call the drop table to delete the created table.
- CarbonEnv.getInstance(sparkSession).carbonMetastore
- .dropTable(tablePath, identifier)(sparkSession)
- LOGGER.audit(s"Table creation with Database name [$dbName] " +
- s"and Table name [$tbName] failed")
- throw e
- }
+ tableModel.parentTable = Some(parentTable)
+ tableModel.dataMapRelation = Some(fieldRelationMap)
+ CarbonCreateTableCommand(tableModel).run(sparkSession)
+ try {
+ val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+ lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+ val tableInfo = relation.tableMeta.carbonTable.getTableInfo
+ // child schema object which will be updated on parent table about the
+ val childSchema = tableInfo.getFactTable
+ .buildChildSchema(dataMapName, "", tableInfo.getDatabaseName, queryString, "AGGREGATION")
+ dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
+ // updating the parent table about child table
+ PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
+ val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable)
+ if (loadAvailable) {
+ sparkSession
+ .sql(s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString")
}
+ } catch {
+ case e: Exception =>
+ sparkSession.
+ sql(s"""DROP TABLE IF EXISTS ${ tableModel.databaseName }.${ tableModel.tableName }""")
--- End diff --
ok
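The rewritten `processSchema` follows a create-then-compensate pattern:

1. It creates the child table through `CarbonCreateTableCommand`.
2. Inside `try`, it registers the child schema on the parent table and loads data if the main table already has loads.
3. On an exception, it drops the child table.

Below is a self-contained sketch of that control flow. It uses a hypothetical in-memory `FakeCatalog` in place of the Carbon metastore. Rethrowing after the drop is an assumption taken from the removed code (`throw e`), because the end of the new `catch` block is not in the quoted hunk.

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for the metastore; not a CarbonData API.
final class FakeCatalog {
  val tables = mutable.Set.empty[String]
  // parent table name -> names of child (datamap) tables registered on it
  val childSchemas = mutable.Map.empty[String, List[String]]

  def createTable(name: String): Unit = {
    require(!tables(name), s"Table [$name] already exists")
    tables += name
  }
  def dropTableIfExists(name: String): Unit = tables -= name
  def registerChild(parent: String, child: String): Unit =
    childSchemas(parent) = child :: childSchemas.getOrElse(parent, Nil)
}

object CreatePreAggSketch {
  // The child table is named after the datamap, as with
  // TableIdentifier(dataMapName, parentTableIdentifier.database) in the diff.
  def create(
      catalog: FakeCatalog,
      parent: String,
      dataMapName: String,
      mainTableHasData: Boolean,
      load: String => Unit): Unit = {
    catalog.createTable(dataMapName)
    try {
      catalog.registerChild(parent, dataMapName)
      if (mainTableHasData) load(dataMapName)
    } catch {
      case e: Exception =>
        // Compensate: remove the half-created child table, then rethrow (assumed).
        catalog.dropTableIfExists(dataMapName)
        throw e
    }
  }
}
```

Like the visible part of the diff, the sketch does not undo `registerChild` when the load fails, so the parent still references a dropped child. This hunk does not show whether the real command reverts the parent schema.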
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1481
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1057/
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150487207
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala ---
@@ -121,6 +126,15 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
AlterTableCompactionCommand(altertablemodel)
}
+ protected lazy val createDataMap: Parser[LogicalPlan] =
--- End diff --
Can you add a comment describing the syntax?
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150487031
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
@@ -34,105 +32,74 @@ import org.apache.carbondata.core.util.CarbonUtil
* 1. failed to create pre aggregate table.
* 2. failed to update main table
*
- * @param cm
- * @param dataFrame
- * @param createDSTable
* @param queryString
*/
case class CreatePreAggregateTableCommand(
- cm: TableModel,
- dataFrame: DataFrame,
- createDSTable: Boolean = true,
- queryString: String,
- fieldRelationMap: scala.collection.mutable.LinkedHashMap[Field, DataMapField])
+ dataMapName: String,
+ parentTableIdentifier: TableIdentifier,
+ dmClassName: String,
+ dmproperties: Map[String, String],
+ queryString: String)
extends RunnableCommand with SchemaProcessCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
processSchema(sparkSession)
}
override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val storePath = CarbonEnv.getInstance(sparkSession).storePath
- CarbonEnv.getInstance(sparkSession).carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables(storePath)
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- cm.databaseName = GetDB.getDatabaseName(cm.databaseNameOp, sparkSession)
- val tbName = cm.tableName
- val dbName = cm.databaseName
- LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
+ val df = sparkSession.sql(queryString)
+ val fieldRelationMap = PreAggregateUtil
+ .validateActualSelectPlanAndGetAttrubites(df.logicalPlan, queryString)
+ val fields = fieldRelationMap.keySet.toSeq
+ val tableProperties = mutable.Map[String, String]()
+ dmproperties.foreach(t => tableProperties.put(t._1, t._2))
+ val tableIdentifier = TableIdentifier(dataMapName, parentTableIdentifier.database)
+ // prepare table model of the collected tokens
+ val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(false,
+ new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
+ tableIdentifier.table.toLowerCase,
+ fields,
+ Seq(),
+ tableProperties,
+ None,
+ false,
+ None)
+
// getting the parent table
- val parentTable = PreAggregateUtil.getParentCarbonTable(dataFrame.logicalPlan)
+ val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
// getting the table name
val parentTableName = parentTable.getFactTableName
// getting the db name of parent table
val parentDbName = parentTable.getDatabaseName
+
+ assert(parentTableName.equalsIgnoreCase(parentTableIdentifier.table))
// updating the relation identifier, this will be stored in child table
// which can be used during dropping of pre-aggreate table as parent table will
// also get updated
- cm.parentTable = Some(parentTable)
- cm.dataMapRelation = Some(fieldRelationMap)
- val tableInfo: TableInfo = TableNewProcessor(cm)
- // Add validation for sort scope when create table
- val sortScope = tableInfo.getFactTable.getTableProperties
- .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
- if (!CarbonUtil.isValidSortOption(sortScope)) {
- throw new InvalidConfigurationException(
- s"Passing invalid SORT_SCOPE '$sortScope', valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT'," +
- s" 'LOCAL_SORT' and 'GLOBAL_SORT' ")
- }
-
- if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
- sys.error("No Dimensions found. Table should have at least one dimesnion !")
- }
-
- if (sparkSession.sessionState.catalog.listTables(dbName)
- .exists(_.table.equalsIgnoreCase(tbName))) {
- if (!cm.ifNotExistsSet) {
- LOGGER.audit(
- s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
- s"Table [$tbName] already exists under database [$dbName]")
- sys.error(s"Table [$tbName] already exists under database [$dbName]")
- }
- } else {
- val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName)
- // Add Database to catalog and persist
- val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val tablePath = tableIdentifier.getTablePath
- val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath)
- if (createDSTable) {
- try {
- val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
- cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f)
- cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
- sparkSession.sql(
- s"""CREATE TABLE $dbName.$tbName
- |(${ fields.map(f => f.rawSchema).mkString(",") })
- |USING org.apache.spark.sql.CarbonSource""".stripMargin +
- s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
- s""""$tablePath"$carbonSchemaString) """)
- // child schema object which will be updated on parent table about the
- val childSchema = tableInfo.getFactTable
- .buildChildSchema("", tableInfo.getDatabaseName, queryString, "AGGREGATION")
- // upadting the parent table about child table
- PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
- val loadAvailable = PreAggregateUtil
- .checkMainTableLoad(parentTable)
- if (loadAvailable) {
- sparkSession.sql(s"insert into ${ cm.databaseName }.${ cm.tableName } $queryString")
- }
- } catch {
- case e: Exception =>
- val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
- // call the drop table to delete the created table.
- CarbonEnv.getInstance(sparkSession).carbonMetastore
- .dropTable(tablePath, identifier)(sparkSession)
- LOGGER.audit(s"Table creation with Database name [$dbName] " +
- s"and Table name [$tbName] failed")
- throw e
- }
+ tableModel.parentTable = Some(parentTable)
+ tableModel.dataMapRelation = Some(fieldRelationMap)
+ CarbonCreateTableCommand(tableModel).run(sparkSession)
+ try {
+ val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+ lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+ val tableInfo = relation.tableMeta.carbonTable.getTableInfo
+ // child schema object which will be updated on parent table about the
+ val childSchema = tableInfo.getFactTable
+ .buildChildSchema(dataMapName, "", tableInfo.getDatabaseName, queryString, "AGGREGATION")
+ dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
+ // updating the parent table about child table
+ PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
+ val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable)
+ if (loadAvailable) {
+ sparkSession
+ .sql(s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString")
}
+ } catch {
+ case e: Exception =>
+ sparkSession.
+ sql(s"""DROP TABLE IF EXISTS ${ tableModel.databaseName }.${ tableModel.tableName }""")
--- End diff --
Move `sql(` to the previous line and put its parameter on the next line.
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150483293
--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala ---
@@ -0,0 +1,78 @@
+package org.apache.carbondata.spark.testsuite.datamap
--- End diff --
ok
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150483548
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CreateDataMapCommand.scala ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.datamap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+
+/**
+ * Below command class will be used to create datamap on table
+ * and updating the parent table about the datamap information
+ *
+ * @param queryString
+ */
+case class CreateDataMapCommand(
--- End diff --
ok
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on the issue:
https://github.com/apache/carbondata/pull/1481
LGTM
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150490403
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
@@ -34,105 +32,74 @@ import org.apache.carbondata.core.util.CarbonUtil
* 1. failed to create pre aggregate table.
* 2. failed to update main table
*
- * @param cm
- * @param dataFrame
- * @param createDSTable
* @param queryString
*/
case class CreatePreAggregateTableCommand(
- cm: TableModel,
- dataFrame: DataFrame,
- createDSTable: Boolean = true,
- queryString: String,
- fieldRelationMap: scala.collection.mutable.LinkedHashMap[Field, DataMapField])
+ dataMapName: String,
+ parentTableIdentifier: TableIdentifier,
+ dmClassName: String,
+ dmproperties: Map[String, String],
+ queryString: String)
extends RunnableCommand with SchemaProcessCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
processSchema(sparkSession)
}
override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val storePath = CarbonEnv.getInstance(sparkSession).storePath
- CarbonEnv.getInstance(sparkSession).carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables(storePath)
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- cm.databaseName = GetDB.getDatabaseName(cm.databaseNameOp, sparkSession)
- val tbName = cm.tableName
- val dbName = cm.databaseName
- LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
+ val df = sparkSession.sql(queryString)
+ val fieldRelationMap = PreAggregateUtil
+ .validateActualSelectPlanAndGetAttrubites(df.logicalPlan, queryString)
+ val fields = fieldRelationMap.keySet.toSeq
+ val tableProperties = mutable.Map[String, String]()
+ dmproperties.foreach(t => tableProperties.put(t._1, t._2))
+ val tableIdentifier = TableIdentifier(dataMapName, parentTableIdentifier.database)
+ // prepare table model of the collected tokens
+ val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(false,
+ new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
+ tableIdentifier.table.toLowerCase,
+ fields,
+ Seq(),
+ tableProperties,
+ None,
+ false,
+ None)
+
// getting the parent table
- val parentTable = PreAggregateUtil.getParentCarbonTable(dataFrame.logicalPlan)
+ val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
// getting the table name
val parentTableName = parentTable.getFactTableName
// getting the db name of parent table
val parentDbName = parentTable.getDatabaseName
+
+ assert(parentTableName.equalsIgnoreCase(parentTableIdentifier.table))
// updating the relation identifier, this will be stored in child table
// which can be used during dropping of pre-aggreate table as parent table will
// also get updated
- cm.parentTable = Some(parentTable)
- cm.dataMapRelation = Some(fieldRelationMap)
- val tableInfo: TableInfo = TableNewProcessor(cm)
- // Add validation for sort scope when create table
- val sortScope = tableInfo.getFactTable.getTableProperties
- .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
- if (!CarbonUtil.isValidSortOption(sortScope)) {
- throw new InvalidConfigurationException(
- s"Passing invalid SORT_SCOPE '$sortScope', valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT'," +
- s" 'LOCAL_SORT' and 'GLOBAL_SORT' ")
- }
-
- if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
- sys.error("No Dimensions found. Table should have at least one dimesnion !")
- }
-
- if (sparkSession.sessionState.catalog.listTables(dbName)
- .exists(_.table.equalsIgnoreCase(tbName))) {
- if (!cm.ifNotExistsSet) {
- LOGGER.audit(
- s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
- s"Table [$tbName] already exists under database [$dbName]")
- sys.error(s"Table [$tbName] already exists under database [$dbName]")
- }
- } else {
- val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName)
- // Add Database to catalog and persist
- val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val tablePath = tableIdentifier.getTablePath
- val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath)
- if (createDSTable) {
- try {
- val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
- cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f)
- cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
- sparkSession.sql(
- s"""CREATE TABLE $dbName.$tbName
- |(${ fields.map(f => f.rawSchema).mkString(",") })
- |USING org.apache.spark.sql.CarbonSource""".stripMargin +
- s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
- s""""$tablePath"$carbonSchemaString) """)
- // child schema object which will be updated on parent table about the
- val childSchema = tableInfo.getFactTable
- .buildChildSchema("", tableInfo.getDatabaseName, queryString, "AGGREGATION")
- // upadting the parent table about child table
- PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
- val loadAvailable = PreAggregateUtil
- .checkMainTableLoad(parentTable)
- if (loadAvailable) {
- sparkSession.sql(s"insert into ${ cm.databaseName }.${ cm.tableName } $queryString")
- }
- } catch {
- case e: Exception =>
- val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
- // call the drop table to delete the created table.
- CarbonEnv.getInstance(sparkSession).carbonMetastore
- .dropTable(tablePath, identifier)(sparkSession)
- LOGGER.audit(s"Table creation with Database name [$dbName] " +
- s"and Table name [$tbName] failed")
- throw e
- }
+ tableModel.parentTable = Some(parentTable)
+ tableModel.dataMapRelation = Some(fieldRelationMap)
+ CarbonCreateTableCommand(tableModel).run(sparkSession)
+ try {
+ val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+ lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+ val tableInfo = relation.tableMeta.carbonTable.getTableInfo
+ // child schema object which will be updated on parent table about the
+ val childSchema = tableInfo.getFactTable
+ .buildChildSchema(dataMapName, "", tableInfo.getDatabaseName, queryString, "AGGREGATION")
+ dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
+ // updating the parent table about child table
+ PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
+ val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable)
+ if (loadAvailable) {
+ sparkSession
+ .sql(s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString")
--- End diff --
ok
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1481
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1029/
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1481
SDV Build Success, Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1672/
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150480875
--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateDrop.scala ---
@@ -31,18 +31,18 @@ class TestPreAggregateDrop extends QueryTest with BeforeAndAfterAll {
test("create and drop preaggregate table") {
sql(
- "create table preagg1 stored BY 'carbondata' tblproperties('parent'='maintable') as select" +
+ "create datamap preagg1 on table maintable using 'preaggregate' as select" +
--- End diff --
Is there any test case that uses a datamap property (DMPROPERTIES)?
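The diff copies every DMPROPERTIES entry onto the child schema (`dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))`), so a property test would mainly check that step. Below is a minimal Java sketch of that check. `ChildSchemaSketch` and `DmPropertiesCheck` are hypothetical stand-ins, not CarbonData classes, and a real test would go through the SQL DDL and read the schema back from the metastore.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the child DataMapSchema; not the CarbonData class.
class ChildSchemaSketch {
    private final String dataMapName;
    private final Map<String, String> properties = new HashMap<>();

    ChildSchemaSketch(String dataMapName) {
        this.dataMapName = dataMapName;
    }

    String getDataMapName() { return dataMapName; }

    Map<String, String> getProperties() { return properties; }

    // Mirrors the diff's property copy: every user-supplied DMPROPERTIES
    // entry is put onto the child schema's property map.
    static ChildSchemaSketch withProperties(String name, Map<String, String> dmProperties) {
        ChildSchemaSketch schema = new ChildSchemaSketch(name);
        schema.getProperties().putAll(dmProperties);
        return schema;
    }
}

class DmPropertiesCheck {
    // The kind of assertion a DMPROPERTIES test could make: each key/value
    // from the DDL must be present, unchanged, on the stored child schema.
    static boolean propertiesPropagated(ChildSchemaSketch schema, Map<String, String> expected) {
        for (Map.Entry<String, String> e : expected.entrySet()) {
            if (!e.getValue().equals(schema.getProperties().get(e.getKey()))) {
                return false;
            }
        }
        return true;
    }
}
```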
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1481
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1064/
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1481
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1032/
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1481
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1061/
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150490273
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
@@ -34,105 +32,74 @@ import org.apache.carbondata.core.util.CarbonUtil
* 1. failed to create pre aggregate table.
* 2. failed to update main table
*
- * @param cm
- * @param dataFrame
- * @param createDSTable
* @param queryString
*/
case class CreatePreAggregateTableCommand(
- cm: TableModel,
- dataFrame: DataFrame,
- createDSTable: Boolean = true,
- queryString: String,
- fieldRelationMap: scala.collection.mutable.LinkedHashMap[Field, DataMapField])
+ dataMapName: String,
+ parentTableIdentifier: TableIdentifier,
+ dmClassName: String,
+ dmproperties: Map[String, String],
+ queryString: String)
extends RunnableCommand with SchemaProcessCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
processSchema(sparkSession)
}
override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val storePath = CarbonEnv.getInstance(sparkSession).storePath
- CarbonEnv.getInstance(sparkSession).carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables(storePath)
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- cm.databaseName = GetDB.getDatabaseName(cm.databaseNameOp, sparkSession)
- val tbName = cm.tableName
- val dbName = cm.databaseName
- LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
+ val df = sparkSession.sql(queryString)
+ val fieldRelationMap = PreAggregateUtil
+ .validateActualSelectPlanAndGetAttrubites(df.logicalPlan, queryString)
+ val fields = fieldRelationMap.keySet.toSeq
+ val tableProperties = mutable.Map[String, String]()
+ dmproperties.foreach(t => tableProperties.put(t._1, t._2))
+ val tableIdentifier = TableIdentifier(dataMapName, parentTableIdentifier.database)
+ // prepare table model of the collected tokens
+ val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(false,
+ new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
+ tableIdentifier.table.toLowerCase,
+ fields,
+ Seq(),
+ tableProperties,
+ None,
+ false,
+ None)
+
// getting the parent table
- val parentTable = PreAggregateUtil.getParentCarbonTable(dataFrame.logicalPlan)
+ val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
// getting the table name
val parentTableName = parentTable.getFactTableName
// getting the db name of parent table
val parentDbName = parentTable.getDatabaseName
+
+ assert(parentTableName.equalsIgnoreCase(parentTableIdentifier.table))
// updating the relation identifier, this will be stored in child table
// which can be used during dropping of pre-aggreate table as parent table will
// also get updated
- cm.parentTable = Some(parentTable)
- cm.dataMapRelation = Some(fieldRelationMap)
- val tableInfo: TableInfo = TableNewProcessor(cm)
- // Add validation for sort scope when create table
- val sortScope = tableInfo.getFactTable.getTableProperties
- .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
- if (!CarbonUtil.isValidSortOption(sortScope)) {
- throw new InvalidConfigurationException(
- s"Passing invalid SORT_SCOPE '$sortScope', valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT'," +
- s" 'LOCAL_SORT' and 'GLOBAL_SORT' ")
- }
-
- if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
- sys.error("No Dimensions found. Table should have at least one dimesnion !")
- }
-
- if (sparkSession.sessionState.catalog.listTables(dbName)
- .exists(_.table.equalsIgnoreCase(tbName))) {
- if (!cm.ifNotExistsSet) {
- LOGGER.audit(
- s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
- s"Table [$tbName] already exists under database [$dbName]")
- sys.error(s"Table [$tbName] already exists under database [$dbName]")
- }
- } else {
- val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName)
- // Add Database to catalog and persist
- val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val tablePath = tableIdentifier.getTablePath
- val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath)
- if (createDSTable) {
- try {
- val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
- cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f)
- cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
- sparkSession.sql(
- s"""CREATE TABLE $dbName.$tbName
- |(${ fields.map(f => f.rawSchema).mkString(",") })
- |USING org.apache.spark.sql.CarbonSource""".stripMargin +
- s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
- s""""$tablePath"$carbonSchemaString) """)
- // child schema object which will be updated on parent table about the
- val childSchema = tableInfo.getFactTable
- .buildChildSchema("", tableInfo.getDatabaseName, queryString, "AGGREGATION")
- // upadting the parent table about child table
- PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
- val loadAvailable = PreAggregateUtil
- .checkMainTableLoad(parentTable)
- if (loadAvailable) {
- sparkSession.sql(s"insert into ${ cm.databaseName }.${ cm.tableName } $queryString")
- }
- } catch {
- case e: Exception =>
- val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
- // call the drop table to delete the created table.
- CarbonEnv.getInstance(sparkSession).carbonMetastore
- .dropTable(tablePath, identifier)(sparkSession)
- LOGGER.audit(s"Table creation with Database name [$dbName] " +
- s"and Table name [$tbName] failed")
- throw e
- }
+ tableModel.parentTable = Some(parentTable)
+ tableModel.dataMapRelation = Some(fieldRelationMap)
+ CarbonCreateTableCommand(tableModel).run(sparkSession)
+ try {
+ val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+ lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+ val tableInfo = relation.tableMeta.carbonTable.getTableInfo
+ // child schema object which will be updated on parent table about the
+ val childSchema = tableInfo.getFactTable
+ .buildChildSchema(dataMapName, "", tableInfo.getDatabaseName, queryString, "AGGREGATION")
--- End diff --
ok
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1481
Build Success with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1023/
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1481
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1007/
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150480340
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java ---
@@ -30,6 +30,8 @@
private static final long serialVersionUID = 6577149126264181553L;
+ private String dataMapName;
--- End diff --
Is it unique within one table or unique across all tables?
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/carbondata/pull/1481
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1481
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/990/
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150481702
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CreateDataMapCommand.scala ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.datamap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+
+/**
+ * Below command class will be used to create datamap on table
+ * and updating the parent table about the datamap information
+ *
+ * @param queryString
+ */
+case class CreateDataMapCommand(
--- End diff --
Please add the `Carbon` prefix to the class name; that makes it easier to find all Carbon commands.
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150481038
--- Diff: integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala ---
@@ -0,0 +1,78 @@
+package org.apache.carbondata.spark.testsuite.datamap
--- End diff --
The license header seems to be missing from this file.
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150486928
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
@@ -34,105 +32,74 @@ import org.apache.carbondata.core.util.CarbonUtil
* 1. failed to create pre aggregate table.
* 2. failed to update main table
*
- * @param cm
- * @param dataFrame
- * @param createDSTable
* @param queryString
*/
case class CreatePreAggregateTableCommand(
- cm: TableModel,
- dataFrame: DataFrame,
- createDSTable: Boolean = true,
- queryString: String,
- fieldRelationMap: scala.collection.mutable.LinkedHashMap[Field, DataMapField])
+ dataMapName: String,
+ parentTableIdentifier: TableIdentifier,
+ dmClassName: String,
+ dmproperties: Map[String, String],
+ queryString: String)
extends RunnableCommand with SchemaProcessCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
processSchema(sparkSession)
}
override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val storePath = CarbonEnv.getInstance(sparkSession).storePath
- CarbonEnv.getInstance(sparkSession).carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables(storePath)
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- cm.databaseName = GetDB.getDatabaseName(cm.databaseNameOp, sparkSession)
- val tbName = cm.tableName
- val dbName = cm.databaseName
- LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
+ val df = sparkSession.sql(queryString)
+ val fieldRelationMap = PreAggregateUtil
+ .validateActualSelectPlanAndGetAttrubites(df.logicalPlan, queryString)
+ val fields = fieldRelationMap.keySet.toSeq
+ val tableProperties = mutable.Map[String, String]()
+ dmproperties.foreach(t => tableProperties.put(t._1, t._2))
+ val tableIdentifier = TableIdentifier(dataMapName, parentTableIdentifier.database)
+ // prepare table model of the collected tokens
+ val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(false,
+ new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
+ tableIdentifier.table.toLowerCase,
+ fields,
+ Seq(),
+ tableProperties,
+ None,
+ false,
+ None)
+
// getting the parent table
- val parentTable = PreAggregateUtil.getParentCarbonTable(dataFrame.logicalPlan)
+ val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
// getting the table name
val parentTableName = parentTable.getFactTableName
// getting the db name of parent table
val parentDbName = parentTable.getDatabaseName
+
+ assert(parentTableName.equalsIgnoreCase(parentTableIdentifier.table))
// updating the relation identifier, this will be stored in child table
// which can be used during dropping of pre-aggreate table as parent table will
// also get updated
- cm.parentTable = Some(parentTable)
- cm.dataMapRelation = Some(fieldRelationMap)
- val tableInfo: TableInfo = TableNewProcessor(cm)
- // Add validation for sort scope when create table
- val sortScope = tableInfo.getFactTable.getTableProperties
- .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
- if (!CarbonUtil.isValidSortOption(sortScope)) {
- throw new InvalidConfigurationException(
- s"Passing invalid SORT_SCOPE '$sortScope', valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT'," +
- s" 'LOCAL_SORT' and 'GLOBAL_SORT' ")
- }
-
- if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
- sys.error("No Dimensions found. Table should have at least one dimesnion !")
- }
-
- if (sparkSession.sessionState.catalog.listTables(dbName)
- .exists(_.table.equalsIgnoreCase(tbName))) {
- if (!cm.ifNotExistsSet) {
- LOGGER.audit(
- s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
- s"Table [$tbName] already exists under database [$dbName]")
- sys.error(s"Table [$tbName] already exists under database [$dbName]")
- }
- } else {
- val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName)
- // Add Database to catalog and persist
- val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val tablePath = tableIdentifier.getTablePath
- val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath)
- if (createDSTable) {
- try {
- val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
- cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f)
- cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
- sparkSession.sql(
- s"""CREATE TABLE $dbName.$tbName
- |(${ fields.map(f => f.rawSchema).mkString(",") })
- |USING org.apache.spark.sql.CarbonSource""".stripMargin +
- s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
- s""""$tablePath"$carbonSchemaString) """)
- // child schema object which will be updated on parent table about the
- val childSchema = tableInfo.getFactTable
- .buildChildSchema("", tableInfo.getDatabaseName, queryString, "AGGREGATION")
- // upadting the parent table about child table
- PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
- val loadAvailable = PreAggregateUtil
- .checkMainTableLoad(parentTable)
- if (loadAvailable) {
- sparkSession.sql(s"insert into ${ cm.databaseName }.${ cm.tableName } $queryString")
- }
- } catch {
- case e: Exception =>
- val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
- // call the drop table to delete the created table.
- CarbonEnv.getInstance(sparkSession).carbonMetastore
- .dropTable(tablePath, identifier)(sparkSession)
- LOGGER.audit(s"Table creation with Database name [$dbName] " +
- s"and Table name [$tbName] failed")
- throw e
- }
+ tableModel.parentTable = Some(parentTable)
+ tableModel.dataMapRelation = Some(fieldRelationMap)
+ CarbonCreateTableCommand(tableModel).run(sparkSession)
+ try {
+ val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+ lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+ val tableInfo = relation.tableMeta.carbonTable.getTableInfo
+ // child schema object which will be updated on parent table about the
+ val childSchema = tableInfo.getFactTable
+ .buildChildSchema(dataMapName, "", tableInfo.getDatabaseName, queryString, "AGGREGATION")
--- End diff --
Move this call to the previous line, and move the parameters to the next line.
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150483161
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java ---
@@ -322,17 +322,22 @@
for (DataMapSchema wrapperChildSchema : wrapperChildSchemaList) {
org.apache.carbondata.format.DataMapSchema thriftChildSchema =
new org.apache.carbondata.format.DataMapSchema();
- org.apache.carbondata.format.RelationIdentifier relationIdentifier =
- new org.apache.carbondata.format.RelationIdentifier();
- relationIdentifier
- .setDatabaseName(wrapperChildSchema.getRelationIdentifier().getDatabaseName());
- relationIdentifier.setTableName(wrapperChildSchema.getRelationIdentifier().getTableName());
- relationIdentifier.setTableId(wrapperChildSchema.getRelationIdentifier().getTableId());
- thriftChildSchema.setRelationIdentifire(relationIdentifier);
+ if (wrapperChildSchema.getRelationIdentifier() != null) {
+ org.apache.carbondata.format.RelationIdentifier relationIdentifier =
+ new org.apache.carbondata.format.RelationIdentifier();
+ relationIdentifier
+ .setDatabaseName(wrapperChildSchema.getRelationIdentifier().getDatabaseName());
--- End diff --
ok
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150490702
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala ---
@@ -259,7 +260,7 @@ object PreAggregateUtil {
precision = precision,
scale = scale,
rawSchema = rawSchema), dataMapField)
- } else {
+} else {
--- End diff --
ok
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1481
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1581/
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1481
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/966/
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150479725
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java ---
@@ -30,6 +30,8 @@
private static final long serialVersionUID = 6577149126264181553L;
+ private String dataMapName;
--- End diff --
Please add a comment: what is this name used for?
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1481
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1650/
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150481994
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java ---
@@ -30,6 +30,8 @@
private static final long serialVersionUID = 6577149126264181553L;
+ private String dataMapName;
--- End diff --
It is unique only within that table, not across tables.
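The rule "unique within the parent table, not across tables" can be pictured as a set of names kept per parent table. This is a minimal Java sketch under that reading; `DataMapNameRegistry` is hypothetical, and the case-insensitive comparison is an assumption based on the diff lower-casing table names (`tableIdentifier.table.toLowerCase`).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical registry illustrating "unique within a table, not across tables".
class DataMapNameRegistry {
    // Key: "db.table" of the parent table; value: datamap names already used on it.
    private final Map<String, Set<String>> namesByTable = new HashMap<>();

    // Returns true if the name was free on this parent table and is now registered,
    // false if the same parent table already has a datamap with this name.
    // Names are compared case-insensitively (an assumption for this sketch).
    boolean register(String dbName, String tableName, String dataMapName) {
        String tableKey = (dbName + "." + tableName).toLowerCase(Locale.ROOT);
        Set<String> names = namesByTable.computeIfAbsent(tableKey, k -> new HashSet<>());
        return names.add(dataMapName.toLowerCase(Locale.ROOT));
    }
}
```

With this shape, `agg_sales` can exist on both `sales` and `orders`, but a second `agg_sales` on `sales` is rejected.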
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1481
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1673/
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1481
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1025/
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150479458
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java ---
@@ -322,17 +322,22 @@
for (DataMapSchema wrapperChildSchema : wrapperChildSchemaList) {
org.apache.carbondata.format.DataMapSchema thriftChildSchema =
new org.apache.carbondata.format.DataMapSchema();
- org.apache.carbondata.format.RelationIdentifier relationIdentifier =
- new org.apache.carbondata.format.RelationIdentifier();
- relationIdentifier
- .setDatabaseName(wrapperChildSchema.getRelationIdentifier().getDatabaseName());
- relationIdentifier.setTableName(wrapperChildSchema.getRelationIdentifier().getTableName());
- relationIdentifier.setTableId(wrapperChildSchema.getRelationIdentifier().getTableId());
- thriftChildSchema.setRelationIdentifire(relationIdentifier);
+ if (wrapperChildSchema.getRelationIdentifier() != null) {
+ org.apache.carbondata.format.RelationIdentifier relationIdentifier =
+ new org.apache.carbondata.format.RelationIdentifier();
+ relationIdentifier
+ .setDatabaseName(wrapperChildSchema.getRelationIdentifier().getDatabaseName());
--- End diff --
Move this call to the previous line, and move the parameter to the next line.
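One reading of this formatting request, shown together with the new null guard from the diff, is sketched below in Java. `WrapperRelationId`, `ThriftRelationId` and `RelationIdConverter` are hypothetical stand-ins for the wrapper and Thrift `RelationIdentifier` types, not the real CarbonData or Thrift-generated classes.

```java
// Hypothetical stand-in for the wrapper RelationIdentifier.
class WrapperRelationId {
    private final String databaseName;
    private final String tableName;
    private final String tableId;

    WrapperRelationId(String databaseName, String tableName, String tableId) {
        this.databaseName = databaseName;
        this.tableName = tableName;
        this.tableId = tableId;
    }

    String getDatabaseName() { return databaseName; }
    String getTableName() { return tableName; }
    String getTableId() { return tableId; }
}

// Hypothetical stand-in for the Thrift RelationIdentifier.
class ThriftRelationId {
    String databaseName;
    String tableName;
    String tableId;

    void setDatabaseName(String v) { databaseName = v; }
    void setTableName(String v) { tableName = v; }
    void setTableId(String v) { tableId = v; }
}

class RelationIdConverter {
    // Returns null when the wrapper schema has no relation identifier, mirroring
    // the `if (wrapperChildSchema.getRelationIdentifier() != null)` guard in the diff.
    static ThriftRelationId toThrift(WrapperRelationId wrapper) {
        if (wrapper == null) {
            return null;
        }
        ThriftRelationId thrift = new ThriftRelationId();
        // Receiver and method name stay on one line; the long argument
        // wraps to the next line.
        thrift.setDatabaseName(
            wrapper.getDatabaseName());
        thrift.setTableName(wrapper.getTableName());
        thrift.setTableId(wrapper.getTableId());
        return thrift;
    }
}
```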
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1481
retest this please
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1481
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1052/
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1481
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1643/
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150487023
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala ---
@@ -34,105 +32,74 @@ import org.apache.carbondata.core.util.CarbonUtil
* 1. failed to create pre aggregate table.
* 2. failed to update main table
*
- * @param cm
- * @param dataFrame
- * @param createDSTable
* @param queryString
*/
case class CreatePreAggregateTableCommand(
- cm: TableModel,
- dataFrame: DataFrame,
- createDSTable: Boolean = true,
- queryString: String,
- fieldRelationMap: scala.collection.mutable.LinkedHashMap[Field, DataMapField])
+ dataMapName: String,
+ parentTableIdentifier: TableIdentifier,
+ dmClassName: String,
+ dmproperties: Map[String, String],
+ queryString: String)
extends RunnableCommand with SchemaProcessCommand {
override def run(sparkSession: SparkSession): Seq[Row] = {
processSchema(sparkSession)
}
override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val storePath = CarbonEnv.getInstance(sparkSession).storePath
- CarbonEnv.getInstance(sparkSession).carbonMetastore.
- checkSchemasModifiedTimeAndReloadTables(storePath)
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
- cm.databaseName = GetDB.getDatabaseName(cm.databaseNameOp, sparkSession)
- val tbName = cm.tableName
- val dbName = cm.databaseName
- LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
+ val df = sparkSession.sql(queryString)
+ val fieldRelationMap = PreAggregateUtil
+ .validateActualSelectPlanAndGetAttrubites(df.logicalPlan, queryString)
+ val fields = fieldRelationMap.keySet.toSeq
+ val tableProperties = mutable.Map[String, String]()
+ dmproperties.foreach(t => tableProperties.put(t._1, t._2))
+ val tableIdentifier = TableIdentifier(dataMapName, parentTableIdentifier.database)
+ // prepare table model of the collected tokens
+ val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(false,
+ new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
+ tableIdentifier.table.toLowerCase,
+ fields,
+ Seq(),
+ tableProperties,
+ None,
+ false,
+ None)
+
// getting the parent table
- val parentTable = PreAggregateUtil.getParentCarbonTable(dataFrame.logicalPlan)
+ val parentTable = PreAggregateUtil.getParentCarbonTable(df.logicalPlan)
// getting the table name
val parentTableName = parentTable.getFactTableName
// getting the db name of parent table
val parentDbName = parentTable.getDatabaseName
+
+ assert(parentTableName.equalsIgnoreCase(parentTableIdentifier.table))
// updating the relation identifier, this will be stored in child table
// which can be used during dropping of pre-aggreate table as parent table will
// also get updated
- cm.parentTable = Some(parentTable)
- cm.dataMapRelation = Some(fieldRelationMap)
- val tableInfo: TableInfo = TableNewProcessor(cm)
- // Add validation for sort scope when create table
- val sortScope = tableInfo.getFactTable.getTableProperties
- .getOrDefault("sort_scope", CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
- if (!CarbonUtil.isValidSortOption(sortScope)) {
- throw new InvalidConfigurationException(
- s"Passing invalid SORT_SCOPE '$sortScope', valid SORT_SCOPE are 'NO_SORT', 'BATCH_SORT'," +
- s" 'LOCAL_SORT' and 'GLOBAL_SORT' ")
- }
-
- if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
- sys.error("No Dimensions found. Table should have at least one dimesnion !")
- }
-
- if (sparkSession.sessionState.catalog.listTables(dbName)
- .exists(_.table.equalsIgnoreCase(tbName))) {
- if (!cm.ifNotExistsSet) {
- LOGGER.audit(
- s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
- s"Table [$tbName] already exists under database [$dbName]")
- sys.error(s"Table [$tbName] already exists under database [$dbName]")
- }
- } else {
- val tableIdentifier = AbsoluteTableIdentifier.from(storePath, dbName, tbName)
- // Add Database to catalog and persist
- val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
- val tablePath = tableIdentifier.getTablePath
- val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tablePath)
- if (createDSTable) {
- try {
- val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
- cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f)
- cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
- sparkSession.sql(
- s"""CREATE TABLE $dbName.$tbName
- |(${ fields.map(f => f.rawSchema).mkString(",") })
- |USING org.apache.spark.sql.CarbonSource""".stripMargin +
- s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
- s""""$tablePath"$carbonSchemaString) """)
- // child schema object which will be updated on parent table about the
- val childSchema = tableInfo.getFactTable
- .buildChildSchema("", tableInfo.getDatabaseName, queryString, "AGGREGATION")
- // upadting the parent table about child table
- PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
- val loadAvailable = PreAggregateUtil
- .checkMainTableLoad(parentTable)
- if (loadAvailable) {
- sparkSession.sql(s"insert into ${ cm.databaseName }.${ cm.tableName } $queryString")
- }
- } catch {
- case e: Exception =>
- val identifier: TableIdentifier = TableIdentifier(tbName, Some(dbName))
- // call the drop table to delete the created table.
- CarbonEnv.getInstance(sparkSession).carbonMetastore
- .dropTable(tablePath, identifier)(sparkSession)
- LOGGER.audit(s"Table creation with Database name [$dbName] " +
- s"and Table name [$tbName] failed")
- throw e
- }
+ tableModel.parentTable = Some(parentTable)
+ tableModel.dataMapRelation = Some(fieldRelationMap)
+ CarbonCreateTableCommand(tableModel).run(sparkSession)
+ try {
+ val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore.
+ lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
+ val tableInfo = relation.tableMeta.carbonTable.getTableInfo
+ // child schema object which will be updated on parent table about the
+ val childSchema = tableInfo.getFactTable
+ .buildChildSchema(dataMapName, "", tableInfo.getDatabaseName, queryString, "AGGREGATION")
+ dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
+ // updating the parent table about child table
+ PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
+ val loadAvailable = PreAggregateUtil.checkMainTableLoad(parentTable)
+ if (loadAvailable) {
+ sparkSession
+ .sql(s"insert into ${ tableModel.databaseName }.${ tableModel.tableName } $queryString")
--- End diff --
Move this to the previous line (`sparkSession.sql(`) and move the parameter to the next line.
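The new code path above works in four steps. It creates the child table through `CarbonCreateTableCommand`, copies the `DMPROPERTIES` entries into the child schema, registers the child schema on the parent table, and loads the child table only if the parent already has a load. The Java sketch below covers only the two data-handling steps. `ChildSchemaSketch` and `PreAggregateFlowSketch` are hypothetical stand-ins for CarbonData's real types, and the sketch is not the project's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the child schema; not CarbonData's DataMapSchema.
class ChildSchemaSketch {
  final String dataMapName;
  final String queryString;
  // LinkedHashMap keeps DMPROPERTIES in declaration order.
  final Map<String, String> properties = new LinkedHashMap<>();

  ChildSchemaSketch(String dataMapName, String queryString) {
    this.dataMapName = dataMapName;
    this.queryString = queryString;
  }
}

// Hypothetical helper mirroring two steps of the new Scala code path.
class PreAggregateFlowSketch {
  // Copies every DMPROPERTIES entry into the child schema, like
  // dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2)).
  static ChildSchemaSketch buildChildSchema(
      String dataMapName, String queryString, Map<String, String> dmProperties) {
    ChildSchemaSketch child = new ChildSchemaSketch(dataMapName, queryString);
    child.properties.putAll(dmProperties);
    return child;
  }

  // Builds the statement that populates the child table from the datamap query.
  // The caller should run it only when the parent table already has a load.
  static String insertStatement(String databaseName, String tableName, String queryString) {
    return "insert into " + databaseName + "." + tableName + " " + queryString;
  }
}
```

In the Scala code, `childSchema.getProperties.putAll(dmproperties.asJava)` would likely be an equivalent one-liner. This assumes `getProperties` returns a mutable `java.util.Map` and relies on the `JavaConverters` import already present in the file.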
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1481
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1641/
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150480075
--- Diff: core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java ---
@@ -255,21 +255,15 @@ public void readFields(DataInput in) throws IOException {
* Below method will be used to build child schema object which will be stored in
* parent table
*
- * @param className
- * @param databaseName
- * @param queryString
- * @param queryType
- *
- * @return datamap schema
*/
- public DataMapSchema buildChildSchema(String className, String databaseName, String queryString,
- String queryType) {
+ public DataMapSchema buildChildSchema(String dataMapName, String className, String databaseName,
--- End diff --
Is `dataMapName` the name of the child schema? Please rename the parameter or describe it in the function comment at line 255.
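One way to address this review comment is to document each parameter in the Javadoc, as the removed comment block did. The sketch below assumes `dataMapName` names the datamap that the child schema describes. `TableSchemaSketch` and `DataMapSchemaSketch` are hypothetical stand-ins, not the real `TableSchema` and `DataMapSchema` classes.

```java
// Hypothetical minimal holder; the real DataMapSchema class may have other fields.
class DataMapSchemaSketch {
  private final String dataMapName;
  private final String className;
  private final String databaseName;
  private final String queryString;
  private final String queryType;

  DataMapSchemaSketch(String dataMapName, String className, String databaseName,
      String queryString, String queryType) {
    this.dataMapName = dataMapName;
    this.className = className;
    this.databaseName = databaseName;
    this.queryString = queryString;
    this.queryType = queryType;
  }

  String getDataMapName() { return dataMapName; }
  String getClassName() { return className; }
  String getDatabaseName() { return databaseName; }
  String getQueryString() { return queryString; }
  String getQueryType() { return queryType; }
}

class TableSchemaSketch {
  /**
   * Builds the child schema object that will be stored in the parent table.
   *
   * @param dataMapName  name of the datamap; the child schema is identified by this name
   * @param className    datamap handler class name (may be empty)
   * @param databaseName database of the child table
   * @param queryString  query that defines the datamap
   * @param queryType    kind of query, for example "AGGREGATION"
   * @return datamap schema to be stored in the parent table
   */
  DataMapSchemaSketch buildChildSchema(String dataMapName, String className,
      String databaseName, String queryString, String queryType) {
    return new DataMapSchemaSketch(dataMapName, className, databaseName, queryString, queryType);
  }
}
```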
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150483631
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CreateDataMapCommand.scala ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.datamap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+
+/**
+ * Below command class will be used to create datamap on table
+ * and updating the parent table about the datamap information
+ *
+ * @param queryString
+ */
+case class CreateDataMapCommand(
+ dataMapName: String,
+ tableIdentifier: TableIdentifier,
+ dmClassName: String,
+ dmproperties: Map[String, String],
+ queryString: Option[String])
+ extends RunnableCommand with SchemaProcessCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ processSchema(sparkSession)
+ }
+
+ override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ if (dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") ||
+ dmClassName.equalsIgnoreCase("preaggregate")) {
+ CreatePreAggregateTableCommand(dataMapName,
+ tableIdentifier,
+ dmClassName,
+ dmproperties,
+ queryString.get).run(sparkSession)
+ } else {
+
--- End diff --
ok
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1481
SDV Build Fail , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1624/
---
[GitHub] carbondata pull request #1481: [CARBONDATA-1576] Added create datamap parser...
Posted by jackylk <gi...@git.apache.org>.
Github user jackylk commented on a diff in the pull request:
https://github.com/apache/carbondata/pull/1481#discussion_r150481737
--- Diff: integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CreateDataMapCommand.scala ---
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.command.datamap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.execution.command.preaaggregate.{CreatePreAggregateTableCommand, PreAggregateUtil}
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+
+/**
+ * Below command class will be used to create datamap on table
+ * and updating the parent table about the datamap information
+ *
+ * @param queryString
+ */
+case class CreateDataMapCommand(
+ dataMapName: String,
+ tableIdentifier: TableIdentifier,
+ dmClassName: String,
+ dmproperties: Map[String, String],
+ queryString: Option[String])
+ extends RunnableCommand with SchemaProcessCommand {
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ processSchema(sparkSession)
+ }
+
+ override def processSchema(sparkSession: SparkSession): Seq[Row] = {
+ val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ if (dmClassName.equals("org.apache.carbondata.datamap.AggregateDataMapHandler") ||
+ dmClassName.equalsIgnoreCase("preaggregate")) {
+ CreatePreAggregateTableCommand(dataMapName,
+ tableIdentifier,
+ dmClassName,
+ dmproperties,
+ queryString.get).run(sparkSession)
+ } else {
+
--- End diff --
remove empty line
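The branch in `processSchema` routes to `CreatePreAggregateTableCommand` in two cases: when the class name exactly equals `org.apache.carbondata.datamap.AggregateDataMapHandler`, or when it matches `preaggregate` case-insensitively. Because `AS QUERY` is optional in the syntax, `queryString.get` would throw `NoSuchElementException` for a pre-aggregate datamap created without a query. The Java sketch below mirrors the check and fails with an explicit message instead. `DataMapDispatchSketch` and `DataMapKind` are hypothetical names. The `else` branch is left abstract because its body is not shown in this excerpt.

```java
import java.util.Optional;

// Hypothetical classification result; the real command runs sub-commands instead.
enum DataMapKind { PRE_AGGREGATE, OTHER }

class DataMapDispatchSketch {
  static final String AGGREGATE_HANDLER =
      "org.apache.carbondata.datamap.AggregateDataMapHandler";

  // Same condition as processSchema: exact match on the handler class name,
  // or a case-insensitive match on the short name "preaggregate".
  static boolean isPreAggregate(String dmClassName) {
    return dmClassName.equals(AGGREGATE_HANDLER)
        || dmClassName.equalsIgnoreCase("preaggregate");
  }

  static DataMapKind classify(String dmClassName, Optional<String> queryString) {
    if (isPreAggregate(dmClassName)) {
      // Fail with a clear message instead of the bare exception from queryString.get.
      queryString.orElseThrow(() ->
          new IllegalArgumentException("A preaggregate datamap requires an AS query"));
      return DataMapKind.PRE_AGGREGATE;
    }
    // The original else branch is not shown in this excerpt.
    return DataMapKind.OTHER;
  }
}
```

Note that the fully qualified class name is compared case-sensitively, so a lower-cased handler name falls through to the `else` branch.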
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by ravipesala <gi...@git.apache.org>.
Github user ravipesala commented on the issue:
https://github.com/apache/carbondata/pull/1481
SDV Build Success , Please check CI http://144.76.159.231:8080/job/ApacheSDVTests/1647/
---
[GitHub] carbondata issue #1481: [CARBONDATA-1576] Added create datamap parser and sa...
Posted by CarbonDataQA <gi...@git.apache.org>.
Github user CarbonDataQA commented on the issue:
https://github.com/apache/carbondata/pull/1481
Build Failed with Spark 2.1.0, Please check CI http://136.243.101.176:8080/job/ApacheCarbonPRBuilder1/1000/
---