You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/29 14:39:57 UTC
carbondata git commit: [CARBONDATA-1925][Pre-Aggregate]Added code to
support case expression
Repository: carbondata
Updated Branches:
refs/heads/master c5e72a4c9 -> adb8c1356
[CARBONDATA-1925][Pre-Aggregate]Added code to support case expression
Added code to support expressions inside aggregation functions for pre-aggregate tables
This closes #1694
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/adb8c135
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/adb8c135
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/adb8c135
Branch: refs/heads/master
Commit: adb8c1356d0b753ecefe132cf5193ea3f2f92dea
Parents: c5e72a4
Author: kumarvishal <ku...@gmail.com>
Authored: Wed Dec 20 15:46:02 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Dec 29 22:39:44 2017 +0800
----------------------------------------------------------------------
.../schema/table/AggregationDataMapSchema.java | 4 +
.../TestPreAggregateExpressions.scala | 102 ++++++
.../command/carbonTableSchemaCommon.scala | 50 ++-
.../apache/spark/sql/CarbonExpressions.scala | 13 +
.../preaaggregate/PreAggregateUtil.scala | 313 ++++++++++++-------
.../command/timeseries/TimeseriesUtil.scala | 8 +-
.../sql/hive/CarbonPreAggregateRules.scala | 8 +-
7 files changed, 364 insertions(+), 134 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
index 8f6a2d3..4b2d492 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/AggregationDataMapSchema.java
@@ -329,6 +329,10 @@ public class AggregationDataMapSchema extends DataMapSchema {
return false;
}
}
+ } else {
+ // in case of an expression, one aggregate-table column can be derived from
+ // multiple parent-table columns; rollup is not possible then, so query the main table
+ return false;
}
}
return true;
http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
new file mode 100644
index 0000000..4171690
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.preaggregate
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll: Unit = {
+ sql("DROP TABLE IF EXISTS mainTable")
+ sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
+ }
+ test("test pre agg create table with expression 1") {
+ sql(
+ s"""
+ | CREATE DATAMAP agg0 ON TABLE mainTable USING 'preaggregate' AS
+ | SELECT name,
+ | count(age)
+ | FROM mainTable GROUP BY name
+ | """.stripMargin)
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg0"), true, "maintable_age_count")
+ }
+
+ test("test pre agg create table with expression 2") {
+ sql(
+ s"""
+ | CREATE DATAMAP agg1 ON TABLE mainTable USING 'preaggregate' AS
+ | SELECT name,
+ | sum(CASE WHEN age=35 THEN id ELSE 0 END)
+ | FROM mainTable GROUP BY name
+ | """.stripMargin)
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg1"), true, "maintable_column_0_sum")
+ }
+
+ test("test pre agg create table with expression 3") {
+ sql(
+ s"""
+ | CREATE DATAMAP agg2 ON TABLE mainTable USING 'preaggregate' AS
+ | SELECT name,
+ | sum(CASE WHEN age=35 THEN id ELSE 0 END),
+ | city
+ | FROM mainTable GROUP BY name,city
+ | """.stripMargin)
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg2"), true, "maintable_column_0_sum")
+ }
+
+ test("test pre agg create table with expression 4") {
+ sql(
+ s"""
+ | CREATE DATAMAP agg3 ON TABLE mainTable USING 'preaggregate' AS
+ | SELECT name,
+ | sum(CASE WHEN age=27 THEN id ELSE 0 END)
+ | FROM mainTable GROUP BY name
+ | """.stripMargin)
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg3"), true, "maintable_column_0_sum")
+ }
+
+ test("test pre agg create table with expression 5") {
+ sql(
+ s"""
+ | CREATE DATAMAP agg4 ON TABLE mainTable USING 'preaggregate' AS
+ | SELECT name,
+ | sum(CASE WHEN age=27 THEN id ELSE 0 END),
+ | SUM(CASE WHEN age=35 THEN id ELSE 0 END)
+ | FROM mainTable GROUP BY name
+ | """.stripMargin)
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_0_sum")
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg4"), true, "maintable_column_1_sum")
+ }
+
+ test("test pre agg create table with expression 6") {
+ sql(
+ s"""
+ | CREATE DATAMAP agg5 ON TABLE mainTable USING 'preaggregate' AS
+ | SELECT name,
+ | COUNT(CASE WHEN age=27 THEN(CASE WHEN name='eason' THEN id ELSE 0 END) ELSE 0 END)
+ | FROM mainTable GROUP BY name
+ | """.stripMargin)
+ checkExistence(sql("DESCRIBE FORMATTED mainTable_agg5"), true, "maintable_column_0_count")
+ }
+
+ override def afterAll: Unit = {
+ sql("DROP TABLE IF EXISTS mainTable")
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index c7a7b69..1e368cf 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -79,7 +79,7 @@ case class Field(column: String, var dataType: Option[String], name: Option[Stri
}
case class DataMapField(var aggregateFunction: String = "",
- columnTableRelation: Option[ColumnTableRelation] = None) {
+ columnTableRelationList: Option[Seq[ColumnTableRelation]] = None) {
}
case class ColumnTableRelation(parentColumnName: String, parentColumnId: String,
@@ -435,14 +435,21 @@ class TableNewProcessor(cm: TableModel) {
if(isParentColumnRelation) {
val dataMapField = map.get.get(field).get
columnSchema.setFunction(dataMapField.aggregateFunction)
- val relation = dataMapField.columnTableRelation.get
- val parentColumnTableRelationList = new util.ArrayList[ParentColumnTableRelation]
- val relationIdentifier = new RelationIdentifier(
- relation.parentDatabaseName, relation.parentTableName, relation.parentTableId)
- val parentColumnTableRelation = new ParentColumnTableRelation(
- relationIdentifier, relation.parentColumnId, relation.parentColumnName)
- parentColumnTableRelationList.add(parentColumnTableRelation)
- columnSchema.setParentColumnTableRelations(parentColumnTableRelationList)
+ val columnRelationList = dataMapField.columnTableRelationList.get
+ val parentColumnTableRelationList = new util.ArrayList[ParentColumnTableRelation]
+ columnRelationList.foreach {
+ columnRelation =>
+ val relationIdentifier = new RelationIdentifier(
+ columnRelation.parentDatabaseName,
+ columnRelation.parentTableName,
+ columnRelation.parentTableId)
+ val parentColumnTableRelation = new ParentColumnTableRelation(
+ relationIdentifier,
+ columnRelation.parentColumnId,
+ columnRelation.parentColumnName)
+ parentColumnTableRelationList.add(parentColumnTableRelation)
+ }
+ columnSchema.setParentColumnTableRelations(parentColumnTableRelationList)
}
// TODO: Need to fill RowGroupID, converted type
// & Number of Children after DDL finalization
@@ -467,10 +474,11 @@ class TableNewProcessor(cm: TableModel) {
// Sort columns should be at the begin of all columns
cm.sortKeyDims.get.foreach { keyDim =>
val field = cm.dimCols.find(keyDim equals _.column).get
- val encoders = if (cm.parentTable.isDefined && cm.dataMapRelation.get.get(field).isDefined) {
+ val encoders = if (getEncoderFromParent(field)) {
cm.parentTable.get.getColumnByName(
cm.parentTable.get.getTableName,
- cm.dataMapRelation.get.get(field).get.columnTableRelation.get.parentColumnName).getEncoder
+ cm.dataMapRelation.get.get(field).get.columnTableRelationList.
+ get(0).parentColumnName).getEncoder
} else {
val encoders = new java.util.ArrayList[Encoding]()
encoders.add(Encoding.DICTIONARY)
@@ -491,12 +499,11 @@ class TableNewProcessor(cm: TableModel) {
cm.dimCols.foreach { field =>
val sortField = cm.sortKeyDims.get.find(field.column equals _)
if (sortField.isEmpty) {
- val encoders = if (cm.parentTable.isDefined &&
- cm.dataMapRelation.get.get(field).isDefined) {
+ val encoders = if (getEncoderFromParent(field)) {
cm.parentTable.get.getColumnByName(
cm.parentTable.get.getTableName,
cm.dataMapRelation.get.get(field).get.
- columnTableRelation.get.parentColumnName).getEncoder
+ columnTableRelationList.get(0).parentColumnName).getEncoder
} else {
val encoders = new java.util.ArrayList[Encoding]()
encoders.add(Encoding.DICTIONARY)
@@ -524,14 +531,14 @@ class TableNewProcessor(cm: TableModel) {
var isAggFunPresent = false
// getting the encoder from maintable so whatever encoding is applied in maintable
// same encoder can be applied on aggregate table
- val encoders = if (cm.parentTable.isDefined && cm.dataMapRelation.get.get(field).isDefined) {
+ val encoders = if (getEncoderFromParent(field)) {
isAggFunPresent =
cm.dataMapRelation.get.get(field).get.aggregateFunction.equalsIgnoreCase("sum") ||
cm.dataMapRelation.get.get(field).get.aggregateFunction.equals("avg")
if(!isAggFunPresent) {
cm.parentTable.get.getColumnByName(
cm.parentTable.get.getTableName,
- cm.dataMapRelation.get.get(field).get.columnTableRelation.get.parentColumnName)
+ cm.dataMapRelation.get.get(field).get.columnTableRelationList.get(0).parentColumnName)
.getEncoder
} else {
new java.util.ArrayList[Encoding]()
@@ -668,6 +675,17 @@ class TableNewProcessor(cm: TableModel) {
tableInfo
}
+ /**
+ * Method to check to get the encoder from parent or not
+ * @param field column field
+ * @return get encoder from parent
+ */
+ private def getEncoderFromParent(field: Field) : Boolean = {
+ cm.parentTable.isDefined &&
+ cm.dataMapRelation.get.get(field).isDefined &&
+ cm.dataMapRelation.get.get(field).get.columnTableRelationList.size==1
+ }
+
// For checking if the specified col group columns are specified in fields list.
protected def checkColGroupsValidity(colGrps: Seq[String],
allCols: Seq[ColumnSchema],
http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala
index c1f9e8a..d473bc4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonExpressions.scala
@@ -47,6 +47,19 @@ object CarbonExpressions {
}
/**
+ * unapply method of Cast class with expression.
+ */
+ object MatchCastExpression {
+ def unapply(expr: Expression): Option[(Expression, DataType)] = {
+ expr match {
+ case a: Cast if a.child.isInstanceOf[Expression] =>
+ Some((a.child.asInstanceOf[Expression], a.dataType))
+ case _ => None
+ }
+ }
+ }
+
+ /**
* unapply method of Describe Table format.
*/
object CarbonDescribeTable {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 1f5bd41..217436d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -19,20 +19,19 @@ package org.apache.spark.sql.execution.command.preaaggregate
import scala.collection.mutable.{ArrayBuffer, ListBuffer}
import scala.collection.JavaConverters._
-import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, DataFrame, SparkSession}
-import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
+import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSession, SparkSession}
+import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias, MatchCast => Cast, MatchCastExpression}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedFunction, UnresolvedRelation}
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, NamedExpression, ScalaUDF}
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command.{ColumnTableRelation, DataMapField, Field}
+import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.hive.CarbonRelation
-import org.apache.spark.sql.types.DataType
-import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
-import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.types.DataType
import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -116,29 +115,45 @@ object PreAggregateUtil {
throw new MalformedCarbonCommandException(
"Pre Aggregation is not supported on Pre-Aggregated Table")
}
+ var counter = 0
aggExp.map {
- case Alias(attr: AggregateExpression, _) =>
+ case Alias(attr: AggregateExpression, name) =>
if (attr.isDistinct) {
throw new MalformedCarbonCommandException(
"Distinct is not supported On Pre Aggregation")
}
- fieldToDataMapFieldMap ++= validateAggregateFunctionAndGetFields(carbonTable,
+ fieldToDataMapFieldMap ++= validateAggregateFunctionAndGetFields(
+ carbonTable,
attr.aggregateFunction,
parentTableName,
parentDatabaseName,
- parentTableId)
+ parentTableId,
+ "column_" + counter)
+ counter = counter + 1
case attr: AttributeReference =>
- fieldToDataMapFieldMap += getField(attr.name,
+ val columnRelation = getColumnRelation(
+ attr.name,
+ parentTableId,
+ parentTableName,
+ parentDatabaseName,
+ carbonTable)
+ fieldToDataMapFieldMap += createField(
+ attr.name,
attr.dataType,
- parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
parentTableName = parentTableName,
- parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+ columnTableRelationList = Seq(columnRelation))
case Alias(attr: AttributeReference, _) =>
- fieldToDataMapFieldMap += getField(attr.name,
+ val columnRelation = getColumnRelation(
+ attr.name,
+ parentTableId,
+ parentTableName,
+ parentDatabaseName,
+ carbonTable)
+ fieldToDataMapFieldMap += createField(
+ attr.name,
attr.dataType,
- parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
parentTableName = parentTableName,
- parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+ columnTableRelationList = Seq(columnRelation))
case _@Alias(s: ScalaUDF, name) if name.equals("preAgg") =>
case _ =>
throw new MalformedCarbonCommandException(s"Unsupported Select Statement:${
@@ -148,6 +163,34 @@ object PreAggregateUtil {
}
/**
+ * Below method will be used to get the column relation
+ * with the parent column which will be used during query and data loading
+ * @param parentColumnName
+ * parent column name
+ * @param parentTableId
+ * parent table id
+ * @param parentTableName
+ * parent table name
+ * @param parentDatabaseName
+ * parent database name
+ * @param carbonTable
+ * carbon table
+ * @return column relation object
+ */
+ def getColumnRelation(parentColumnName: String,
+ parentTableId: String,
+ parentTableName: String,
+ parentDatabaseName: String,
+ carbonTable: CarbonTable) : ColumnTableRelation = {
+ val parentColumnId = carbonTable.getColumnByName(parentTableName, parentColumnName).getColumnId
+ val columnTableRelation = ColumnTableRelation(parentColumnName = parentColumnName,
+ parentColumnId = parentColumnId,
+ parentTableName = parentTableName,
+ parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
+ columnTableRelation
+ }
+
+ /**
* Below method will be used to validate about the aggregate function
* which is applied on select query.
* Currently sum, max, min, count, avg is supported
@@ -155,102 +198,105 @@ object PreAggregateUtil {
* In case of avg it will return two fields one for count
* and other of sum of that column to support rollup
*
- * @param carbonTable
- * @param aggFunctions
- * @param parentTableName
- * @param parentDatabaseName
- * @param parentTableId
+ * @param carbonTable parent carbon table
+ * @param aggFunctions aggregation function
+ * @param parentTableName parent table name
+ * @param parentDatabaseName parent database name
+ * @param parentTableId parent table id
+ * @param newColumnName
+ * In case of any expression this will be used as a column name for pre aggregate
* @return list of fields
*/
def validateAggregateFunctionAndGetFields(carbonTable: CarbonTable,
aggFunctions: AggregateFunction,
parentTableName: String,
parentDatabaseName: String,
- parentTableId: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = {
+ parentTableId: String,
+ newColumnName: String) : scala.collection.mutable.ListBuffer[(Field, DataMapField)] = {
val list = scala.collection.mutable.ListBuffer.empty[(Field, DataMapField)]
aggFunctions match {
- case sum@Sum(attr: AttributeReference) =>
- list += getField(attr.name,
- attr.dataType,
- sum.prettyName,
- carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
- parentTableName,
- parentDatabaseName, parentTableId = parentTableId)
- case sum@Sum(Cast(attr: AttributeReference, changeDataType: DataType)) =>
- list += getField(attr.name,
+ case sum@Sum(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
+ list += createFieldForAggregateExpression(
+ exp,
changeDataType,
- sum.prettyName,
- carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
- parentTableName,
- parentDatabaseName, parentTableId = parentTableId)
- case count@Count(Seq(attr: AttributeReference)) =>
- list += getField(attr.name,
- attr.dataType,
- count.prettyName,
- carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
- parentTableName,
- parentDatabaseName, parentTableId = parentTableId)
- case count@Count(Seq(Cast(attr: AttributeReference, _))) =>
- list += getField(attr.name,
- attr.dataType,
- count.prettyName,
- carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
- parentTableName,
- parentDatabaseName, parentTableId = parentTableId)
- case min@Min(attr: AttributeReference) =>
- list += getField(attr.name,
- attr.dataType,
- min.prettyName,
- carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
- parentTableName,
- parentDatabaseName, parentTableId = parentTableId)
- case min@Min(Cast(attr: AttributeReference, changeDataType: DataType)) =>
- list += getField(attr.name,
+ carbonTable,
+ newColumnName,
+ sum.prettyName)
+ case sum@Sum(exp: Expression) =>
+ list += createFieldForAggregateExpression(
+ exp,
+ sum.dataType,
+ carbonTable,
+ newColumnName,
+ sum.prettyName)
+ case count@Count(Seq(MatchCastExpression(exp: Expression, changeDataType: DataType))) =>
+ list += createFieldForAggregateExpression(
+ exp,
changeDataType,
- min.prettyName,
- carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
- parentTableName,
- parentDatabaseName, parentTableId = parentTableId)
- case max@Max(attr: AttributeReference) =>
- list += getField(attr.name,
- attr.dataType,
- max.prettyName,
- carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
- parentTableName,
- parentDatabaseName, parentTableId = parentTableId)
- case max@Max(Cast(attr: AttributeReference, changeDataType: DataType)) =>
- list += getField(attr.name,
+ carbonTable,
+ newColumnName,
+ count.prettyName)
+ case count@Count(Seq(exp: Expression)) =>
+ list += createFieldForAggregateExpression(
+ exp,
+ count.dataType,
+ carbonTable,
+ newColumnName,
+ count.prettyName)
+ case min@Min(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
+ list += createFieldForAggregateExpression(
+ exp,
changeDataType,
- max.prettyName,
- carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
- parentTableName,
- parentDatabaseName, parentTableId = parentTableId)
- case Average(attr: AttributeReference) =>
- list += getField(attr.name,
- attr.dataType,
- "sum",
- carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
- parentTableName,
- parentDatabaseName, parentTableId = parentTableId)
- list += getField(attr.name,
- attr.dataType,
- "count",
- carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
- parentTableName,
- parentDatabaseName, parentTableId = parentTableId)
- case Average(Cast(attr: AttributeReference, changeDataType: DataType)) =>
- list += getField(attr.name,
+ carbonTable,
+ newColumnName,
+ min.prettyName)
+ case min@Min(exp: Expression) =>
+ list += createFieldForAggregateExpression(
+ exp,
+ min.dataType,
+ carbonTable,
+ newColumnName,
+ min.prettyName)
+ case max@Max(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
+ list += createFieldForAggregateExpression(
+ exp,
changeDataType,
- "sum",
- carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
- parentTableName,
- parentDatabaseName, parentTableId = parentTableId)
- list += getField(attr.name,
+ carbonTable,
+ newColumnName,
+ max.prettyName)
+ case max@Max(exp: Expression) =>
+ list += createFieldForAggregateExpression(
+ exp,
+ max.dataType,
+ carbonTable,
+ newColumnName,
+ max.prettyName)
+ case Average(MatchCastExpression(exp: Expression, changeDataType: DataType)) =>
+ list += createFieldForAggregateExpression(
+ exp,
changeDataType,
- "count",
- carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
- parentTableName,
- parentDatabaseName, parentTableId = parentTableId)
+ carbonTable,
+ newColumnName,
+ "sum")
+ list += createFieldForAggregateExpression(
+ exp,
+ changeDataType,
+ carbonTable,
+ newColumnName,
+ "count")
+ case avg@Average(exp: Expression) =>
+ list += createFieldForAggregateExpression(
+ exp,
+ avg.dataType,
+ carbonTable,
+ newColumnName,
+ "sum")
+ list += createFieldForAggregateExpression(
+ exp,
+ avg.dataType,
+ carbonTable,
+ newColumnName,
+ "count")
case others@_ =>
throw new MalformedCarbonCommandException(s"Un-Supported Aggregation Type: ${
others.prettyName}")
@@ -258,35 +304,79 @@ object PreAggregateUtil {
}
/**
+ * Below method will be used to get the field and its data map field object
+ * for aggregate expression
+ * @param expression
+ * expression in aggregate function
+ * @param dataType
+ * data type
+ * @param carbonTable
+ * parent carbon table
+ * @param newColumnName
+ * column name of aggregate table
+ * @param aggregationName
+ * aggregate function name
+ * @return field and its metadata tuple
+ */
+ def createFieldForAggregateExpression(
+ expression: Expression,
+ dataType: DataType,
+ carbonTable: CarbonTable,
+ newColumnName: String,
+ aggregationName: String): (Field, DataMapField) = {
+ val parentColumnsName = new ArrayBuffer[String]()
+ expression.transform {
+ case attr: AttributeReference =>
+ parentColumnsName += attr.name
+ attr
+ }
+ val arrayBuffer = parentColumnsName.map { name =>
+ getColumnRelation(name,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+ carbonTable)
+ }
+ // if the parent column relation has more than one entry, the aggregate-table
+ // column is derived from multiple main-table columns; likewise, if the
+ // expression is not an AttributeReference, use the generated column name passed in
+ // NOTE(review): comment says "or" but the guard uses && — confirm || was not intended
+ val columnName =
+ if (parentColumnsName.size > 1 && !expression.isInstanceOf[AttributeReference]) {
+ newColumnName
+ } else {
+ expression.asInstanceOf[AttributeReference].name
+ }
+ createField(columnName,
+ dataType,
+ aggregationName,
+ carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+ arrayBuffer)
+ }
+
+ /**
* Below method will be used to get the fields object for pre aggregate table
*
* @param columnName
* @param dataType
* @param aggregateType
- * @param parentColumnId
* @param parentTableName
- * @param parentDatabaseName
- * @param parentTableId
+ * @param columnTableRelationList
+ * List of column relation with parent
* @return fields object
*/
- def getField(columnName: String,
+ def createField(columnName: String,
dataType: DataType,
aggregateType: String = "",
- parentColumnId: String,
parentTableName: String,
- parentDatabaseName: String,
- parentTableId: String): (Field, DataMapField) = {
+ columnTableRelationList: Seq[ColumnTableRelation]): (Field, DataMapField) = {
val actualColumnName = if (aggregateType.equals("")) {
parentTableName + '_' + columnName
} else {
parentTableName + '_' + columnName + '_' + aggregateType
}
val rawSchema = '`' + actualColumnName + '`' + ' ' + dataType.typeName
- val columnTableRelation = ColumnTableRelation(parentColumnName = columnName,
- parentColumnId = parentColumnId,
- parentTableName = parentTableName,
- parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
- val dataMapField = DataMapField(aggregateType, Some(columnTableRelation))
+ val dataMapField = DataMapField(aggregateType, Some(columnTableRelationList))
if (dataType.typeName.startsWith("decimal")) {
val (precision, scale) = CommonUtil.getScaleAndPrecision(dataType.catalogString)
(Field(column = actualColumnName,
@@ -508,7 +598,10 @@ object PreAggregateUtil {
val headers = dataMapSchemas.find(_.getChildSchema.getTableName.equalsIgnoreCase(
dataMapIdentifier.table)) match {
case Some(dataMapSchema) =>
- dataMapSchema.getChildSchema.getListOfColumns.asScala.sortBy(_.getSchemaOrdinal).map(
+ val columns = dataMapSchema.getChildSchema.getListOfColumns.asScala
+ .filter{column =>
+ !column.getColumnName.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)}
+ columns.sortBy(_.getSchemaOrdinal).map(
_.getColumnName).mkString(",")
case None =>
throw new RuntimeException(
http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
index 6a4ef56..d4358b6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/timeseries/TimeseriesUtil.scala
@@ -111,8 +111,8 @@ object TimeSeriesUtil {
.LinkedHashMap[Field, DataMapField],
timeSeriesColumn: String) : Any = {
val isTimeSeriesColumnExits = fieldMapping
- .exists(obj => obj._2.columnTableRelation.isDefined &&
- obj._2.columnTableRelation.get.parentColumnName
+ .exists(obj => obj._2.columnTableRelationList.isDefined &&
+ obj._2.columnTableRelationList.get(0).parentColumnName
.equalsIgnoreCase(timeSeriesColumn) &&
obj._2.aggregateFunction.isEmpty)
if(!isTimeSeriesColumnExits) {
@@ -134,8 +134,8 @@ object TimeSeriesUtil {
timeSeriesColumn: String,
timeSeriesFunction: String) : Any = {
val isTimeSeriesColumnExits = fieldMapping
- .find(obj => obj._2.columnTableRelation.isDefined &&
- obj._2.columnTableRelation.get.parentColumnName
+ .find(obj => obj._2.columnTableRelationList.isDefined &&
+ obj._2.columnTableRelationList.get(0).parentColumnName
.equalsIgnoreCase(timeSeriesColumn) &&
obj._2.aggregateFunction.isEmpty)
isTimeSeriesColumnExits.get._2.aggregateFunction = timeSeriesFunction
http://git-wip-us.apache.org/repos/asf/carbondata/blob/adb8c135/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 76c39a4..79561c6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SPARK_VERSION
import org.apache.spark.sql._
-import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCast}
+import org.apache.spark.sql.CarbonExpressions.{CarbonScalaUDF, CarbonSubqueryAlias, MatchCast, MatchCastExpression}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
@@ -1290,11 +1290,11 @@ object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
attrExpression.aggregateFunction match {
case Sum(attr: AttributeReference) =>
(attr.name + "_sum", alias) :: Nil
- case Sum(MatchCast(attr: AttributeReference, _)) =>
+ case Sum(MatchCastExpression(attr: AttributeReference, _)) =>
(attr.name + "_sum", alias) :: Nil
case Count(Seq(attr: AttributeReference)) =>
(attr.name + "_count", alias) :: Nil
- case Count(Seq(MatchCast(attr: AttributeReference, _))) =>
+ case Count(Seq(MatchCastExpression(attr: AttributeReference, _))) =>
(attr.name + "_count", alias) :: Nil
case Average(attr: AttributeReference) =>
Seq((attr.name + "_sum", Alias(attrExpression.
@@ -1303,7 +1303,7 @@ object CarbonPreAggregateDataLoadingRules extends Rule[LogicalPlan] {
(attr.name, Alias(attrExpression.
copy(aggregateFunction = Count(attr),
resultId = NamedExpression.newExprId), attr.name + "_count")()))
- case Average(cast@MatchCast(attr: AttributeReference, _)) =>
+ case Average(cast@MatchCastExpression(attr: AttributeReference, _)) =>
Seq((attr.name + "_sum", Alias(attrExpression.
copy(aggregateFunction = Sum(cast),
resultId = NamedExpression.newExprId),