Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/13 19:16:41 UTC
[1/2] carbondata git commit: [CARBONDATA-1523] Pre Aggregate table selection and Query Plan changes
Repository: carbondata
Updated Branches:
refs/heads/pre-aggregate c1eefee7c -> dda2573a1
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
new file mode 100644
index 0000000..3fb0db0
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -0,0 +1,829 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, CarbonDatasourceHadoopRelation, InsertIntoCarbonTable, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, NamedExpression, PredicateSubquery, ScalaUDF}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable, DataMapSchema}
+import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan}
+import org.apache.carbondata.spark.util.CarbonScalaUtil
+
+/**
+ * Class for applying Pre Aggregate rules
+ * Responsibilities:
+ * 1. Check whether the plan is a valid plan for updating the parent table plan
+ *    with the child table
+ * 2. Update the plan based on the child schema
+ *
+ * Rules for updating the plan:
+ * 1. Grouping expression rules
+ *    1.1 Change the parent attribute reference of the group expression
+ *        to the child attribute reference
+ *
+ * 2. Aggregate expression rules
+ *    2.1 Change the parent attribute reference of the aggregate expression to
+ *        the child attribute reference
+ *    2.2 Change the count AggregateExpression to Sum, because the count is already
+ *        calculated in the aggregate table, so sum has to be applied to get the count
+ *    2.3 In case of the average aggregate function, select 2 columns from the aggregate
+ *        table with aggregation sum and count, then add
+ *        divide(sum(column with sum), sum(column with count)).
+ *        Note: during aggregate table creation, for average the table is created with two
+ *        columns, one for sum(column) and one for count(column), to support rollup
+ *
+ * 3. Filter expression rules
+ *    3.1 Update the filter expression attributes with the child table attributes
+ * 4. Replace the parent logical relation with the child logical relation
+ *
+ * @param sparkSession
+ * spark session
+ */
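As a hedged illustration of these rules (a sketch only: the table name "maintable", the child table "maintable_agg" and the column "salary" are assumptions, not part of this patch), a query written against the parent table is expected to be answered from the pre-aggregate child table roughly like this:

    // sketch: assumes a parent table "maintable" with a pre-aggregate child table
    // "maintable_agg" storing name, sum(salary) and count(salary)
    val spark = org.apache.spark.sql.SparkSession.builder().getOrCreate()
    // query written by the user against the parent table
    val userQuery = "select name, avg(salary) from maintable group by name"
    // conceptually rewritten by the rule onto the child table, with avg answered as sum/count
    val rewritten = "select maintable_name, " +
      "sum(maintable_salary_sum) / sum(maintable_salary_count) " +
      "from maintable_agg group by maintable_name"
    // after the rule fires, the analyzed plan of the user query should reference the child table
    spark.sql(userQuery).explain(true)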
+case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+
+ override def apply(plan: LogicalPlan): LogicalPlan = {
+ var needAnalysis = true
+ plan.transformExpressions {
+ // first check whether the preAgg scala UDF is present in the plan;
+ // if so, the call is from the create pre-aggregate table flow, so there is no need to transform the query plan
+ case al@Alias(udf: ScalaUDF, name) if name.equalsIgnoreCase("preAgg") =>
+ needAnalysis = false
+ al
+ // in case of a query, if any unresolved alias is present then wait for the plan to be resolved;
+ // return the same plan, as we can transform the plan only when everything is resolved
+ case unresolveAlias@UnresolvedAlias(_, _) =>
+ needAnalysis = false
+ unresolveAlias
+ case attr@UnresolvedAttribute(_) =>
+ needAnalysis = false
+ attr
+ }
+ // if plan is not valid for transformation then return same plan
+ if (!needAnalysis) {
+ plan
+ } else {
+ // create a buffer to collect all the columns and their metadata information
+ val list = scala.collection.mutable.ListBuffer.empty[QueryColumn]
+ var isValidPlan = true
+ val carbonTable = plan match {
+ // match the plan against the supported plan shapes;
+ // if the plan matches any case it will be validated and all the
+ // information required for transforming the plan will be collected
+
+ // When plan has grouping expression, aggregate expression
+ // subquery
+ case Aggregate(groupingExp,
+ aggregateExp,
+ SubqueryAlias(_, logicalRelation: LogicalRelation, _))
+ // only carbon query plans are supported, so check whether the logical relation
+ // is for carbon
+ if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+ .hasDataMapSchema =>
+ val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
+ // if it is a valid plan then extract the query columns
+ isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
+ aggregateExp,
+ carbonTable,
+ tableName,
+ list)
+ carbonTable
+
+ // below case for handling filter query
+ // When plan has grouping expression, aggregate expression
+ // filter expression
+ case Aggregate(groupingExp, aggregateExp,
+ Filter(filterExp,
+ SubqueryAlias(_, logicalRelation: LogicalRelation, _)))
+ // only carbon query plans are supported, so check whether the logical relation
+ // is for carbon
+ if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+ .hasDataMapSchema =>
+ val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
+ // if it is a valid plan then extract the query columns
+ isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
+ aggregateExp,
+ carbonTable,
+ tableName,
+ list)
+ // TODO need to handle filter predicate subquery scenario
+ isValidPlan = !PredicateSubquery.hasPredicateSubquery(filterExp)
+ // getting the columns from filter expression
+ if(isValidPlan) {
+ filterExp.transform {
+ case attr: AttributeReference =>
+ list += getQueryColumn(attr.name, carbonTable, tableName, isFilterColumn = true)
+ attr
+ }
+ }
+ carbonTable
+
+ // When plan has grouping expression, aggregate expression
+ // logical relation
+ case Aggregate(groupingExp, aggregateExp, logicalRelation: LogicalRelation)
+ // only carbon query plans are supported, so check whether the logical relation
+ // is for carbon
+ if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable
+ .hasDataMapSchema =>
+ val (carbonTable, tableName) = getCarbonTableAndTableName(logicalRelation)
+ // if it is a valid plan then extract the query columns
+ isValidPlan = extractQueryColumnsFromAggExpression(groupingExp,
+ aggregateExp,
+ carbonTable,
+ tableName,
+ list)
+ carbonTable
+ case _ =>
+ isValidPlan = false
+ null
+ }
+ // if plan is valid then update the plan with child attributes
+ if (isValidPlan) {
+ // getting all the projection columns
+ val listProjectionColumn = list
+ .filter(queryColumn => queryColumn.getAggFunction.isEmpty && !queryColumn.isFilterColumn)
+ // getting all the filter columns
+ val listFilterColumn = list
+ .filter(queryColumn => queryColumn.getAggFunction.isEmpty && queryColumn.isFilterColumn)
+ // getting all the aggregation columns
+ val listAggregationColumn = list.filter(queryColumn => !queryColumn.getAggFunction.isEmpty)
+ // create a query plan object which will be used to select the list of pre aggregate tables
+ // that match this plan
+ val queryPlan = new QueryPlan(listProjectionColumn.asJava,
+ listAggregationColumn.asJava,
+ listFilterColumn.asJava)
+ // create aggregate table selector object
+ val aggregateTableSelector = new AggregateTableSelector(queryPlan, carbonTable)
+ // select the list of valid child tables
+ val selectedDataMapSchemas = aggregateTableSelector.selectPreAggDataMapSchema()
+ // if it does not match any pre aggregate table, return the same plan
+ if (!selectedDataMapSchemas.isEmpty) {
+ // pick the smallest pre aggregate table from the selected child schemas based on size
+ val (aggDataMapSchema, carbonRelation) =
+ selectedDataMapSchemas.asScala.map { selectedDataMapSchema =>
+ val catalog = sparkSession.sessionState.catalog
+ val carbonRelation = catalog
+ .lookupRelation(TableIdentifier(selectedDataMapSchema.getRelationIdentifier
+ .getTableName,
+ Some(selectedDataMapSchema.getRelationIdentifier
+ .getDatabaseName))).asInstanceOf[SubqueryAlias].child
+ .asInstanceOf[LogicalRelation]
+ (selectedDataMapSchema, carbonRelation)
+ }.minBy(f => f._2.relation.asInstanceOf[CarbonDatasourceHadoopRelation].sizeInBytes)
+ // transform the query plan based on selected child schema
+ transformPreAggQueryPlan(plan, aggDataMapSchema, carbonRelation)
+ } else {
+ plan
+ }
+ } else {
+ plan
+ }
+ }
+ }
+
+ /**
+ * Below method will be used to get the child attribute reference
+ * based on parent name
+ *
+ * @param dataMapSchema
+ * child schema
+ * @param attributeReference
+ * parent attribute reference
+ * @param childCarbonRelation
+ * child logical relation
+ * @param aggFunction
+ * aggregation function applied on child
+ * @return child attribute reference
+ */
+ def getChildAttributeReference(dataMapSchema: DataMapSchema,
+ attributeReference: AttributeReference,
+ childCarbonRelation: LogicalRelation,
+ aggFunction: String = ""): AttributeReference = {
+ val aggregationDataMapSchema = dataMapSchema.asInstanceOf[AggregationDataMapSchema];
+ val columnSchema = if (aggFunction.isEmpty) {
+ aggregationDataMapSchema.getChildColByParentColName(attributeReference.name)
+ } else {
+ aggregationDataMapSchema.getAggChildColByParent(attributeReference.name, aggFunction)
+ }
+ // the column schema cannot be null here; if it is null then the aggregate table selection
+ // logic has some problem
+ if (null == columnSchema) {
+ throw new AnalysisException("Column does not exist in Pre Aggregate table")
+ }
+ // finding the child attribute from child logical relation
+ childCarbonRelation.attributeMap.find(p => p._2.name.equals(columnSchema.getColumnName)).get._2
+ }
+
+ /**
+ * Below method will be used to transform the main table plan to the child table plan.
+ * The rules for transforming are as below:
+ * 1. Grouping expression rules
+ *    1.1 Change the parent attribute reference of the group expression
+ *        to the child attribute reference
+ *
+ * 2. Aggregate expression rules
+ *    2.1 Change the parent attribute reference of the aggregate expression to
+ *        the child attribute reference
+ *    2.2 Change the count AggregateExpression to Sum, because the count is already
+ *        calculated in the aggregate table, so sum has to be applied to get the count
+ *    2.3 In case of the average aggregate function, select 2 columns from the aggregate table
+ *        with aggregation sum and count, then add
+ *        divide(sum(column with sum), sum(column with count)).
+ *        Note: during aggregate table creation, for average the table is created with two
+ *        columns, one for sum(column) and one for count(column), to support rollup
+ * 3. Filter expression rules
+ *    3.1 Update the filter expression attributes with the child table attributes
+ * 4. Replace the parent logical relation with the child logical relation
+ *
+ * @param logicalPlan
+ * parent logical plan
+ * @param aggDataMapSchema
+ * selected data map schema
+ * @param childCarbonRelation
+ * child carbon table relation
+ * @return transformed plan
+ */
+ def transformPreAggQueryPlan(logicalPlan: LogicalPlan,
+ aggDataMapSchema: DataMapSchema, childCarbonRelation: LogicalRelation): LogicalPlan = {
+ logicalPlan.transform {
+ case Aggregate(grExp, aggExp, child@SubqueryAlias(_, l: LogicalRelation, _))
+ if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+ l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+ val (updatedGroupExp, updatedAggExp, newChild, None) =
+ getUpdatedExpressions(grExp,
+ aggExp,
+ child,
+ None,
+ aggDataMapSchema,
+ childCarbonRelation)
+ Aggregate(updatedGroupExp,
+ updatedAggExp,
+ newChild)
+ case Aggregate(grExp,
+ aggExp,
+ Filter(expression, child@SubqueryAlias(_, l: LogicalRelation, _)))
+ if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+ l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+ val (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression) =
+ getUpdatedExpressions(grExp,
+ aggExp,
+ child,
+ Some(expression),
+ aggDataMapSchema,
+ childCarbonRelation)
+ Aggregate(updatedGroupExp,
+ updatedAggExp,
+ Filter(updatedFilterExpression.get,
+ newChild))
+ case Aggregate(grExp, aggExp, l: LogicalRelation)
+ if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] &&
+ l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonTable.hasDataMapSchema =>
+ val (updatedGroupExp, updatedAggExp, newChild, None) =
+ getUpdatedExpressions(grExp,
+ aggExp,
+ l,
+ None,
+ aggDataMapSchema,
+ childCarbonRelation)
+ Aggregate(updatedGroupExp,
+ updatedAggExp,
+ newChild)
+ }
+ }
+
+ /**
+ * Below method will be used to get the updated expression for pre aggregated table.
+ * It will replace the attributes of the actual plan with the child table attributes.
+ * The update is done for the below expressions:
+ * 1. Grouping expression
+ * 2. aggregate expression
+ * 3. child logical plan
+ * 4. filter expression if present
+ *
+ * @param groupingExpressions
+ * actual plan grouping expression
+ * @param aggregateExpressions
+ * actual plan aggregate expression
+ * @param child
+ * child logical plan
+ * @param filterExpression
+ * filter expression
+ * @param aggDataMapSchema
+ * pre aggregate table schema
+ * @param childCarbonRelation
+ * pre aggregate table logical relation
+ * @return tuple of(updated grouping expression,
+ * updated aggregate expression,
+ * updated child logical plan,
+ * updated filter expression if present in actual plan)
+ */
+ def getUpdatedExpressions(groupingExpressions: Seq[Expression],
+ aggregateExpressions: Seq[NamedExpression],
+ child: LogicalPlan, filterExpression: Option[Expression] = None,
+ aggDataMapSchema: DataMapSchema,
+ childCarbonRelation: LogicalRelation): (Seq[Expression], Seq[NamedExpression], LogicalPlan,
+ Option[Expression]) = {
+ // transforming the group by expression attributes with child attributes
+ val updatedGroupExp = groupingExpressions.map { exp =>
+ exp.transform {
+ case attr: AttributeReference =>
+ getChildAttributeReference(aggDataMapSchema, attr, childCarbonRelation)
+ }
+ }
+ // below code is for updating the aggregate expression.
+ // Note: while updating an aggregate expression we need to return an alias, because
+ // the final result has to be shown based on the actual query.
+ // For example, if the query is "select name from table group by name" and
+ // we only update the attributes, the child table column name would show up in the final
+ // output, so if an attribute does not have an alias we return an alias of the
+ // parent table column name.
+ // Rules for updating aggregate expressions:
+ // 1. If it matches an attribute reference, return an alias of the child attribute reference
+ // 2. If it matches an alias, return the same alias with the child attribute reference
+ // 3. If it matches an alias of any supported aggregate function, return the aggregate function
+ //    with the child attribute reference. Please check the class level documentation for how
+ //    the aggregate function is updated
+
+ val updatedAggExp = aggregateExpressions.map {
+ // case for attribute reference
+ case attr: AttributeReference =>
+ val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
+ attr,
+ childCarbonRelation)
+ // returning the alias to show proper column name in output
+ Alias(childAttributeReference,
+ attr.name)(NamedExpression.newExprId,
+ childAttributeReference.qualifier).asInstanceOf[NamedExpression]
+ // case for alias
+ case Alias(attr: AttributeReference, name) =>
+ val childAttributeReference = getChildAttributeReference(aggDataMapSchema,
+ attr,
+ childCarbonRelation)
+ // returning alias with child attribute reference
+ Alias(childAttributeReference,
+ name)(NamedExpression.newExprId,
+ childAttributeReference.qualifier).asInstanceOf[NamedExpression]
+ // for aggregate function case
+ case alias@Alias(attr: AggregateExpression, name) =>
+ // get the updated aggregate function
+ val aggExp = getUpdatedAggregateExpressionForChild(attr,
+ aggDataMapSchema,
+ childCarbonRelation)
+ // returning alias with child attribute reference
+ Alias(aggExp,
+ name)(NamedExpression.newExprId,
+ alias.qualifier).asInstanceOf[NamedExpression]
+ }
+ // transforming the logical relation
+ val newChild = child.transform {
+ case _: LogicalRelation =>
+ childCarbonRelation
+ case _: SubqueryAlias =>
+ childCarbonRelation
+ }
+ // updating the filter expression if present
+ val updatedFilterExpression = if (filterExpression.isDefined) {
+ val filterExp = filterExpression.get
+ Some(filterExp.transform {
+ case attr: AttributeReference =>
+ getChildAttributeReference(aggDataMapSchema, attr, childCarbonRelation)
+ })
+ } else {
+ None
+ }
+ (updatedGroupExp, updatedAggExp, newChild, updatedFilterExpression)
+ }
+
+ /**
+ * Below method will be used to get the updated aggregate expression based on the match.
+ * Aggregate expression update rules:
+ * 1. Change the count AggregateExpression to Sum, because the count is already
+ *    calculated in the aggregate table, so sum has to be applied to get the count
+ * 2. In case of the average aggregate function, select 2 columns from the aggregate table
+ *    with aggregation sum and count,
+ *    then add divide(sum(column with sum), sum(column with count)).
+ *    Note: during aggregate table creation, for the average aggregation function the
+ *    table is created with two columns, one for sum(column) and one for count(column),
+ *    to support rollup
+ *
+ * @param aggExp
+ * aggregate expression
+ * @param dataMapSchema
+ * child data map schema
+ * @param childCarbonRelation
+ * child logical relation
+ * @return updated expression
+ */
+ def getUpdatedAggregateExpressionForChild(aggExp: AggregateExpression,
+ dataMapSchema: DataMapSchema,
+ childCarbonRelation: LogicalRelation):
+ Expression = {
+ aggExp.aggregateFunction match {
+ // Change the count AggregateExpression to Sum as count
+ // is already calculated so in case of aggregate table
+ // we need to apply sum to get the count
+ case count@Count(Seq(attr: AttributeReference)) =>
+ AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
+ attr,
+ childCarbonRelation,
+ count.prettyName),
+ LongType)),
+ aggExp.mode,
+ isDistinct = false)
+ case sum@Sum(attr: AttributeReference) =>
+ AggregateExpression(Sum(getChildAttributeReference(dataMapSchema,
+ attr,
+ childCarbonRelation,
+ sum.prettyName)),
+ aggExp.mode,
+ isDistinct = false)
+ case max@Max(attr: AttributeReference) =>
+ AggregateExpression(Max(getChildAttributeReference(dataMapSchema,
+ attr,
+ childCarbonRelation,
+ max.prettyName)),
+ aggExp.mode,
+ isDistinct = false)
+ case min@Min(attr: AttributeReference) =>
+ AggregateExpression(Min(getChildAttributeReference(dataMapSchema,
+ attr,
+ childCarbonRelation,
+ min.prettyName)),
+ aggExp.mode,
+ isDistinct = false)
+ case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+ AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
+ attr,
+ childCarbonRelation,
+ sum.prettyName),
+ changeDataType)),
+ aggExp.mode,
+ isDistinct = false)
+ case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+ AggregateExpression(Min(Cast(getChildAttributeReference(dataMapSchema,
+ attr,
+ childCarbonRelation,
+ min.prettyName),
+ changeDataType)),
+ aggExp.mode,
+ isDistinct = false)
+ case max@Max(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+ AggregateExpression(Max(Cast(getChildAttributeReference(dataMapSchema,
+ attr,
+ childCarbonRelation,
+ max.prettyName),
+ changeDataType)),
+ aggExp.mode,
+ isDistinct = false)
+
+ // In case of average aggregate function select 2 columns from aggregate table
+ // with aggregation sum and count.
+ // Then add divide(sum(column with sum), sum(column with count)).
+ case Average(attr: AttributeReference) =>
+ Divide(AggregateExpression(Sum(getChildAttributeReference(dataMapSchema,
+ attr,
+ childCarbonRelation,
+ "sum")),
+ aggExp.mode,
+ isDistinct = false),
+ AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
+ attr,
+ childCarbonRelation,
+ "count"),
+ LongType)),
+ aggExp.mode,
+ isDistinct = false))
+ // In case of average aggregate function select 2 columns from aggregate table
+ // with aggregation sum and count.
+ // Then add divide(sum(column with sum), sum(column with count)).
+ case Average(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+ Divide(AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
+ attr,
+ childCarbonRelation,
+ "sum"),
+ changeDataType)),
+ aggExp.mode,
+ isDistinct = false),
+ AggregateExpression(Sum(Cast(getChildAttributeReference(dataMapSchema,
+ attr,
+ childCarbonRelation,
+ "count"),
+ LongType)),
+ aggExp.mode,
+ isDistinct = false))
+ }
+ }
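A minimal sketch of the expression shape produced for Average above (the attribute names are assumptions for illustration, and the aggregate mode is shown as Complete for simplicity):

    import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, Divide}
    import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Complete, Sum}
    import org.apache.spark.sql.types.{DoubleType, LongType}

    // child table columns assumed to hold the pre-computed sum and count of the parent column
    val salarySum = AttributeReference("maintable_salary_sum", DoubleType)()
    val salaryCount = AttributeReference("maintable_salary_count", DoubleType)()
    // avg(salary) on the parent table becomes sum(sum column) / sum(cast(count column as long))
    val rewrittenAvg = Divide(
      AggregateExpression(Sum(salarySum), Complete, isDistinct = false),
      AggregateExpression(Sum(Cast(salaryCount, LongType)), Complete, isDistinct = false))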
+
+ /**
+ * Method to get the carbon table and table name
+ *
+ * @param parentLogicalRelation
+ * parent table relation
+ * @return tuple of carbon table and table name
+ */
+ def getCarbonTableAndTableName(parentLogicalRelation: LogicalRelation): (CarbonTable, String) = {
+ val carbonTable = parentLogicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+ .carbonRelation
+ .metaData.carbonTable
+ val tableName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+ .getTableName
+ (carbonTable, tableName)
+ }
+
+ /**
+ * Below method will be used to get the query columns from plan
+ *
+ * @param groupByExpression
+ * group by expression
+ * @param aggregateExpressions
+ * aggregate expression
+ * @param carbonTable
+ * parent carbon table
+ * @param tableName
+ * parent table name
+ * @param list
+ * list of attributes
+ * @return plan is valid
+ */
+ def extractQueryColumnsFromAggExpression(groupByExpression: Seq[Expression],
+ aggregateExpressions: Seq[NamedExpression],
+ carbonTable: CarbonTable, tableName: String,
+ list: scala.collection.mutable.ListBuffer[QueryColumn]): Boolean = {
+ aggregateExpressions.map {
+ case attr: AttributeReference =>
+ list += getQueryColumn(attr.name,
+ carbonTable,
+ tableName);
+ case Alias(attr: AttributeReference, _) =>
+ list += getQueryColumn(attr.name,
+ carbonTable,
+ tableName);
+ case Alias(attr: AggregateExpression, _) =>
+ if (attr.isDistinct) {
+ return false
+ }
+ val queryColumn = validateAggregateFunctionAndGetFields(carbonTable,
+ attr.aggregateFunction,
+ tableName)
+ if (queryColumn.nonEmpty) {
+ list ++= queryColumn
+ } else {
+ return false
+ }
+ }
+ true
+ }
+
+ /**
+ * Below method will be used to validate the aggregate function and get the attribute information
+ * which is applied in the select query.
+ * Currently sum, max, min, count and avg are supported;
+ * in case of any other aggregate function it will return an empty sequence.
+ * In case of avg it will return two fields, one for the count
+ * and the other for the sum of that column, to support rollup
+ *
+ * @param carbonTable
+ * parent table
+ * @param aggFunctions
+ * aggregation function
+ * @param tableName
+ * parent table name
+ * @return list of fields
+ */
+ def validateAggregateFunctionAndGetFields(carbonTable: CarbonTable,
+ aggFunctions: AggregateFunction,
+ tableName: String
+ ): Seq[QueryColumn] = {
+ val changedDataType = true
+ aggFunctions match {
+ case sum@Sum(attr: AttributeReference) =>
+ Seq(getQueryColumn(attr.name,
+ carbonTable,
+ tableName,
+ sum.prettyName))
+ case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+ Seq(getQueryColumn(attr.name,
+ carbonTable,
+ tableName,
+ sum.prettyName,
+ changeDataType.typeName,
+ changedDataType))
+ case count@Count(Seq(attr: AttributeReference)) =>
+ Seq(getQueryColumn(attr.name,
+ carbonTable,
+ tableName,
+ count.prettyName))
+ case min@Min(attr: AttributeReference) =>
+ Seq(getQueryColumn(attr.name,
+ carbonTable,
+ tableName,
+ min.prettyName))
+ case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+ Seq(getQueryColumn(attr.name,
+ carbonTable,
+ tableName,
+ min.prettyName,
+ changeDataType.typeName,
+ changedDataType))
+ case max@Max(attr: AttributeReference) =>
+ Seq(getQueryColumn(attr.name,
+ carbonTable,
+ tableName,
+ max.prettyName))
+ case max@Max(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+ Seq(getQueryColumn(attr.name,
+ carbonTable,
+ tableName,
+ max.prettyName,
+ changeDataType.typeName,
+ changedDataType))
+ // in case of average we need to return two columns,
+ // the sum and count of the column, which were added during table creation to support rollup
+ case Average(attr: AttributeReference) =>
+ Seq(getQueryColumn(attr.name,
+ carbonTable,
+ tableName,
+ "sum"
+ ), getQueryColumn(attr.name,
+ carbonTable,
+ tableName,
+ "count"
+ ))
+ // in case of average we need to return two columns,
+ // the sum and count of the column, which were added during table creation to support rollup
+ case Average(Cast(attr: AttributeReference, changeDataType: DataType)) =>
+ Seq(getQueryColumn(attr.name,
+ carbonTable,
+ tableName,
+ "sum",
+ changeDataType.typeName,
+ changedDataType), getQueryColumn(attr.name,
+ carbonTable,
+ tableName,
+ "count",
+ changeDataType.typeName,
+ changedDataType))
+ case _ =>
+ Seq.empty
+ }
+ }
+
+ /**
+ * Below method will be used to get the query column object which
+ * will have details of the column and its property
+ *
+ * @param columnName
+ * parent column name
+ * @param carbonTable
+ * parent carbon table
+ * @param tableName
+ * parent table name
+ * @param aggFunction
+ * aggregate function applied
+ * @param dataType
+ * data type of the column
+ * @param isChangedDataType
+ * whether a cast is applied on the column
+ * @param isFilterColumn
+ * whether a filter is applied on the column
+ * @return query column
+ */
+ def getQueryColumn(columnName: String,
+ carbonTable: CarbonTable,
+ tableName: String,
+ aggFunction: String = "",
+ dataType: String = "",
+ isChangedDataType: Boolean = false,
+ isFilterColumn: Boolean = false): QueryColumn = {
+ val columnSchema = carbonTable.getColumnByName(tableName, columnName).getColumnSchema
+ if (isChangedDataType) {
+ new QueryColumn(columnSchema, columnSchema.getDataType.getName, aggFunction, isFilterColumn)
+ } else {
+ new QueryColumn(columnSchema,
+ CarbonScalaUtil.convertSparkToCarbonSchemaDataType(dataType),
+ aggFunction, isFilterColumn)
+ }
+ }
+}
+
+/**
+ * Insert into carbon table from other source
+ */
+object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = {
+ plan.transform {
+ // Wait until children are resolved.
+ case p: LogicalPlan if !p.childrenResolved => p
+
+ case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _)
+ if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation], child)
+ }
+ }
+
+ def castChildOutput(p: InsertIntoTable,
+ relation: CarbonDatasourceHadoopRelation,
+ child: LogicalPlan)
+ : LogicalPlan = {
+ if (relation.carbonRelation.output.size > CarbonCommonConstants
+ .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
+ sys.error("Maximum number of columns supported by carbon is: " +
+ CarbonCommonConstants.DEFAULT_MAX_NUMBER_OF_COLUMNS)
+ }
+ val isAggregateTable = !relation.carbonRelation.tableMeta.carbonTable.getTableInfo
+ .getParentRelationIdentifiers.isEmpty
+ // transform logical plan if the load is for aggregate table.
+ val childPlan = if (isAggregateTable) {
+ transformAggregatePlan(child)
+ } else {
+ child
+ }
+ if (childPlan.output.size >= relation.carbonRelation.output.size) {
+ val newChildOutput = childPlan.output.zipWithIndex.map { columnWithIndex =>
+ columnWithIndex._1 match {
+ case attr: Alias =>
+ Alias(attr.child, s"col${ columnWithIndex._2 }")(attr.exprId)
+ case attr: Attribute =>
+ Alias(attr, s"col${ columnWithIndex._2 }")(NamedExpression.newExprId)
+ case attr => attr
+ }
+ }
+ val newChild: LogicalPlan = if (newChildOutput == childPlan.output) {
+ p.child
+ } else {
+ Project(newChildOutput, childPlan)
+ }
+ InsertIntoCarbonTable(relation, p.partition, newChild, p.overwrite, p.ifNotExists)
+ } else {
+ sys.error("Cannot insert into target table because column number are different")
+ }
+ }
+
+ /**
+ * Transform the logical plan with average(col1) aggregation type to sum(col1) and count(col1).
+ *
+ * @param logicalPlan
+ * @return
+ */
+ private def transformAggregatePlan(logicalPlan: LogicalPlan): LogicalPlan = {
+ logicalPlan transform {
+ case aggregate@Aggregate(_, aExp, _) =>
+ val newExpressions = aExp.flatMap {
+ case alias@Alias(attrExpression: AggregateExpression, _) =>
+ attrExpression.aggregateFunction match {
+ case Average(attr: AttributeReference) =>
+ Seq(Alias(attrExpression
+ .copy(aggregateFunction = Sum(attr),
+ resultId = NamedExpression.newExprId), attr.name + "_sum")(),
+ Alias(attrExpression
+ .copy(aggregateFunction = Count(attr),
+ resultId = NamedExpression.newExprId), attr.name + "_count")())
+ case Average(cast@Cast(attr: AttributeReference, _)) =>
+ Seq(Alias(attrExpression
+ .copy(aggregateFunction = Sum(cast),
+ resultId = NamedExpression.newExprId),
+ attr.name + "_sum")(),
+ Alias(attrExpression
+ .copy(aggregateFunction = Count(cast),
+ resultId = NamedExpression.newExprId), attr.name + "_count")())
+ case _ => Seq(alias)
+ }
+ case namedExpr: NamedExpression => Seq(namedExpr)
+ }
+ aggregate.copy(aggregateExpressions = newExpressions.asInstanceOf[Seq[NamedExpression]])
+ case plan: LogicalPlan => plan
+ }
+ }
+}
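A rough sketch of the effect of transformAggregatePlan above on a child-table load (table and column names are again assumptions, not part of the patch): an average in the load query is split into separate sum and count outputs so that averages remain roll-up-able across loads:

    // load query defined for the aggregate child table
    val original = "select name, avg(salary) from maintable group by name"
    // conceptually rewritten before insertion into the child table; the alias suffixes
    // follow the attr.name + "_sum" / "_count" convention used in transformAggregatePlan
    val rewritten = "select name, sum(salary) as salary_sum, count(salary) as salary_count " +
      "from maintable group by name"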
+
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
index f698dd4..205b716 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -169,15 +169,15 @@ class CarbonSessionState(sparkSession: SparkSession) extends HiveSessionState(sp
catalog.ParquetConversions ::
catalog.OrcConversions ::
CarbonPreInsertionCasts ::
+ CarbonPreAggregateQueryRules(sparkSession) ::
CarbonIUDAnalysisRule(sparkSession) ::
AnalyzeCreateTable(sparkSession) ::
PreprocessTableInsertion(conf) ::
DataSourceAnalysis(conf) ::
(if (conf.runSQLonFile) {
new ResolveDataSource(sparkSession) :: Nil
- } else {
- Nil
- })
+ } else { Nil }
+ )
override val extendedCheckRules = Seq(
PreWriteCheck(conf, catalog))
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 46a2515..3bed9d1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -476,6 +476,14 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
}
}
+ def addPreAggFunction(sql: String): String = {
+ addPreAgg(new lexical.Scanner(sql.toLowerCase)) match {
+ case Success(query, _) => query
+ case failureOrError => throw new MalformedCarbonCommandException(
+ s"Unsupported query")
+ }
+ }
+
def getBucketFields(
properties: mutable.Map[String, String],
fields: Seq[Field],
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 5c51156..401b149 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.parser
import scala.collection.mutable
-import org.apache.spark.sql.{CarbonSession, DataFrame, Dataset, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
import org.apache.spark.sql.catalyst.parser.ParserUtils._
@@ -74,6 +74,7 @@ class CarbonSqlAstBuilder(conf: SQLConf) extends SparkSqlAstBuilder(conf) {
val parser = new CarbonSpark2SqlParser
override def visitCreateTable(ctx: CreateTableContext): LogicalPlan = {
+ Option(ctx.query()).map(plan)
val fileStorage = Option(ctx.createFileFormat) match {
case Some(value) =>
if (value.children.get(1).getText.equalsIgnoreCase("by")) {
[2/2] carbondata git commit: [CARBONDATA-1523] Pre Aggregate table selection and Query Plan changes
Posted by ra...@apache.org.
[CARBONDATA-1523] Pre Aggregate table selection and Query Plan changes
This closes #1464
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/dda2573a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/dda2573a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/dda2573a
Branch: refs/heads/pre-aggregate
Commit: dda2573a1160ba0d97047693c4b094bc6da06d95
Parents: c1eefee
Author: kumarvishal <ku...@gmail.com>
Authored: Mon Oct 30 12:44:32 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Tue Nov 14 00:46:24 2017 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 2 +
.../ThriftWrapperSchemaConverterImpl.java | 7 +-
.../schema/table/AggregationDataMapSchema.java | 212 +++++
.../core/metadata/schema/table/CarbonTable.java | 16 +-
.../metadata/schema/table/DataMapSchema.java | 19 +-
.../schema/table/DataMapSchemaFactory.java | 38 +
.../core/metadata/schema/table/TableInfo.java | 7 +-
.../core/metadata/schema/table/TableSchema.java | 5 +-
.../core/preagg/AggregateTableSelector.java | 135 +++
.../carbondata/core/preagg/QueryColumn.java | 70 ++
.../carbondata/core/preagg/QueryPlan.java | 59 ++
.../TestPreAggregateTableSelection.scala | 175 ++++
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 7 +
.../spark/rdd/CarbonDataRDDFactory.scala | 2 +-
.../scala/org/apache/spark/sql/CarbonEnv.scala | 3 +
.../CreatePreAggregateTableCommand.scala | 12 +-
.../preaaggregate/PreAggregateListeners.scala | 24 +-
.../preaaggregate/PreAggregateUtil.scala | 184 ++--
.../spark/sql/hive/CarbonAnalysisRules.scala | 101 +--
.../sql/hive/CarbonPreAggregateRules.scala | 829 +++++++++++++++++++
.../spark/sql/hive/CarbonSessionState.scala | 6 +-
.../sql/parser/CarbonSpark2SqlParser.scala | 8 +
.../spark/sql/parser/CarbonSparkSqlParser.scala | 3 +-
23 files changed, 1709 insertions(+), 215 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index cb0b5c3..f17a9e7 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1411,6 +1411,8 @@ public final class CarbonCommonConstants {
public static final String CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT = "true";
+ public static final String AGGREGATIONDATAMAPSCHEMA = "AggregateDataMapHandler";
+
private CarbonCommonConstants() {
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index b914e06..fef2e0f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -31,6 +31,7 @@ import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
import org.apache.carbondata.core.metadata.schema.partition.PartitionType;
import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaFactory;
import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
import org.apache.carbondata.core.metadata.schema.table.TableInfo;
import org.apache.carbondata.core.metadata.schema.table.TableSchema;
@@ -628,10 +629,10 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
@Override public DataMapSchema fromExternalToWrapperDataMapSchema(
org.apache.carbondata.format.DataMapSchema thriftDataMapSchema) {
- DataMapSchema childSchema =
- new DataMapSchema(thriftDataMapSchema.getDataMapName(), thriftDataMapSchema.getClassName());
+ DataMapSchema childSchema = DataMapSchemaFactory.INSTANCE
+ .getDataMapSchema(thriftDataMapSchema.getDataMapName(), thriftDataMapSchema.getClassName());
childSchema.setProperties(thriftDataMapSchema.getProperties());
- if (thriftDataMapSchema.getRelationIdentifire() != null) {
+ if (null != thriftDataMapSchema.getRelationIdentifire()) {
RelationIdentifier relationIdentifier =
new RelationIdentifier(thriftDataMapSchema.getRelationIdentifire().getDatabaseName(),
thriftDataMapSchema.getRelationIdentifire().getTableName(),
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
new file mode 100644
index 0000000..87c07f4
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.metadata.schema.table;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation;
+
+/**
+ * data map schema class for pre aggregation
+ */
+public class AggregationDataMapSchema extends DataMapSchema {
+
+ /**
+ * map of parent column name to the set of child columns without an
+ * aggregation function
+ */
+ private Map<String, Set<ColumnSchema>> parentToNonAggChildMapping;
+
+ /**
+ * map of parent column name to the set of child columns with an
+ * aggregation function
+ */
+ private Map<String, Set<ColumnSchema>> parentToAggChildMapping;
+
+ /**
+ * map of parent column name to the set of aggregation functions applied on
+ * the parent column
+ */
+ private Map<String, Set<String>> parentColumnToAggregationsMapping;
+
+ public AggregationDataMapSchema(String dataMapName, String className) {
+ super(dataMapName, className);
+ }
+
+ public void setChildSchema(TableSchema childSchema) {
+ super.setChildSchema(childSchema);
+ List<ColumnSchema> listOfColumns = getChildSchema().getListOfColumns();
+ fillNonAggFunctionColumns(listOfColumns);
+ fillAggFunctionColumns(listOfColumns);
+ fillParentNameToAggregationMapping(listOfColumns);
+ }
+
+ /**
+ * Below method will be used to get the child column on which no aggregate function is applied
+ * @param columnName
+ * parent column name
+ * @return child column schema
+ */
+ public ColumnSchema getNonAggChildColBasedByParent(String columnName) {
+ Set<ColumnSchema> columnSchemas = parentToNonAggChildMapping.get(columnName);
+ if (null != columnSchemas) {
+ Iterator<ColumnSchema> iterator = columnSchemas.iterator();
+ while (iterator.hasNext()) {
+ ColumnSchema next = iterator.next();
+ if (null == next.getAggFunction() || next.getAggFunction().isEmpty()) {
+ return next;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Below method will be used to get the column schema based on parent column name
+ * @param columnName
+ * parent column name
+ * @return child column schema
+ */
+ public ColumnSchema getChildColByParentColName(String columnName) {
+ List<ColumnSchema> listOfColumns = childSchema.getListOfColumns();
+ for (ColumnSchema columnSchema : listOfColumns) {
+ List<ParentColumnTableRelation> parentColumnTableRelations =
+ columnSchema.getParentColumnTableRelations();
+ if (parentColumnTableRelations.get(0).getColumnName().equals(columnName)) {
+ return columnSchema;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Below method will be used to get the child column schema based on parent name and aggregate
+ * function applied on column
+ * @param columnName
+ * parent column name
+ * @param aggFunction
+ * aggregate function applied
+ * @return child column schema
+ */
+ public ColumnSchema getAggChildColByParent(String columnName,
+ String aggFunction) {
+ Set<ColumnSchema> columnSchemas = parentToAggChildMapping.get(columnName);
+ if (null != columnSchemas) {
+ Iterator<ColumnSchema> iterator = columnSchemas.iterator();
+ while (iterator.hasNext()) {
+ ColumnSchema next = iterator.next();
+ if (null != next.getAggFunction() && next.getAggFunction().equalsIgnoreCase(aggFunction)) {
+ return next;
+ }
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Below method checks whether the given aggregate function is applied on the parent column
+ * @param parentColumnName
+ * parent column name
+ * @param aggFunction
+ * aggregate function
+ * @return is matching
+ */
+ public boolean isColumnWithAggFunctionExists(String parentColumnName, String aggFunction) {
+ Set<String> aggFunctions = parentColumnToAggregationsMapping.get(parentColumnName);
+ if (null != aggFunctions && aggFunctions.contains(aggFunction)) {
+ return true;
+ }
+ return false;
+ }
+
+
+ /**
+ * Method to prepare the mapping from a parent column to the list of aggregation functions applied on that column
+ * @param listOfColumns
+ * child column schema list
+ */
+ private void fillParentNameToAggregationMapping(List<ColumnSchema> listOfColumns) {
+ parentColumnToAggregationsMapping = new HashMap<>();
+ for (ColumnSchema column : listOfColumns) {
+ if (null != column.getAggFunction() && !column.getAggFunction().isEmpty()) {
+ List<ParentColumnTableRelation> parentColumnTableRelations =
+ column.getParentColumnTableRelations();
+ if (null != parentColumnTableRelations && parentColumnTableRelations.size() == 1) {
+ String columnName = column.getParentColumnTableRelations().get(0).getColumnName();
+ Set<String> aggFunctions = parentColumnToAggregationsMapping.get(columnName);
+ if (null == aggFunctions) {
+ aggFunctions = new HashSet<>();
+ parentColumnToAggregationsMapping.put(columnName, aggFunctions);
+ }
+ aggFunctions.add(column.getAggFunction());
+ }
+ }
+ }
+ }
+
+ /**
+ * Below method will be used to prepare the mapping from parent columns to non-aggregation-function
+ * child columns
+ * @param listOfColumns
+ * list of child columns
+ */
+ private void fillNonAggFunctionColumns(List<ColumnSchema> listOfColumns) {
+ parentToNonAggChildMapping = new HashMap<>();
+ for (ColumnSchema column : listOfColumns) {
+ if (null == column.getAggFunction() || column.getAggFunction().isEmpty()) {
+ fillMappingDetails(column, parentToNonAggChildMapping);
+ }
+ }
+ }
+
+ private void fillMappingDetails(ColumnSchema column,
+ Map<String, Set<ColumnSchema>> map) {
+ List<ParentColumnTableRelation> parentColumnTableRelations =
+ column.getParentColumnTableRelations();
+ if (null != parentColumnTableRelations && parentColumnTableRelations.size() == 1) {
+ String columnName = column.getParentColumnTableRelations().get(0).getColumnName();
+ Set<ColumnSchema> columnSchemas = map.get(columnName);
+ if (null == columnSchemas) {
+ columnSchemas = new HashSet<>();
+ map.put(columnName, columnSchemas);
+ }
+ columnSchemas.add(column);
+ }
+ }
+
+ /**
+ * Below method will be used to fill the mapping from parent columns to aggregation columns
+ * @param listOfColumns
+ * list of child columns
+ */
+ private void fillAggFunctionColumns(List<ColumnSchema> listOfColumns) {
+ parentToAggChildMapping = new HashMap<>();
+ for (ColumnSchema column : listOfColumns) {
+ if (null != column.getAggFunction() && !column.getAggFunction().isEmpty()) {
+ fillMappingDetails(column, parentToAggChildMapping);
+ }
+ }
+ }
+
+}
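A hypothetical Scala sketch of how these lookups are used from the query rule side (the parent column names "name" and "salary" are assumptions for illustration):

    import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, DataMapSchema}
    import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema

    // mirrors the lookups done in CarbonPreAggregateQueryRules.getChildAttributeReference
    def resolveChildColumns(schema: DataMapSchema): Seq[ColumnSchema] = {
      val aggSchema = schema.asInstanceOf[AggregationDataMapSchema]
      Seq(
        aggSchema.getChildColByParentColName("name"),        // plain projection/group-by column
        aggSchema.getAggChildColByParent("salary", "sum"),   // child column storing sum(salary)
        aggSchema.getAggChildColByParent("salary", "count")) // child column storing count(salary)
    }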
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index ca0952d..0fd9fbf 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -126,6 +126,8 @@ public class CarbonTable implements Serializable {
private int dimensionOrdinalMax;
+ private boolean hasDataMapSchema;
+
private CarbonTable() {
this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>();
this.tableImplicitDimensionsMap = new HashMap<String, List<CarbonDimension>>();
@@ -158,6 +160,8 @@ public class CarbonTable implements Serializable {
table.tablePartitionMap.put(tableInfo.getFactTable().getTableName(),
tableInfo.getFactTable().getPartitionInfo());
}
+ table.hasDataMapSchema =
+ null != tableInfo.getDataMapSchemaList() && tableInfo.getDataMapSchemaList().size() > 0;
return table;
}
@@ -702,13 +706,13 @@ public class CarbonTable implements Serializable {
this.dimensionOrdinalMax = dimensionOrdinalMax;
}
- public boolean isPreAggregateTable() {
- return tableInfo.getParentRelationIdentifiers() != null && !tableInfo
- .getParentRelationIdentifiers().isEmpty();
+
+ public boolean hasDataMapSchema() {
+ return hasDataMapSchema;
}
- public boolean hasPreAggregateTables() {
- return tableInfo.getDataMapSchemaList() != null && !tableInfo
- .getDataMapSchemaList().isEmpty();
+ public boolean isChildDataMap() {
+ return null != tableInfo.getParentRelationIdentifiers()
+ && !tableInfo.getParentRelationIdentifiers().isEmpty();
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index e0632d9..5a9017b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -30,20 +30,20 @@ public class DataMapSchema implements Serializable, Writable {
private static final long serialVersionUID = 6577149126264181553L;
- private String dataMapName;
+ protected String dataMapName;
private String className;
- private RelationIdentifier relationIdentifier;
+ protected RelationIdentifier relationIdentifier;
/**
* child table schema
*/
- private TableSchema childSchema;
+ protected TableSchema childSchema;
/**
* relation properties
*/
- private Map<String, String> properties;
+ protected Map<String, String> properties;
public DataMapSchema() {
}
@@ -69,6 +69,10 @@ public class DataMapSchema implements Serializable, Writable {
return properties;
}
+ public String getDataMapName() {
+ return dataMapName;
+ }
+
public void setRelationIdentifier(RelationIdentifier relationIdentifier) {
this.relationIdentifier = relationIdentifier;
}
@@ -81,10 +85,6 @@ public class DataMapSchema implements Serializable, Writable {
this.properties = properties;
}
- public String getDataMapName() {
- return dataMapName;
- }
-
@Override public void write(DataOutput out) throws IOException {
out.writeUTF(dataMapName);
out.writeUTF(className);
@@ -114,7 +114,7 @@ public class DataMapSchema implements Serializable, Writable {
this.className = in.readUTF();
boolean isRelationIdnentifierExists = in.readBoolean();
if (isRelationIdnentifierExists) {
- this.relationIdentifier = new RelationIdentifier();
+ this.relationIdentifier = new RelationIdentifier(null, null, null);
this.relationIdentifier.readFields(in);
}
boolean isChildSchemaExists = in.readBoolean();
@@ -130,6 +130,5 @@ public class DataMapSchema implements Serializable, Writable {
String value = in.readUTF();
this.properties.put(key, value);
}
-
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
new file mode 100644
index 0000000..5729959
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchemaFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.metadata.schema.table;
+
+import static org.apache.carbondata.core.constants.CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA;
+
+public class DataMapSchemaFactory {
+ public static final DataMapSchemaFactory INSTANCE = new DataMapSchemaFactory();
+
+ /**
+ * Below method will be used to get the data map schema object
+ * based on the class name
+ * @param dataMapName data map name
+ * @param className data map schema class name
+ * @return data map schema
+ */
+ public DataMapSchema getDataMapSchema(String dataMapName, String className) {
+ switch (className) {
+ case AGGREGATIONDATAMAPSCHEMA:
+ return new AggregationDataMapSchema(dataMapName, className);
+ default:
+ return new DataMapSchema(dataMapName, className);
+ }
+ }
+}
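A small usage sketch of the factory (the data map names and the fallback class name here are made up for the example):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.metadata.schema.table.DataMapSchemaFactory

    // a class name matching AGGREGATIONDATAMAPSCHEMA yields the aggregation-aware subclass,
    // any other class name falls back to the generic DataMapSchema
    val aggSchema = DataMapSchemaFactory.INSTANCE
      .getDataMapSchema("agg_dm", CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA)
    val genericSchema = DataMapSchemaFactory.INSTANCE
      .getDataMapSchema("other_dm", "org.example.SomeOtherDataMapProvider")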
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 1d9e2ec..65878bc 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -295,7 +295,12 @@ public class TableInfo implements Serializable, Writable {
for (int i = 0; i < numberOfChildTable; i++) {
DataMapSchema childSchema = new DataMapSchema();
childSchema.readFields(in);
- dataMapSchemaList.add(childSchema);
+ DataMapSchema dataMapSchema = DataMapSchemaFactory.INSTANCE
+ .getDataMapSchema(childSchema.getDataMapName(), childSchema.getClassName());
+ dataMapSchema.setChildSchema(childSchema.getChildSchema());
+ dataMapSchema.setRelationIdentifier(childSchema.getRelationIdentifier());
+ dataMapSchema.setProperties(childSchema.getProperties());
+ dataMapSchemaList.add(dataMapSchema);
}
}
boolean isParentTableRelationIndentifierExists = in.readBoolean();
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
index 714e0d8..03848d9 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableSchema.java
@@ -263,9 +263,10 @@ public class TableSchema implements Serializable, Writable {
Map<String, String> properties = new HashMap<>();
properties.put("CHILD_SELECT QUERY", queryString);
properties.put("QUERYTYPE", queryType);
- DataMapSchema dataMapSchema = new DataMapSchema(dataMapName, className);
- dataMapSchema.setChildSchema(this);
+ DataMapSchema dataMapSchema =
+ new DataMapSchema(dataMapName, className);
dataMapSchema.setProperties(properties);
+ dataMapSchema.setChildSchema(this);
dataMapSchema.setRelationIdentifier(relationIdentifier);
return dataMapSchema;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
new file mode 100644
index 0000000..8b87a1a
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/AggregateTableSelector.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.preagg;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.schema.table.AggregationDataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+
+/**
+ * Below class will be used to select the aggregate table based on the
+ * query plan. Rules for selecting the aggregate table are:
+ * 1. Select all aggregate tables that cover the projection columns
+ * 2. Select aggregate tables that cover the filter expression columns
+ * 3. Select aggregate tables that cover the aggregation columns
+ */
+public class AggregateTableSelector {
+
+ /**
+ * current query plan
+ */
+ private QueryPlan queryPlan;
+
+ /**
+ * parent table
+ */
+ private CarbonTable parentTable;
+
+ public AggregateTableSelector(QueryPlan queryPlan, CarbonTable parentTable) {
+ this.queryPlan = queryPlan;
+ this.parentTable = parentTable;
+ }
+
+ /**
+ * Below method will be used to select pre aggregate tables based on the query plan.
+ * Rules for selecting the aggregate table are:
+ * 1. Select all aggregate tables that cover the projection columns
+ * 2. Select aggregate tables that cover the filter expression columns
+ * 3. Select aggregate tables that cover the aggregation columns
+ *
+ * @return selected pre aggregate table schemas
+ */
+ public List<DataMapSchema> selectPreAggDataMapSchema() {
+ List<QueryColumn> projectionColumn = queryPlan.getProjectionColumn();
+ List<QueryColumn> aggColumns = queryPlan.getAggregationColumns();
+ List<QueryColumn> filterColumns = queryPlan.getFilterColumns();
+ List<DataMapSchema> dataMapSchemaList = parentTable.getTableInfo().getDataMapSchemaList();
+ List<DataMapSchema> selectedDataMapSchema = new ArrayList<>();
+ boolean isMatch;
+ // match projection columns
+ if (null != projectionColumn && !projectionColumn.isEmpty()) {
+ for (DataMapSchema dmSchema : dataMapSchemaList) {
+ AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema;
+ isMatch = true;
+ for (QueryColumn queryColumn : projectionColumn) {
+ ColumnSchema columnSchemaByParentName = aggregationDataMapSchema
+ .getNonAggChildColBasedByParent(queryColumn.getColumnSchema().getColumnName());
+ if (null == columnSchemaByParentName) {
+ isMatch = false;
+ }
+ }
+ if (isMatch) {
+ selectedDataMapSchema.add(dmSchema);
+ }
+ }
+ // if projection columns are present but no aggregate table matches them, return the empty list
+ if (selectedDataMapSchema.size() == 0) {
+ return selectedDataMapSchema;
+ }
+ }
+
+ // match filter columns
+ if (null != filterColumns && !filterColumns.isEmpty()) {
+ List<DataMapSchema> dmSchemaToIterate =
+ selectedDataMapSchema.isEmpty() ? dataMapSchemaList : selectedDataMapSchema;
+ selectedDataMapSchema = new ArrayList<>();
+ for (DataMapSchema dmSchema : dmSchemaToIterate) {
+ isMatch = true;
+ for (QueryColumn queryColumn : filterColumns) {
+ AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema;
+ ColumnSchema columnSchemaByParentName = aggregationDataMapSchema
+ .getNonAggChildColBasedByParent(queryColumn.getColumnSchema().getColumnName());
+ if (null == columnSchemaByParentName) {
+ isMatch = false;
+ }
+ }
+ if (isMatch) {
+ selectedDataMapSchema.add(dmSchema);
+ }
+ }
+ // if filter columns are present but no aggregate table matches them, return the empty list
+ if (selectedDataMapSchema.size() == 0) {
+ return selectedDataMapSchema;
+ }
+ }
+ // match aggregation columns
+ if (null != aggColumns && !aggColumns.isEmpty()) {
+ List<DataMapSchema> dmSchemaToIterate =
+ selectedDataMapSchema.isEmpty() ? dataMapSchemaList : selectedDataMapSchema;
+ selectedDataMapSchema = new ArrayList<>();
+ for (DataMapSchema dmSchema : dmSchemaToIterate) {
+ isMatch = true;
+ for (QueryColumn queryColumn : aggColumns) {
+ AggregationDataMapSchema aggregationDataMapSchema = (AggregationDataMapSchema) dmSchema;
+ if (!aggregationDataMapSchema
+ .isColumnWithAggFunctionExists(queryColumn.getColumnSchema().getColumnName(),
+ queryColumn.getAggFunction())) {
+ isMatch = false;
+ }
+ }
+ if (isMatch) {
+ selectedDataMapSchema.add(dmSchema);
+ }
+ }
+ }
+ return selectedDataMapSchema;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
new file mode 100644
index 0000000..a62d556
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/QueryColumn.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.preagg;
+
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+
+/**
+ * column present in query
+ */
+public class QueryColumn {
+
+ /**
+ * parent column schema
+ */
+ private ColumnSchema columnSchema;
+
+ /**
+ * to store the change data type in case of cast
+ */
+ private String changedDataType;
+
+ /**
+ * aggregation function applied
+ */
+ private String aggFunction;
+
+ /**
+ * is filter column
+ */
+ private boolean isFilterColumn;
+
+ public QueryColumn(ColumnSchema columnSchema, String changedDataType, String aggFunction,
+ boolean isFilterColumn) {
+ this.columnSchema = columnSchema;
+ this.changedDataType = changedDataType;
+ this.aggFunction = aggFunction;
+ this.isFilterColumn = isFilterColumn;
+ }
+
+ public ColumnSchema getColumnSchema() {
+ return columnSchema;
+ }
+
+ public String getChangedDataType() {
+ return changedDataType;
+ }
+
+ public String getAggFunction() {
+ return aggFunction;
+ }
+
+ public boolean isFilterColumn() {
+ return isFilterColumn;
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java b/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java
new file mode 100644
index 0000000..21a34fa
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/preagg/QueryPlan.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.core.preagg;
+
+import java.util.List;
+
+/**
+ * class to maintain the query plan to select the data map tables
+ */
+public class QueryPlan {
+
+ /**
+ * List of projection columns
+ */
+ private List<QueryColumn> projectionColumn;
+
+ /**
+ * list of aggregation columns
+ */
+ private List<QueryColumn> aggregationColumns;
+
+ /**
+ * list of filter columns
+ */
+ private List<QueryColumn> filterColumns;
+
+ public QueryPlan(List<QueryColumn> projectionColumn, List<QueryColumn> aggregationColumns,
+ List<QueryColumn> filterColumns) {
+ this.projectionColumn = projectionColumn;
+ this.aggregationColumns = aggregationColumns;
+ this.filterColumns = filterColumns;
+ }
+
+ public List<QueryColumn> getProjectionColumn() {
+ return projectionColumn;
+ }
+
+ public List<QueryColumn> getAggregationColumns() {
+ return aggregationColumns;
+ }
+
+ public List<QueryColumn> getFilterColumns() {
+ return filterColumns;
+ }
+}
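A minimal selection sketch in Scala, assuming a loaded parentTable with pre-aggregate datamaps like the ones created in the test suite further down, and assuming getColumnSchema is available on the resolved carbon column; an empty result means no pre-aggregate table covers the query and the parent table is used:

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.preagg.{AggregateTableSelector, QueryColumn, QueryPlan}

    // columns referenced by: select name, sum(age) from mainTable group by name
    val nameCol = parentTable.getColumnByName(parentTable.getFactTableName, "name").getColumnSchema
    val ageCol = parentTable.getColumnByName(parentTable.getFactTableName, "age").getColumnSchema

    val projection = List(new QueryColumn(nameCol, null, null, false)).asJava
    val aggregation = List(new QueryColumn(ageCol, null, "sum", false)).asJava
    val filters = List.empty[QueryColumn].asJava

    val selector = new AggregateTableSelector(new QueryPlan(projection, aggregation, filters), parentTable)
    val matched = selector.selectPreAggDataMapSchema()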
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
new file mode 100644
index 0000000..6b435c6
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.catalyst.analysis.UnresolvedAlias
+import org.apache.spark.sql.catalyst.expressions.Alias
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, DataFrame}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll: Unit = {
+ sql("drop table if exists mainTable")
+ sql("drop table if exists agg0")
+ sql("drop table if exists agg1")
+ sql("drop table if exists agg2")
+ sql("drop table if exists agg3")
+ sql("drop table if exists agg4")
+ sql("drop table if exists agg5")
+ sql("drop table if exists agg6")
+ sql("drop table if exists agg7")
+ sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
+ sql("create datamap agg0 on table mainTable using 'preaggregate' as select name from mainTable group by name")
+ sql("create datamap agg1 on table mainTable using 'preaggregate' as select name,sum(age) from mainTable group by name")
+ sql("create datamap agg2 on table mainTable using 'preaggregate' as select name,sum(id) from mainTable group by name")
+ sql("create datamap agg3 on table mainTable using 'preaggregate' as select name,count(id) from mainTable group by name")
+ sql("create datamap agg4 on table mainTable using 'preaggregate' as select name,sum(age),count(id) from mainTable group by name")
+ sql("create datamap agg5 on table mainTable using 'preaggregate' as select name,avg(age) from mainTable group by name")
+ sql("create datamap agg6 on table mainTable using 'preaggregate' as select name,min(age) from mainTable group by name")
+ sql("create datamap agg7 on table mainTable using 'preaggregate' as select name,max(age) from mainTable group by name")
+ sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/measureinsertintotest.csv' into table mainTable")
+ }
+
+
+ test("test PreAggregate table selection 1") {
+ val df = sql("select name from mainTable group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+ }
+
+ test("test PreAggregate table selection 2") {
+ val df = sql("select name from mainTable where name in (select name from mainTable) group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "mainTable")
+ }
+
+ test("test PreAggregate table selection 3") {
+ val df = sql("select name from mainTable where name in (select name from mainTable group by name) group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "mainTable")
+ }
+
+ test("test PreAggregate table selection 4") {
+ val df = sql("select name from mainTable where name in('vishal') group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
+ }
+
+ test("test PreAggregate table selection 5") {
+ val df = sql("select name, sum(age) from mainTable group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+ }
+
+ test("test PreAggregate table selection 6") {
+ val df = sql("select sum(age) from mainTable group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+ }
+
+ test("test PreAggregate table selection 7") {
+ val df = sql("select sum(id) from mainTable group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg2")
+ }
+
+ test("test PreAggregate table selection 8") {
+ val df = sql("select count(id) from mainTable group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
+ }
+
+ test("test PreAggregate table selection 9") {
+ val df = sql("select sum(age), count(id) from mainTable group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg4")
+ }
+
+ test("test PreAggregate table selection 10") {
+ val df = sql("select avg(age) from mainTable group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg5")
+ }
+
+ test("test PreAggregate table selection 11") {
+ val df = sql("select max(age) from mainTable group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg7")
+ }
+
+ test("test PreAggregate table selection 12") {
+ val df = sql("select min(age) from mainTable group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg6")
+ }
+
+ test("test PreAggregate table selection 13") {
+ val df = sql("select name, sum(age) from mainTable where city = 'Bangalore' group by name")
+ preAggTableValidator(df.queryExecution.analyzed, "mainTable")
+ }
+
+ test("test PreAggregate table selection 14") {
+ val df = sql("select sum(age) from mainTable")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg1")
+ }
+
+ test("test PreAggregate table selection 15") {
+ val df = sql("select avg(age) from mainTable")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg5")
+ }
+
+ test("test PreAggregate table selection 16") {
+ val df = sql("select max(age) from mainTable")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg7")
+ }
+
+ test("test PreAggregate table selection 17") {
+ val df = sql("select min(age) from mainTable")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg6")
+ }
+
+ test("test PreAggregate table selection 18") {
+ val df = sql("select count(id) from mainTable")
+ preAggTableValidator(df.queryExecution.analyzed, "maintable_agg3")
+ }
+
+ def preAggTableValidator(plan: LogicalPlan, actualTableName: String): Unit = {
+ var isValidPlan = false
+ plan.transform {
+ // walk the plan and mark it valid when a carbon relation resolves to the expected table,
+ // i.e. the query was routed to the given pre-aggregate (or parent) table
+ case logicalRelation:LogicalRelation =>
+ if(logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) {
+ val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+ if(relation.carbonTable.getFactTableName.equalsIgnoreCase(actualTableName)) {
+ isValidPlan = true
+ }
+ }
+ logicalRelation
+ }
+ assert(isValidPlan)
+ }
+
+ override def afterAll: Unit = {
+ sql("drop table if exists mainTable")
+ sql("drop table if exists agg0")
+ sql("drop table if exists agg1")
+ sql("drop table if exists agg2")
+ sql("drop table if exists agg3")
+ sql("drop table if exists agg4")
+ sql("drop table if exists agg5")
+ sql("drop table if exists agg6")
+ sql("drop table if exists agg7")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 42447da..e83d96a 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -173,6 +173,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
protected val DATAMAP = carbonKeyWord("DATAMAP")
protected val ON = carbonKeyWord("ON")
protected val DMPROPERTIES = carbonKeyWord("DMPROPERTIES")
+ protected val SELECT = carbonKeyWord("SELECT")
protected val doubleQuotedString = "\"([^\"]+)\"".r
protected val singleQuotedString = "'([^']+)'".r
@@ -989,6 +990,12 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
Field(e1, e2.dataType, Some(e1), e2.children, null, e3)
}
+ lazy val addPreAgg: Parser[String] =
+ SELECT ~> restInput <~ opt(";") ^^ {
+ case query =>
+ "select preAGG() as preAgg, " + query
+ }
+
protected lazy val primitiveFieldType: Parser[Field] =
primitiveTypes ^^ {
case e1 =>
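The new addPreAgg production is a plain string rewrite of the user query. Assuming the addPreAggFunction helper on CarbonSpark2SqlParser (used by CreatePreAggregateTableCommand below) delegates to it, the effect is roughly:

    import org.apache.spark.sql.parser.CarbonSpark2SqlParser

    val userQuery = "select name, sum(age) from mainTable group by name"
    val rewritten = new CarbonSpark2SqlParser().addPreAggFunction(userQuery)
    // rewritten: "select preAGG() as preAgg, name, sum(age) from mainTable group by name"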
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 9899be1..28dcbf2 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -463,7 +463,7 @@ object CarbonDataRDDFactory {
throw new Exception(status(0)._2._2.errorMsg)
}
// if segment is empty then fail the data load
- if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isPreAggregateTable &&
+ if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap &&
!CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
// update the load entry in table status file for changing the status to failure
CommonUtil.updateTableStatusForFailure(carbonLoadModel)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index a37b55b..b69ef2f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -52,6 +52,9 @@ class CarbonEnv {
def init(sparkSession: SparkSession): Unit = {
sparkSession.udf.register("getTupleId", () => "")
+ // added for handling pre-aggregate table creation. When the create datamap DDL is fired,
+ // the query is tagged with this no-op udf so the PreAggregate rules are not applied to it.
+ sparkSession.udf.register("preAgg", () => "")
if (!initialized) {
// update carbon session parameters , preserve thread parameters
val currentThreadSesssionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
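Because preAgg is registered as a no-op udf, a plan produced during datamap creation carries a recognisable marker. A hedged sketch (not the actual rule code) of how such a marker can be detected on a logical plan:

    import org.apache.spark.sql.catalyst.expressions.{Alias, ScalaUDF}
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    def hasPreAggMarker(plan: LogicalPlan): Boolean =
      plan.collect { case node => node.expressions }.flatten.exists {
        case Alias(_: ScalaUDF, name) => name.equalsIgnoreCase("preAgg")
        case _ => false
      }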
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index ebf6273..3a78968 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -25,6 +25,8 @@ import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+
/**
* Below command class will be used to create pre-aggregate table
* and updating the parent table about the child table information
@@ -47,9 +49,10 @@ case class CreatePreAggregateTableCommand(
}
override def processSchema(sparkSession: SparkSession): Seq[Row] = {
- val df = sparkSession.sql(queryString)
- val fieldRelationMap = PreAggregateUtil
- .validateActualSelectPlanAndGetAttrubites(df.logicalPlan, queryString)
+ val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
+ val df = sparkSession.sql(updatedQuery)
+ val fieldRelationMap = PreAggregateUtil.validateActualSelectPlanAndGetAttributes(
+ df.logicalPlan, queryString)
val fields = fieldRelationMap.keySet.toSeq
val tableProperties = mutable.Map[String, String]()
dmproperties.foreach(t => tableProperties.put(t._1, t._2))
@@ -87,7 +90,8 @@ case class CreatePreAggregateTableCommand(
val tableInfo = relation.tableMeta.carbonTable.getTableInfo
// child schema object with which the parent table will be updated about the child table
val childSchema = tableInfo.getFactTable.buildChildSchema(
- dataMapName, "", tableInfo.getDatabaseName, queryString, "AGGREGATION")
+ dataMapName, CarbonCommonConstants.AGGREGATIONDATAMAPSCHEMA,
+ tableInfo.getDatabaseName, queryString, "AGGREGATION")
dmproperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
// updating the parent table about child table
PreAggregateUtil.updateMainTable(parentDbName, parentTableName, childSchema, sparkSession)
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index 8271e57..7a66e88 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -105,7 +105,7 @@ object PreAggregateDataTypeChangePreListener extends OperationEventListener {
}
}
- if (carbonTable.isPreAggregateTable) {
+ if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(s"Cannot change data type for columns in " +
s"pre-aggreagate table ${
carbonTable.getDatabaseName
@@ -126,12 +126,12 @@ object PreAggregateDeleteSegmentByDatePreListener extends OperationEventListener
val deleteSegmentByDatePreEvent = event.asInstanceOf[DeleteSegmentByDatePreEvent]
val carbonTable = deleteSegmentByDatePreEvent.carbonTable
if (carbonTable != null) {
- if (carbonTable.hasPreAggregateTables) {
+ if (carbonTable.hasDataMapSchema) {
throw new UnsupportedOperationException(
"Delete segment operation is not supported on tables which have a pre-aggregate table. " +
"Drop pre-aggregation table to continue")
}
- if (carbonTable.isPreAggregateTable) {
+ if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
"Delete segment operation is not supported on pre-aggregate table")
}
@@ -150,11 +150,11 @@ object PreAggregateDeleteSegmentByIdPreListener extends OperationEventListener {
val tableEvent = event.asInstanceOf[DeleteSegmentByIdPreEvent]
val carbonTable = tableEvent.carbonTable
if (carbonTable != null) {
- if (carbonTable.hasPreAggregateTables) {
+ if (carbonTable.hasDataMapSchema) {
throw new UnsupportedOperationException(
"Delete segment operation is not supported on tables which have a pre-aggregate table")
}
- if (carbonTable.isPreAggregateTable) {
+ if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
"Delete segment operation is not supported on pre-aggregate table")
}
@@ -190,7 +190,7 @@ object PreAggregateDropColumnPreListener extends OperationEventListener {
s"pre-aggregate table ${ dataMapSchema.getRelationIdentifier.toString}")
}
}
- if (carbonTable.isPreAggregateTable) {
+ if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(s"Cannot drop columns in pre-aggreagate table ${
carbonTable.getDatabaseName}.${ carbonTable.getFactTableName }")
}
@@ -209,11 +209,11 @@ object PreAggregateRenameTablePreListener extends OperationEventListener {
operationContext: OperationContext): Unit = {
val renameTablePostListener = event.asInstanceOf[AlterTableRenamePreEvent]
val carbonTable = renameTablePostListener.carbonTable
- if (carbonTable.isPreAggregateTable) {
+ if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
"Rename operation for pre-aggregate table is not supported.")
}
- if (carbonTable.hasPreAggregateTables) {
+ if (carbonTable.hasDataMapSchema) {
throw new UnsupportedOperationException(
"Rename operation is not supported for table with pre-aggregate tables")
}
@@ -231,12 +231,12 @@ object UpdatePreAggregatePreListener extends OperationEventListener {
val tableEvent = event.asInstanceOf[UpdateTablePreEvent]
val carbonTable = tableEvent.carbonTable
if (carbonTable != null) {
- if (carbonTable.hasPreAggregateTables) {
+ if (carbonTable.hasDataMapSchema) {
throw new UnsupportedOperationException(
"Update operation is not supported for tables which have a pre-aggregate table. Drop " +
"pre-aggregate tables to continue.")
}
- if (carbonTable.isPreAggregateTable) {
+ if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
"Update operation is not supported for pre-aggregate table")
}
@@ -255,12 +255,12 @@ object DeletePreAggregatePreListener extends OperationEventListener {
val tableEvent = event.asInstanceOf[DeleteFromTablePreEvent]
val carbonTable = tableEvent.carbonTable
if (carbonTable != null) {
- if (carbonTable.hasPreAggregateTables) {
+ if (carbonTable.hasDataMapSchema) {
throw new UnsupportedOperationException(
"Delete operation is not supported for tables which have a pre-aggregate table. Drop " +
"pre-aggregate tables to continue.")
}
- if (carbonTable.isPreAggregateTable) {
+ if (carbonTable.isChildDataMap) {
throw new UnsupportedOperationException(
"Delete operation is not supported for pre-aggregate table")
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index b926705..62e7623 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -16,13 +16,14 @@
*/
package org.apache.spark.sql.execution.command.preaaggregate
-import scala.collection.mutable.ListBuffer
+import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.JavaConverters._
import org.apache.spark.SparkConf
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast}
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation}
+import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Cast, Expression, NamedExpression, ScalaUDF}
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field}
@@ -51,9 +52,14 @@ object PreAggregateUtil {
def getParentCarbonTable(plan: LogicalPlan): CarbonTable = {
plan match {
- case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _))
- if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.metaData.carbonTable
+ case Aggregate(_, _, SubqueryAlias(_, logicalRelation: LogicalRelation, _))
+ if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].
+ carbonRelation.metaData.carbonTable
+ case Aggregate(_, _, logicalRelation: LogicalRelation)
+ if logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
+ logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation].
+ carbonRelation.metaData.carbonTable
case _ => throw new MalformedCarbonCommandException("table does not exist")
}
}
@@ -67,54 +73,86 @@ object PreAggregateUtil {
* @param selectStmt
* @return list of fields
*/
- def validateActualSelectPlanAndGetAttrubites(plan: LogicalPlan,
+ def validateActualSelectPlanAndGetAttributes(plan: LogicalPlan,
selectStmt: String): scala.collection.mutable.LinkedHashMap[Field, DataMapField] = {
- val fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
plan match {
- case Aggregate(_, aExp, SubqueryAlias(_, l: LogicalRelation, _))
- if l.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- val carbonTable = l.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation
- .metaData.carbonTable
- val parentTableName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
- .getTableName
- val parentDatabaseName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
- .getDatabaseName
- val parentTableId = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
- .getTableId
- if (!carbonTable.getTableInfo.getParentRelationIdentifiers.isEmpty) {
+ case Aggregate(groupByExp, aggExp, SubqueryAlias(_, logicalRelation: LogicalRelation, _)) =>
+ getFieldsFromPlan(groupByExp, aggExp, logicalRelation, selectStmt)
+ case Aggregate(groupByExp, aggExp, logicalRelation: LogicalRelation) =>
+ getFieldsFromPlan(groupByExp, aggExp, logicalRelation, selectStmt)
+ }
+ }
+
+ /**
+ * Below method will be used to get the fields from expressions
+ * @param groupByExp
+ * grouping expression
+ * @param aggExp
+ * aggregate expression
+ * @param logicalRelation
+ * logical relation
+ * @param selectStmt
+ * select statement
+ * @return fields from expressions
+ */
+ def getFieldsFromPlan(groupByExp: Seq[Expression],
+ aggExp: Seq[NamedExpression], logicalRelation: LogicalRelation, selectStmt: String):
+ scala.collection.mutable.LinkedHashMap[Field, DataMapField] = {
+ val fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, DataMapField]
+ if (!logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) {
+ throw new MalformedCarbonCommandException("Unsupported table")
+ }
+ val carbonTable = logicalRelation.relation.
+ asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation
+ .metaData.carbonTable
+ val parentTableName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+ .getTableName
+ val parentDatabaseName = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+ .getDatabaseName
+ val parentTableId = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
+ .getTableId
+ if (!carbonTable.getTableInfo.getParentRelationIdentifiers.isEmpty) {
+ throw new MalformedCarbonCommandException(
+ "Pre Aggregation is not supported on Pre-Aggregated Table")
+ }
+ groupByExp.map {
+ case attr: AttributeReference =>
+ fieldToDataMapFieldMap += getField(attr.name,
+ attr.dataType,
+ parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+ parentTableName = parentTableName,
+ parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+ case _ =>
+ throw new MalformedCarbonCommandException(s"Unsupported Function in select Statement:${
+ selectStmt } ")
+ }
+ aggExp.map {
+ case Alias(attr: AggregateExpression, _) =>
+ if (attr.isDistinct) {
throw new MalformedCarbonCommandException(
- "Pre Aggregation is not supported on Pre-Aggregated Table")
- }
- aExp.map {
- case Alias(attr: AggregateExpression, _) =>
- if (attr.isDistinct) {
- throw new MalformedCarbonCommandException(
- "Distinct is not supported On Pre Aggregation")
- }
- fieldToDataMapFieldMap ++= (validateAggregateFunctionAndGetFields(carbonTable,
- attr.aggregateFunction,
- parentTableName,
- parentDatabaseName,
- parentTableId))
- case attr: AttributeReference =>
- fieldToDataMapFieldMap += getField(attr.name,
- attr.dataType,
- parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
- parentTableName = parentTableName,
- parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
- case Alias(attr: AttributeReference, _) =>
- fieldToDataMapFieldMap += getField(attr.name,
- attr.dataType,
- parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
- parentTableName = parentTableName,
- parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
- case _ =>
- throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${
- selectStmt } ")
+ "Distinct is not supported On Pre Aggregation")
}
- Some(carbonTable)
+ fieldToDataMapFieldMap ++= validateAggregateFunctionAndGetFields(carbonTable,
+ attr.aggregateFunction,
+ parentTableName,
+ parentDatabaseName,
+ parentTableId)
+ case attr: AttributeReference =>
+ fieldToDataMapFieldMap += getField(attr.name,
+ attr.dataType,
+ parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+ parentTableName = parentTableName,
+ parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+ case Alias(attr: AttributeReference, _) =>
+ fieldToDataMapFieldMap += getField(attr.name,
+ attr.dataType,
+ parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
+ parentTableName = parentTableName,
+ parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+ // ignore the preAgg() marker udf that was injected while creating the datamap
+ case _@Alias(s: ScalaUDF, name) if name.equals("preAgg") =>
case _ =>
- throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${ selectStmt } ")
+ throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${
+ selectStmt } ")
}
fieldToDataMapFieldMap
}
@@ -347,30 +385,6 @@ object PreAggregateUtil {
}
/**
- * This method will split schema string into multiple parts of configured size and
- * registers the parts as keys in tableProperties which will be read by spark to prepare
- * Carbon Table fields
- *
- * @param sparkConf
- * @param schemaJsonString
- * @return
- */
- private def prepareSchemaJson(sparkConf: SparkConf,
- schemaJsonString: String): String = {
- val threshold = sparkConf
- .getInt(CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD,
- CarbonCommonConstants.SPARK_SCHEMA_STRING_LENGTH_THRESHOLD_DEFAULT)
- // Split the JSON string.
- val parts = schemaJsonString.grouped(threshold).toSeq
- var schemaParts: Seq[String] = Seq.empty
- schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_NUMPARTS'='${ parts.size }'"
- parts.zipWithIndex.foreach { case (part, index) =>
- schemaParts = schemaParts :+ s"'$DATASOURCE_SCHEMA_PART_PREFIX$index'='$part'"
- }
- schemaParts.mkString(",")
- }
-
- /**
* Validates that the table exists and acquires meta lock on it.
*
* @param dbName
@@ -453,4 +467,30 @@ object PreAggregateUtil {
def checkMainTableLoad(carbonTable: CarbonTable): Boolean = {
SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).nonEmpty
}
+
+ /**
+ * Below method will be used to update the logical plan by appending the preAgg() marker
+ * to the projection/aggregate expressions. This is required while creating pre-aggregate
+ * tables so that CarbonPreAggregateRules are not applied during creation.
+ * @param logicalPlan
+ * actual logical plan
+ * @return updated plan
+ */
+ def updatePreAggQueyPlan(logicalPlan: LogicalPlan): LogicalPlan = {
+ val updatedPlan = logicalPlan.transform {
+ case _@Project(projectList, child) =>
+ val buffer = new ArrayBuffer[NamedExpression]()
+ buffer ++= projectList
+ buffer += UnresolvedAlias(Alias(UnresolvedFunction("preAgg",
+ Seq.empty, isDistinct = false), "preAgg")())
+ Project(buffer, child)
+ case Aggregate(groupByExp, aggExp, l: UnresolvedRelation) =>
+ val buffer = new ArrayBuffer[NamedExpression]()
+ buffer ++= aggExp
+ buffer += UnresolvedAlias(Alias(UnresolvedFunction("preAgg",
+ Seq.empty, isDistinct = false), "preAgg")())
+ Aggregate(groupByExp, buffer, l)
+ }
+ updatedPlan
+ }
}
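A rough usage sketch of the new helper, assuming a Spark 2.x session whose parser turns this query into an Aggregate over an UnresolvedRelation:

    val parsed = sparkSession.sessionState.sqlParser.parsePlan(
      "select name, sum(age) from mainTable group by name")
    // append the preAgg() marker before analysis so CarbonPreAggregateRules skip this plan
    val marked = PreAggregateUtil.updatePreAggQueyPlan(parsed)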
http://git-wip-us.apache.org/repos/asf/carbondata/blob/dda2573a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index ba7e1eb..7bd0fad 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -20,111 +20,12 @@ package org.apache.spark.sql.hive
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.CarbonTableIdentifierImplicit
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute, UnresolvedFunction, UnresolvedRelation, UnresolvedStar}
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, ExprId, NamedExpression}
-import org.apache.spark.sql.catalyst.expressions.aggregate.{DeclarativeAggregate, _}
+import org.apache.spark.sql.catalyst.expressions.Alias
import org.apache.spark.sql.catalyst.plans.Inner
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.execution.SparkSqlParser
import org.apache.spark.sql.execution.command.mutation.ProjectForDeleteCommand
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-
-
-/**
- * Insert into carbon table from other source
- */
-object CarbonPreInsertionCasts extends Rule[LogicalPlan] {
- def apply(plan: LogicalPlan): LogicalPlan = {
- plan.transform {
- // Wait until children are resolved.
- case p: LogicalPlan if !p.childrenResolved => p
-
- case p@InsertIntoTable(relation: LogicalRelation, _, child, _, _)
- if relation.relation.isInstanceOf[CarbonDatasourceHadoopRelation] =>
- castChildOutput(p, relation.relation.asInstanceOf[CarbonDatasourceHadoopRelation], child)
- }
- }
-
- def castChildOutput(p: InsertIntoTable,
- relation: CarbonDatasourceHadoopRelation,
- child: LogicalPlan)
- : LogicalPlan = {
- if (relation.carbonRelation.output.size > CarbonCommonConstants
- .DEFAULT_MAX_NUMBER_OF_COLUMNS) {
- sys
- .error("Maximum supported column by carbon is:" + CarbonCommonConstants
- .DEFAULT_MAX_NUMBER_OF_COLUMNS
- )
- }
- val isAggregateTable = !relation.carbonRelation.tableMeta.carbonTable.getTableInfo
- .getParentRelationIdentifiers.isEmpty
- // transform logical plan if the load is for aggregate table.
- val childPlan = if (isAggregateTable) {
- transformAggregatePlan(child)
- } else {
- child
- }
- if (childPlan.output.size >= relation.carbonRelation.output.size) {
- val newChildOutput = childPlan.output.zipWithIndex.map { columnWithIndex =>
- columnWithIndex._1 match {
- case attr: Alias =>
- Alias(attr.child, s"col${ columnWithIndex._2 }")(attr.exprId)
- case attr: Attribute =>
- Alias(attr, s"col${ columnWithIndex._2 }")(NamedExpression.newExprId)
- case attr => attr
- }
- }
- val newChild: LogicalPlan = if (newChildOutput == childPlan.output) {
- p.child
- } else {
- Project(newChildOutput, childPlan)
- }
- InsertIntoCarbonTable(relation, p.partition, newChild, p.overwrite, p.ifNotExists)
- } else {
- sys.error("Cannot insert into target table because column number are different")
- }
- }
-
- /**
- * Transform the logical plan with average(col1) aggregation type to sum(col1) and count(col1).
- *
- * @param logicalPlan
- * @return
- */
- private def transformAggregatePlan(logicalPlan: LogicalPlan): LogicalPlan = {
- logicalPlan transform {
- case aggregate@Aggregate(_, aExp, _) =>
- val newExpressions = aExp flatMap {
- case alias@Alias(attrExpression: AggregateExpression, _) =>
- attrExpression.aggregateFunction flatMap {
- case Average(attr: AttributeReference) =>
- Seq(Alias(attrExpression
- .copy(aggregateFunction = Sum(attr.withName(attr.name)),
- resultId = NamedExpression.newExprId),
- attr.name)(),
- Alias(attrExpression
- .copy(aggregateFunction = Count(attr.withName(attr.name)),
- resultId = NamedExpression.newExprId), attr.name)())
- case Average(Cast(attr: AttributeReference, _)) =>
- Seq(Alias(attrExpression
- .copy(aggregateFunction = Sum(attr.withName(attr.name)),
- resultId = NamedExpression.newExprId),
- attr.name)(),
- Alias(attrExpression
- .copy(aggregateFunction = Count(attr.withName(attr.name)),
- resultId = NamedExpression.newExprId), attr.name)())
- case _: DeclarativeAggregate => Seq(alias)
- case _ => Nil
- }
- case namedExpr: NamedExpression => Seq(namedExpr)
- }
- aggregate.copy(aggregateExpressions = newExpressions)
- case plan: LogicalPlan => plan
- }
- }
-}
case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {