Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/05/17 14:30:41 UTC
[28/50] [abbrv] carbondata git commit: [CARBONDATA-2474] Support Modular Plan for Materialized View DataMap
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
new file mode 100644
index 0000000..6363089
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/BirdcageOptimizer.scala
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.util
+
+import org.apache.spark.sql.catalyst.analysis._
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.optimizer._
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
+import org.apache.spark.sql.catalyst.rules.{RuleExecutor, _}
+import org.apache.spark.sql.internal.SQLConf
+
+object BirdcageOptimizer extends RuleExecutor[LogicalPlan] {
+
+ val conf = new SQLConf()
+ .copy(SQLConf.CASE_SENSITIVE -> true, SQLConf.STARSCHEMA_DETECTION -> true)
+ protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations)
+
+ def batches: Seq[Batch] = {
+ // Technically some of the rules in Finish Analysis are not optimizer rules and belong more
+ // in the analyzer, because they are needed for correctness (e.g. ComputeCurrentTime).
+ // However, because we also use the analyzer to canonicalize queries (for view definition),
+ // we do not eliminate subqueries or compute current time in the analyzer.
+ Batch(
+ "Finish Analysis", Once,
+ EliminateSubqueryAliases,
+ EliminateView,
+ ReplaceExpressions,
+ ComputeCurrentTime,
+ // GetCurrentDatabase(sessionCatalog),
+ RewriteDistinctAggregates,
+ ReplaceDeduplicateWithAggregate) ::
+ //////////////////////////////////////////////////////////////////////////////////////////
+ // Optimizer rules start here
+ //////////////////////////////////////////////////////////////////////////////////////////
+ // - Do the first call of CombineUnions before starting the major Optimizer rules,
+ // since it can reduce the number of iterations and the other rules could add/move
+ // extra operators between two adjacent Union operators.
+ // - Call CombineUnions again in Batch("Operator Optimizations"),
+ // since the other rules might make two separate Union operators adjacent.
+ Batch(
+ "Union", Once,
+ CombineUnions) ::
+ Batch(
+ "Pullup Correlated Expressions", Once,
+ PullupCorrelatedPredicates) ::
+ Batch(
+ "Subquery", Once,
+ OptimizeSubqueries) ::
+ Batch(
+ "Replace Operators", fixedPoint,
+ ReplaceIntersectWithSemiJoin,
+ ReplaceExceptWithAntiJoin,
+ ReplaceDistinctWithAggregate) ::
+ Batch(
+ "Aggregate", fixedPoint,
+ RemoveLiteralFromGroupExpressions,
+ RemoveRepetitionFromGroupExpressions) ::
+ Batch(
+ "Operator Optimizations", fixedPoint, Seq(
+ // Operator push down
+ PushProjectionThroughUnion,
+ ReorderJoin(conf),
+ EliminateOuterJoin(conf),
+ PushPredicateThroughJoin,
+ PushDownPredicate,
+ // LimitPushDown(conf),
+ ColumnPruning,
+ // InferFiltersFromConstraints(conf),
+ // Operator combine
+ CollapseRepartition,
+ CollapseProject,
+ CollapseWindow,
+ CombineFilters,
+ CombineLimits,
+ CombineUnions,
+ // Constant folding and strength reduction
+ NullPropagation(conf),
+ FoldablePropagation,
+ // OptimizeIn(conf),
+ ConstantFolding,
+ ReorderAssociativeOperator,
+ LikeSimplification,
+ BooleanSimplification,
+ SimplifyConditionals,
+ RemoveDispensableExpressions,
+ SimplifyBinaryComparison,
+ // PruneFilters(conf),
+ EliminateSorts,
+ SimplifyCasts,
+ SimplifyCaseConversionExpressions,
+ RewriteCorrelatedScalarSubquery,
+ EliminateSerialization,
+ RemoveRedundantAliases,
+ RemoveRedundantProject,
+ SimplifyCreateStructOps,
+ SimplifyCreateArrayOps,
+ SimplifyCreateMapOps) ++
+ extendedOperatorOptimizationRules: _*) ::
+ Batch(
+ "Check Cartesian Products", Once,
+ CheckCartesianProducts(conf)) ::
+ // Batch("Join Reorder", Once,
+ // CostBasedJoinReorder(conf)) ::
+ // Batch("Decimal Optimizations", fixedPoint,
+ // DecimalAggregates(conf)) ::
+ Batch(
+ "Object Expressions Optimization", fixedPoint,
+ EliminateMapObjects,
+ CombineTypedFilters) ::
+ // Batch("LocalRelation", fixedPoint,
+ // ConvertToLocalRelation,
+ // PropagateEmptyRelation) ::
+ Batch(
+ "OptimizeCodegen", Once,
+ OptimizeCodegen(conf)) ::
+ Batch(
+ "RewriteSubquery", Once,
+ RewritePredicateSubquery,
+ CollapseProject) :: Nil
+ }
+
+ /**
+ * Optimize all the subqueries inside an expression.
+ */
+ object OptimizeSubqueries extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = {
+ plan transformAllExpressions {
+ case s: SubqueryExpression =>
+ val Subquery(newPlan) = BirdcageOptimizer.this.execute(Subquery(s.plan))
+ s.withNewPlan(newPlan)
+ }
+ }
+ }
+
+ /**
+ * Override to provide additional rules for the operator optimization batch.
+ */
+ def extendedOperatorOptimizationRules: Seq[Rule[LogicalPlan]] = {
+ Nil
+ }
+}
+
+/**
+ * Push Aggregate through join to fact table.
+ * Pushes down [[Aggregate]] operators where the `grouping` and `aggregate` expressions can
+ * be evaluated using only the attributes of the fact table, the left or right side of a
+ * star-join.
+ * Other [[Aggregate]] expressions stay in the original [[Aggregate]].
+ *
+ * Check 'Aggregate Pushdown Over Join: Design & Preliminary Results' by LiTao for more details
+ */
+// case class PushAggregateThroughJoin(conf: SQLConf) extends Rule[LogicalPlan] with
+// PredicateHelper {
+//
+// val tableCluster = {
+// val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+// val tableClusterString = conf.getConfString("spark.mv.tableCluster")
+// mapper.readValue(tableClusterString, classOf[TableCluster])
+// }
+//
+// def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+//
+// // Push down aggregate expressions through Join
+// case a @ Aggregate(grouping, aggregate, Project(projectList, Join(left, right, jt, cond)))
+// if left.isInstanceOf[LeafNode] => {
+// val fTables: Set[String] = tableCluster.getFact
+// val dTables: Set[String] = tableCluster.getDimension
+// // if canPushThrough(left,a)
+//
+// if (fTables.contains(s"${left.databaseName}.${left.tableName}")
+// Aggregate(newGrouping, newAggregate, Project(projectList, Join(Aggregate(_,_,Project
+// (projectList1, left)), right, jt, cond)))
+// }
+// }
+//
+// private def canPushThrough(join: Join): Boolean = join match {
+// case Join(left : LeafNode, right: LeafNode, Inner, EqualTo(l: AttributeReference,
+// r: AttributeReference)) => true
+//
+//
+// }
+//
+//
+// }
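
For reference, BirdcageOptimizer extends Catalyst's RuleExecutor[LogicalPlan], so it can be
run directly over an analyzed plan. A minimal usage sketch, not part of the patch (the local
session and table t are illustrative only):

  import org.apache.spark.sql.SparkSession
  import org.apache.carbondata.mv.plans.util.BirdcageOptimizer

  val spark = SparkSession.builder().master("local[2]").getOrCreate()
  spark.range(10).selectExpr("id AS a", "id AS b").createOrReplaceTempView("t")
  val analyzed = spark.sql("SELECT a, sum(b) FROM t GROUP BY a").queryExecution.analyzed
  // Canonicalize the plan with the same rule batches the MV matcher relies on.
  val optimized = BirdcageOptimizer.execute(analyzed)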
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
new file mode 100644
index 0000000..de65e37
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
@@ -0,0 +1,355 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.util
+
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference,
+ AttributeSet, Expression, NamedExpression, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+import org.apache.carbondata.mv.plans.modular.Flags._
+import org.apache.carbondata.mv.plans.modular.JoinEdge
+
+/**
+ * SelectModule is extracted from the logical plan of an SPJG query. All join conditions,
+ * filters, and project operators in a single Aggregate-less subtree of the logical plan
+ * are collected.
+ *
+ * The returned values for this match are as follows:
+ * - Conditions for equi-join
+ * - Conditions for filter
+ * - Project list for project
+ *
+ */
+object ExtractSelectModule extends PredicateHelper {
+ type ReturnType = (Seq[NamedExpression], Seq[Expression], Seq[Expression], Map[Int, String],
+ Seq[JoinEdge], Seq[LogicalPlan], FlagSet, Seq[Seq[Any]], Seq[Seq[Any]])
+
+ def unapply(plan: LogicalPlan): Option[ReturnType] = {
+ val (outputs, inputs, predicates, joinedges, children, isSelect, _, flags, fspecs, wspecs) =
+ collectProjectsFiltersJoinsAndSort(plan)
+ if (!isSelect) {
+ None
+ } else {
+ Some(
+ outputs,
+ inputs,
+ predicates,
+ collectChildAliasMappings(
+ AttributeSet(outputs).toSeq ++ AttributeSet(predicates).toSeq,
+ children),
+ joinedges,
+ children,
+ flags,
+ fspecs,
+ wspecs)
+ }
+ }
+
+ def collectProjectsFiltersJoinsAndSort(plan: LogicalPlan): (Seq[NamedExpression],
+ Seq[Expression], Seq[Expression], Seq[JoinEdge], Seq[LogicalPlan], Boolean, Map[Attribute,
+ Expression], FlagSet, Seq[Seq[Any]], Seq[Seq[Any]]) = {
+ plan match {
+ case Project(fields, child) =>
+ val (_, inputs, predicates, joinedges, children, _, aliases, flags, fspecs, wspecs) =
+ collectProjectsFiltersJoinsAndSort(child)
+ val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
+ (substitutedFields, inputs, predicates, joinedges, children, true, collectAliases(
+ substitutedFields), flags, fspecs, wspecs)
+
+ case Filter(condition, child) =>
+ val (outputs, inputs, predicates, joinedges, children, _, aliases, flags, fspecs, wspecs)
+ = collectProjectsFiltersJoinsAndSort(child)
+ val substitutedCondition = substitute(aliases)(condition)
+ (outputs, inputs, predicates.flatMap(splitConjunctivePredicates) ++
+ splitConjunctivePredicates(substitutedCondition), joinedges, children,
+ true, aliases, flags, fspecs, wspecs)
+
+ case Sort(order, global, child) =>
+ val (outputs, inputs, predicates, joinedges, children, _, aliases, flags, fspecs, wspecs)
+ = collectProjectsFiltersJoinsAndSort(child)
+ val substitutedOrder = order.map(substitute(aliases))
+ (outputs, inputs, predicates, joinedges, children, true, aliases, if (global) {
+ flags.setFlag(SORT).setFlag(GLOBAL)
+ } else {
+ flags.setFlag(SORT)
+ }, Seq(Seq(order)) ++ fspecs, wspecs)
+
+ case Join(left, right, joinType, condition) =>
+ val (loutputs, linputs, lpredicates, ljoinedges, lchildren, _, laliases, lflags, lfspecs,
+ lwspecs) = collectProjectsFiltersJoinsAndSort(left)
+ val (routputs, rinputs, rpredicates, rjoinedges, rchildren, _, raliases, rflags, rfspecs,
+ rwspecs) = collectProjectsFiltersJoinsAndSort(right)
+ val (lcondition, rcondition, ccondition) = split(condition, lchildren, rchildren)
+ val joinEdge = collectJoinEdge(ccondition, lchildren, rchildren, joinType)
+ val adjustedJoinEdges = rjoinedges
+ .map(e => JoinEdge(e.left + lchildren.size, e.right + lchildren.size, e.joinType))
+ val output: Seq[Attribute] = {
+ joinType match {
+ case LeftSemi =>
+ left.output
+ case LeftOuter =>
+ left.output ++ right.output.map(_.withNullability(true))
+ case RightOuter =>
+ left.output.map(_.withNullability(true)) ++ right.output
+ case FullOuter =>
+ left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
+ case LeftAnti =>
+ left.output
+ case _ =>
+ left.output ++ right.output
+ }
+ }
+ if (lfspecs.isEmpty && rfspecs.isEmpty && lflags == NoFlags && rflags == NoFlags &&
+ lwspecs.isEmpty && rwspecs.isEmpty) {
+ (output, (linputs ++ rinputs), lpredicates.flatMap(splitConjunctivePredicates) ++
+ rpredicates.flatMap(splitConjunctivePredicates) ++
+ lcondition ++ rcondition ++ ccondition, ljoinedges ++
+ joinEdge ++
+ adjustedJoinEdges,
+ lchildren ++ rchildren, true, laliases ++ raliases, NoFlags, Seq.empty, Seq.empty)
+ } else {
+ throw new UnsupportedOperationException(
+ s"unsupported join: \n left child ${ left } " +
+ s"\n right child ${ right }")
+ }
+
+ case other =>
+ (other.output, other.output, Nil, Nil, Seq(other), false, Map.empty, NoFlags, Seq.empty, Seq.empty)
+ }
+ }
+
+ def collectAliases(fields: Seq[Expression]): Map[Attribute, Expression] = {
+ fields.collect {
+ case a@Alias(child, _) => a.toAttribute -> child
+ }.toMap
+ }
+
+ def substitute(aliases: Map[Attribute, Expression])(expr: Expression): Expression = {
+ expr.transform {
+ case a@Alias(ref: AttributeReference, name) =>
+ aliases.get(ref).map(Alias(_, name)(a.exprId, a.qualifier)).getOrElse(a)
+
+ case a: AttributeReference =>
+ aliases.get(a).map(Alias(_, a.name)(a.exprId, a.qualifier)).getOrElse(a)
+ }
+ }
+
+ def collectChildAliasMappings(attributeSet: Seq[Attribute], children: Seq[LogicalPlan]
+ ): Map[Int, String] = {
+ val aq = attributeSet.filter(_.qualifier.nonEmpty)
+ children.zipWithIndex.flatMap {
+ case (child, i) =>
+ aq.find(child.outputSet.contains(_)).map(_.qualifier).flatten.map((i, _))
+ }.toMap
+ }
+
+ def split(condition: Option[Expression],
+ lchildren: Seq[LogicalPlan],
+ rchildren: Seq[LogicalPlan]): (Seq[Expression], Seq[Expression], Seq[Expression]) = {
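+ // Illustration (not part of the original patch): for a condition such as
+ // l.a === r.b && l.a > 1 && r.b < 5, with l under lchildren and r under rchildren,
+ // this returns (Seq(l.a > 1), Seq(r.b < 5), Seq(l.a === r.b)).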
+ val left = lchildren.map(_.outputSet).foldLeft(AttributeSet(Set.empty))(_ ++ _)
+ val right = rchildren.map(_.outputSet).foldLeft(AttributeSet(Set.empty))(_ ++ _)
+ val conditions = condition.map(splitConjunctivePredicates).getOrElse(Nil)
+ val (leftEvaluationCondition, rest) = conditions.partition(_.references subsetOf left)
+ val (rightEvaluationCondition, commonCondition) = rest.partition(_.references subsetOf right)
+ (leftEvaluationCondition, rightEvaluationCondition, commonCondition)
+ }
+
+ /*
+ * collectJoinEdge is only valid when the condition is the common condition returned by
+ * the split above; the left and right children correspond to the two children parameters
+ * of split.
+ */
+ def collectJoinEdge(condition: Seq[Expression],
+ lchildren: Seq[LogicalPlan],
+ rchildren: Seq[LogicalPlan],
+ joinType: JoinType): Seq[JoinEdge] = {
+ val common = condition.map(_.references).foldLeft(AttributeSet(Set.empty))(_ ++ _)
+ val lIdxSeq = lchildren
+ .collect { case x if x.outputSet.intersect(common).nonEmpty => lchildren.indexOf(x) }
+ val rIdxSeq = rchildren.collect {
+ case x if x.outputSet.intersect(common).nonEmpty => rchildren.indexOf(x) + lchildren.size
+ }
+ for (l <- lIdxSeq; r <- rIdxSeq) yield {
+ JoinEdge(l, r, joinType)
+ }
+ }
+
+}
+
+object ExtractSelectModuleForWindow extends PredicateHelper {
+ type ReturnType = (Seq[NamedExpression], Seq[Expression], Seq[Expression], Map[Int, String],
+ Seq[JoinEdge], Seq[LogicalPlan], FlagSet, Seq[Seq[Any]], Seq[Seq[Any]])
+
+ def unapply(plan: LogicalPlan): Option[ReturnType] = {
+ collectSelectFromWindowChild(plan)
+ }
+
+ def collectSelectFromWindowChild(plan: LogicalPlan): Option[(Seq[NamedExpression],
+ Seq[Expression], Seq[Expression], Map[Int, String], Seq[JoinEdge], Seq[LogicalPlan], FlagSet,
+ Seq[Seq[Any]], Seq[Seq[Any]])] = {
+ plan match {
+ case agg@Aggregate(_, _, _) =>
+ Some(
+ agg.aggregateExpressions,
+ agg.child.output,
+ Seq.empty,
+ Map.empty,
+ Seq.empty,
+ Seq(agg),
+ NoFlags,
+ Seq.empty,
+ Seq.empty)
+ case ExtractSelectModule(
+ output,
+ input,
+ predicate,
+ aliasmap,
+ joinedge,
+ children,
+ flags,
+ fspec,
+ wspec) =>
+ Some(output, input, predicate, aliasmap, joinedge, children, flags, fspec, wspec)
+ case Window(exprs, _, _, child) =>
+ val ret: Option[ReturnType] = collectSelectFromWindowChild(child)
+ ret.map(r => (r._1, r._2, r._3, r._4, r._5, r._6, r._7, r._8, Seq(Seq(exprs)) ++ r._9))
+ case other => None
+ }
+ }
+}
+
+/**
+ * GroupByModule is extracted from the Aggregate node of the logical plan.
+ * The groupingExpressions and aggregateExpressions are collected.
+ *
+ * The returned values for this match are as follows:
+ * - Grouping attributes for the Aggregate node.
+ * - Aggregates for the Aggregate node.
+ * - Project list for project
+ *
+ */
+
+object ExtractGroupByModule extends PredicateHelper {
+ type ReturnType = (Seq[NamedExpression], Seq[Expression], Seq[Expression], Option[String],
+ LogicalPlan, FlagSet, Seq[Seq[Any]])
+
+ def unapply(plan: LogicalPlan): Option[ReturnType] = {
+ plan match {
+ case a@logical.Aggregate(_, _, e@Expand(_, _, p: Project)) if isGroupingSet(a, e, p) =>
+ // Assumption: Aggregate's groupingExpressions is composed of
+ // 1) the grouping attributes
+ // 2) gid, which is always the last one
+ val g = a.groupingExpressions.map(_.asInstanceOf[Attribute])
+ val numOriginalOutput = e.output.size - g.size
+ Some(
+ a.aggregateExpressions,
+ e.output,
+ a.groupingExpressions,
+ None,
+ p,
+ NoFlags.setFlag(EXPAND),
+ Seq(Seq(e.projections, e.output, numOriginalOutput)))
+ case logical.Aggregate(groupingExpressions, aggregateExpressions, child) =>
+ Some(
+ aggregateExpressions,
+ child.output,
+ groupingExpressions,
+ None,
+ child,
+ NoFlags,
+ Seq.empty)
+ case other => None
+ }
+ }
+
+ private def isGroupingSet(a: Aggregate, e: Expand, p: Project): Boolean = {
+ assert(a.child == e && e.child == p)
+
+ if (a.groupingExpressions.forall(_.isInstanceOf[Attribute])) {
+ val g = a.groupingExpressions.map(_.asInstanceOf[Attribute])
+ sameOutput(
+ e.output.drop(e.output.size - g.size),
+ a.groupingExpressions.map(_.asInstanceOf[Attribute]))
+ } else {
+ false
+ }
+ }
+
+ private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean = {
+ output1.size == output2.size &&
+ output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))
+ }
+}
+
+object ExtractUnionModule extends PredicateHelper {
+ type ReturnType = (Seq[LogicalPlan], FlagSet, Seq[Seq[Any]])
+
+ def unapply(plan: LogicalPlan): Option[ReturnType] = {
+ plan match {
+ case u: Union =>
+ val children = collectUnionChildren(u)
+ Some(children, NoFlags, Seq.empty)
+ case _ => None
+ }
+ }
+
+ private def collectUnionChildren(plan: LogicalPlan): List[LogicalPlan] = {
+ plan match {
+ case Union(children) => children.toList match {
+ case head :: Nil => collectUnionChildren(head)
+ case head :: tail => collectUnionChildren(head) ++ collectUnionChildren(Union(tail))
+ case Nil => Nil
+ }
+ case other => other :: Nil
+ }
+ }
+}
+
+object ExtractTableModule extends PredicateHelper {
+ type ReturnType = (String, String, Seq[NamedExpression], Seq[LogicalPlan], FlagSet, Seq[Seq[Any]])
+
+ def unapply(plan: LogicalPlan): Option[ReturnType] = {
+ plan match {
+ // uncomment for cloudera1 version
+// case m: CatalogRelation =>
+// Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags,
+// Seq.empty)
+// uncomment for apache version
+ case m: HiveTableRelation =>
+ Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags,
+ Seq.empty)
+ case l: LogicalRelation =>
+ val tableIdentifier = l.catalogTable.map(_.identifier)
+ val database = tableIdentifier.map(_.database).flatten.getOrElse(null)
+ val table = tableIdentifier.map(_.table).getOrElse(null)
+ Some(database, table, l.output, Nil, NoFlags, Seq.empty)
+ case l: LocalRelation => // used for unit test
+ Some(null, null, l.output, Nil, NoFlags, Seq.empty)
+ case _ => None
+ }
+ }
+}
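
These objects are Scala extractors, so the intended calling pattern is a match over a
LogicalPlan. A hedged sketch of how a caller might consume ExtractSelectModule (the
surrounding modular-plan construction is assumed, not shown in this patch):

  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  import org.apache.carbondata.mv.plans.util.ExtractSelectModule

  def describeSelectModule(plan: LogicalPlan): String = plan match {
    case ExtractSelectModule(outputs, inputs, predicates, aliasMap,
        joinEdges, children, flags, fspecs, wspecs) =>
      // These pieces would feed the construction of a modular Select node.
      s"SPJ subtree: ${children.size} children, ${joinEdges.size} join edges, " +
        s"${predicates.size} predicates"
    case _ => "not a simple select module"
  }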
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/LogicalPlanSignatureGenerator.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/LogicalPlanSignatureGenerator.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/LogicalPlanSignatureGenerator.scala
new file mode 100644
index 0000000..0c5661e
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/LogicalPlanSignatureGenerator.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.util
+
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+import org.apache.carbondata.mv.plans._
+
+object CheckSPJG {
+
+ def isSPJG(subplan: LogicalPlan): Boolean = {
+ subplan match {
+ case a: Aggregate =>
+ a.child.collect {
+ case Join(_, _, _, _) | Project(_, _) | Filter(_, _) |
+// CatalogRelation(_, _, _) |
+ LogicalRelation(_, _, _) | LocalRelation(_, _) => true
+ case _ => false
+ }.forall(identity)
+ case _ => false
+ }
+ }
+}
+
+object LogicalPlanSignatureGenerator extends SignatureGenerator[LogicalPlan] {
+ lazy val rule: SignatureRule[LogicalPlan] = LogicalPlanRule
+
+ override def generate(plan: LogicalPlan): Option[Signature] = {
+ if ( plan.isSPJG ) {
+ super.generate(plan)
+ } else {
+ None
+ }
+ }
+}
+
+object LogicalPlanRule extends SignatureRule[LogicalPlan] {
+
+ def apply(plan: LogicalPlan, childSignatures: Seq[Option[Signature]]): Option[Signature] = {
+
+ plan match {
+ case LogicalRelation(_, _, _) =>
+ // TODO: implement this (link to BaseRelation)
+ None
+// case CatalogRelation(tableMeta, _, _) =>
+// Some(Signature(false,
+// Set(Seq(tableMeta.database, tableMeta.identifier.table).mkString("."))))
+ case l: LocalRelation =>
+ // LocalRelation is for unit test cases
+ Some(Signature(groupby = false, Set(l.toString())))
+ case Filter(_, _) =>
+ if (childSignatures.length == 1 && !childSignatures(0).getOrElse(Signature()).groupby) {
+ // if (!childSignatures(0).getOrElse(Signature()).groupby) {
+ childSignatures(0)
+ // }
+ } else {
+ None
+ }
+ case Project(_, _) =>
+ if ( childSignatures.length == 1 && !childSignatures(0).getOrElse(Signature()).groupby ) {
+ childSignatures(0)
+ } else {
+ None
+ }
+ case Join(_, _, _, _) =>
+ if ( childSignatures.length == 2 &&
+ !childSignatures(0).getOrElse(Signature()).groupby &&
+ !childSignatures(1).getOrElse(Signature()).groupby ) {
+ Some(Signature(false,
+ childSignatures(0).getOrElse(Signature()).datasets
+ .union(childSignatures(1).getOrElse(Signature()).datasets)))
+ } else {
+ None
+ }
+ case Aggregate(_, _, _) =>
+ if ( childSignatures.length == 1 && !childSignatures(0).getOrElse(Signature()).groupby ) {
+ Some(Signature(true, childSignatures(0).getOrElse(Signature()).datasets))
+ } else {
+ None
+ }
+ case _ => None
+ }
+ }
+}
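
The signature acts as a coarse pre-filter for MV matching: a materialized view can only be
relevant to a query whose plan signature covers the view's base tables. A minimal sketch of
that check, assuming only the Signature fields (groupby, datasets) used above; how the
matcher actually consumes signatures is outside this file:

  import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
  import org.apache.carbondata.mv.plans.util.LogicalPlanSignatureGenerator

  def mayMatch(queryPlan: LogicalPlan, mvPlan: LogicalPlan): Boolean = {
    (LogicalPlanSignatureGenerator.generate(mvPlan),
      LogicalPlanSignatureGenerator.generate(queryPlan)) match {
      // The MV's base tables must be a subset of the query's tables.
      case (Some(mv), Some(q)) => mv.datasets.subsetOf(q.datasets)
      case _ => false
    }
  }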
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
new file mode 100644
index 0000000..b7641d5
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.util
+
+import java.io.{OutputStream, PrintWriter, StringWriter}
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, _}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.util.quoteIdentifier
+
+import org.apache.carbondata.mv.plans.modular.Flags._
+import org.apache.carbondata.mv.plans.util.SQLBuildDSL._
+
+// scalastyle:off println
+trait Printers {
+
+ abstract class FragmentPrinter(out: PrintWriter) {
+ protected var indentMargin = 0
+ protected val indentStep = 2
+ protected var indentString = " " // 40
+
+ def indent(): Unit = indentMargin += indentStep
+
+ def undent(): Unit = indentMargin -= indentStep
+
+ def println(): Unit = {
+ out.println()
+ while (indentMargin > indentString.length()) {
+ indentString += indentString
+ }
+ if (indentMargin > 0) {
+ out.write(indentString, 0, indentMargin)
+ }
+ }
+
+ def printSeq[a](ls: List[a])(printelem: a => Unit)(printsep: => Unit) {
+ ls match {
+ case List() =>
+ case List(x) => printelem(x)
+ case x :: rest => printelem(x); printsep; printSeq(rest)(printelem)(printsep)
+ }
+ }
+
+ def printColumn(ts: List[Fragment], start: String, sep: String, end: String) {
+ print(start)
+ indent
+ println()
+ printSeq(ts) { print(_) } { print(sep); println() }
+ undent
+ println()
+ print(end)
+ }
+
+ def printRow(ts: List[Fragment], start: String, sep: String, end: String) {
+ print(start)
+ printSeq(ts) { print(_) } { print(sep) }
+ print(end)
+ }
+
+ def printRow(ts: List[Fragment], sep: String) { printRow(ts, "", sep, "") }
+
+ def printFragment(frag: Fragment): Unit
+
+ def print(args: Any*): Unit = {
+ args foreach {
+ case frag: Fragment =>
+ printFragment(frag)
+ case arg =>
+ out.print(if (arg == null) {
+ "null"
+ } else {
+ arg.toString
+ })
+ }
+ }
+ }
+
+ /**
+ * An SQL fragment printer that is stingy about vertical whitespace and unnecessary
+ * punctuation.
+ */
+ class SQLFragmentCompactPrinter(out: PrintWriter) extends FragmentPrinter(out) {
+
+ object Position extends Enumeration {
+ type Position = Value
+ val Select, From, GroupBy, Where, Sort, Limit = Value
+ }
+
+ import Position._
+
+ def printSelect(select: Seq[NamedExpression], flags: FlagSet): Unit = {
+ if (flags.hasFlag(DISTINCT)) {
+ print("SELECT DISTINCT %s "
+ .format(select.map(_.sql) mkString ", "))
+ } else {
+ print("SELECT %s ".format(select.map(_.sql) mkString ", "))
+ }
+ }
+
+ def printFromElem(f: (SQLBuildDSL.Fragment, Option[JoinType], Seq[Expression])): Unit = {
+ f._2 map { t => print("%s JOIN ".format(t.sql)) }
+ printFragment(f._1)
+ if (f._3.nonEmpty) {
+ print(" ON %s".format(f._3.map(_.sql).mkString(" AND ")))
+ }
+ }
+
+ def printFrom(
+ from: Seq[(SQLBuildDSL.Fragment, Option[JoinType], Seq[Expression])]): Seq[Unit] = {
+ def fromIndented(
+ x: (SQLBuildDSL.Fragment, Option[JoinType], Seq[Expression])): Unit = {
+ indent
+ println()
+ printFromElem(x)
+ undent
+ }
+
+ print("FROM")
+ from map fromIndented
+ }
+
+ def printWhere(where: Seq[Expression]): Unit = {
+ if (where != Nil) {
+ print("WHERE")
+ // if (where.nonEmpty) {
+ // val condition = where(0)
+ // val dt = condition.dataType
+ // val t = condition.sql
+ // print(t)
+ // }
+ indent
+ println()
+ print("%s".format(where.map(_.sql).mkString(" AND ")))
+ undent
+ }
+ }
+
+ def printGroupby(groupby: (Seq[Expression], Seq[Seq[Expression]])): Unit = {
+
+ if (groupby._1.nonEmpty) {
+ print("GROUP BY %s".format(groupby._1.map(_.sql).mkString(", ")))
+ if (groupby._2.nonEmpty) {
+ print(" GROUPING SETS(%s)"
+ .format(groupby._2.map(e => s"(${ e.map(_.sql).mkString(", ") })").mkString(", ")))
+ }
+ }
+ }
+
+ def printHaving(having: Seq[Expression]): Unit = {
+ if (having != Nil) {
+ print("HAVING %s".format(having.map(_.sql).mkString(" AND ")))
+ }
+ }
+
+ def printUnion(sqls: Seq[Fragment]): Unit = {
+ sqls match {
+ case sql1 :: sql2 :: rest =>
+ printFragment(sql1)
+ println()
+ print("UNION ALL")
+ println()
+ printUnion(sql2 :: rest)
+ case sql :: Nil => printFragment(sql)
+ }
+ }
+
+ def printTable(name: Seq[String]): Unit = {
+ print("%s".format(name.last))
+ }
+
+ trait ExprSeq extends Seq[SortOrder]
+
+ def printExprModifiers(modifiers: (FlagSet, Seq[Seq[Any]]), pos: Position): Unit = {
+ val flagsNeedExprs = for { flag <- pickledListOrder if modifiers._1.hasFlag(flag) } yield flag
+
+ flagsNeedExprs.zip(modifiers._2).foreach {
+ case (SORT, Seq(order)) =>
+ if (pos == Sort) { // TODO: verify that order is a Seq[SortOrder] before casting
+ println()
+ if (modifiers._1.hasFlag(GLOBAL)) {
+ print("ORDER BY %s"
+ .format(order.asInstanceOf[Seq[SortOrder]].map {
+ // workaround for a SparkSQL bug
+ s => {
+ s.child match {
+ case a: Alias =>
+ val qualifierPrefix = a.qualifier
+ .map(_ + ".").getOrElse("")
+ s"$qualifierPrefix${
+ quoteIdentifier(a
+ .name)
+ }"
+
+ case other => other.sql
+ }
+ } + " " + s.direction.sql + " " + s.nullOrdering.sql
+ }.mkString(", ")))
+ } else {
+ print("SORT BY %s".format(order.asInstanceOf[Seq[SortOrder]].map {
+ // workaround for a SparkSQL bug
+ s => {
+ s.child match {
+ case a: Alias =>
+ val qualifierPrefix = a.qualifier.map(_ + ".")
+ .getOrElse("")
+ s"$qualifierPrefix${ quoteIdentifier(a.name) }"
+
+ case other => other.sql
+ }
+ } + " " + s.direction.sql + " " + s.nullOrdering.sql
+ }.mkString(", ")))
+ }
+ }
+ case (LIMIT, Seq(limitExpr)) =>
+ if (pos == Limit && limitExpr.isInstanceOf[Expression]) {
+ println()
+ print(s"LIMIT ${ limitExpr.asInstanceOf[Expression].sql }")
+ }
+ case (EXPAND, Seq(_, _, _)) => // EXPAND is rendered by printGroupby via GROUPING SETS
+ case _ =>
+ throw new UnsupportedOperationException(
+ s"unsupported modifiers: ${ flagToString(modifiers._1) }")
+ }
+ }
+
+ override def printFragment(frag: Fragment): Unit = {
+ require(
+ frag.Supported,
+ "Fragment is not supported. Current frag:\n" + frag)
+
+ frag match {
+ case spjg@SPJGFragment(select, from, where, groupby, having, alias, modifiers) =>
+ if (alias.nonEmpty) {
+ print("(")
+ }
+ printSelect(select, modifiers._1)
+ if (from.nonEmpty) {
+ println()
+ }
+ printFrom(from)
+ if (where.nonEmpty) {
+ println()
+ }
+ printWhere(where)
+ if (groupby._1.nonEmpty) {
+ println()
+ }
+ printGroupby(groupby)
+ if (having.nonEmpty) {
+ println()
+ }
+ printHaving(having)
+ if (modifiers._1.hasFlag(SORT)) {
+ printExprModifiers(modifiers, Sort)
+ }
+ if (modifiers._1.hasFlag(LIMIT)) {
+ printExprModifiers(modifiers, Limit)
+ }
+ alias.map { case a: String => print(") %s ".format(a)) }
+ case union@UNIONFragment(sqls, alias, modifiers) =>
+ if (alias.nonEmpty) {
+ print("(")
+ }
+ printUnion(sqls)
+ if (modifiers._1.hasFlag(LIMIT)) {
+ printExprModifiers(modifiers, Limit)
+ }
+ alias.map { case a: String => print(") %s ".format(a)) }
+ case table@TABLEFragment(name, alias, modifiers) =>
+ printTable(name)
+ alias.map { case a: String =>
+ if (a != name.last) {
+ print(" %s ".format(a))
+ }
+ }
+ case _ =>
+ throw new UnsupportedOperationException(s"unsupported plan $frag")
+ }
+ }
+ }
+
+ /**
+ * An SQL fragment printer that renders a fragment on a single line.
+ */
+ class SQLFragmentOneLinePrinter(out: PrintWriter) extends SQLFragmentCompactPrinter(out) {
+ protected val sep = " "
+
+ override def println() { print(sep) }
+ }
+
+ /** @group Printers */
+ protected def render(what: Any, mkPrinter: PrintWriter => FragmentPrinter): String = {
+ val buffer = new StringWriter()
+ val writer = new PrintWriter(buffer)
+ val printer = mkPrinter(writer)
+
+ printer.print(what)
+ writer.flush()
+ buffer.toString
+ }
+
+ def asCompactString(t: Fragment): String = render(t, newSQLFragmentCompactPrinter)
+
+ def asOneLineString(t: Fragment): String = render(t, newSQLFragmentOneLinePrinter)
+
+ def newSQLFragmentCompactPrinter(writer: PrintWriter): SQLFragmentCompactPrinter = {
+ new SQLFragmentCompactPrinter(writer)
+ }
+
+ def newSQLFragmentCompactPrinter(stream: OutputStream): SQLFragmentCompactPrinter = {
+ newSQLFragmentCompactPrinter(new PrintWriter(stream))
+ }
+
+ def newSQLFragmentOneLinePrinter(writer: PrintWriter): SQLFragmentOneLinePrinter = {
+ new SQLFragmentOneLinePrinter(writer)
+ }
+
+ def newSQLFragmentOneLinePrinter(stream: OutputStream): SQLFragmentOneLinePrinter = {
+ newSQLFragmentOneLinePrinter(new PrintWriter(stream))
+ }
+}
+// scalastyle:on println
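
Printers is a mix-in trait; a caller obtains SQL text by extracting a Fragment from a
ModularPlan and handing it to one of the render helpers. A minimal sketch, assuming the
fragment comes from SQLBuildDSL.fragmentExtract (defined in the files below):

  import org.apache.carbondata.mv.plans.modular.ModularPlan
  import org.apache.carbondata.mv.plans.util.{Printers, SQLBuildDSL}

  object FragmentToSql extends Printers {
    // Render the plan's fragment tree as indented SQL text.
    def compact(plan: ModularPlan): String =
      asCompactString(SQLBuildDSL.fragmentExtract(plan, None))
    // Render the same fragment on a single line (log-friendly).
    def oneLine(plan: ModularPlan): String =
      asOneLineString(SQLBuildDSL.fragmentExtract(plan, None))
  }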
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuild.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuild.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuild.scala
new file mode 100644
index 0000000..8c52d28
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuild.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.util
+
+import java.util.concurrent.atomic.AtomicLong
+
+class SQLBuild private (
+ nextSubqueryId: AtomicLong,
+ subqueryPrefix: String) {
+ self =>
+
+ def this(subqueryPrefix: String) =
+ this(new AtomicLong(0), subqueryPrefix)
+
+ def newSubqueryName(): String = s"${subqueryPrefix}${nextSubqueryId.getAndIncrement()}"
+}
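
Each SQLBuild instance carries its own AtomicLong, so successive calls yield distinct,
predictable names. For example:

  val build = new SQLBuild("gen_subquery_")
  build.newSubqueryName()   // "gen_subquery_0"
  build.newSubqueryName()   // "gen_subquery_1"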
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala
new file mode 100644
index 0000000..307fff0
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.util
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, BitwiseAnd, Cast, Expression, Grouping, GroupingID, Literal, NamedExpression, ShiftRight}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.types.{ByteType, IntegerType}
+
+import org.apache.carbondata.mv.plans.modular
+import org.apache.carbondata.mv.plans.modular.Flags._
+import org.apache.carbondata.mv.plans.modular.ModularPlan
+
+trait SQLBuildDSL {
+
+ abstract class Fragment extends Product {
+ // this class corresponds to TreeNode in SparkSQL or Tree in the Scala compiler
+ def canEqual(that: Any): Boolean = {
+ throw new UnsupportedOperationException
+ }
+
+ def productArity: Int = throw new UnsupportedOperationException
+
+ def productElement(n: Int): Any = throw new UnsupportedOperationException
+
+ // TODO: add library needed for SQL building
+ def isUnsupported: Boolean
+
+ def Supported: Boolean
+
+ // def pos: Position
+ def alias: Option[String]
+ }
+
+ case class SPJGFragment(
+ select: Seq[NamedExpression] = Nil, // TODO: more general, change to Seq[Either[Fragment,
+ // NamedExpression]]
+ from: Seq[(Fragment, Option[JoinType], Seq[Expression])] = Nil,
+ where: Seq[Expression] = Nil,
+ groupby: (Seq[Expression], Seq[Seq[Expression]]) = (Nil, Nil),
+ having: Seq[Expression] = Nil,
+ alias: Option[String] = None,
+ modifiers: (FlagSet, Seq[Seq[Any]])) extends Fragment {
+ override def isUnsupported: Boolean = {
+ select == Nil || from == Nil ||
+ from.map(_._1.isUnsupported).foldLeft(false)(_ || _)
+ }
+
+ override def Supported: Boolean = !isUnsupported
+ }
+
+ // TODO: find a scheme to break up fragmentExtract() using unapply
+ //
+ // object SPJGFragment {
+ // type ReturnType = (Seq[NamedExpression],Seq[(Fragment, Option[JoinType],Seq[Expression])
+ // ],Seq[Expression],Seq[Expression],Seq[Expression],Option[String])
+ //
+ // def unapply(plan: ModularPlan): Option[ReturnType] = fragmentExtract(plan, None)
+ // }
+
+ case class TABLEFragment(
+ table: Seq[String] = Nil,
+ alias: Option[String] = None,
+ modifiers: (FlagSet, Seq[Seq[Any]])) extends Fragment {
+ override def isUnsupported: Boolean = table == Nil
+
+ override def Supported: Boolean = !isUnsupported
+ }
+
+ case class UNIONFragment(
+ union: Seq[Fragment] = Nil,
+ alias: Option[String] = None,
+ modifiers: (FlagSet, Seq[Seq[Any]])) extends Fragment {
+ override def isUnsupported: Boolean = {
+ union == Nil ||
+ union.map(_.isUnsupported).foldLeft(false)(_ || _)
+ }
+
+ override def Supported: Boolean = !isUnsupported
+ }
+
+ val UnsupportedFragment: Fragment = new Fragment {
+ def isUnsupported = true
+
+ def Supported = false
+
+ def alias = None
+ }
+
+ /**
+ * Turns a bunch of string segments into a single string, separating each segment by a space.
+ * The segments are trimmed so only a single space appears in the separation.
+ * For example, `build("a", " b ", " c")` becomes "a b c".
+ */
+ protected def build(segments: String*): String = {
+ segments.map(_.trim).filter(_.nonEmpty).mkString(" ")
+ }
+
+ def fragmentExtract(plan: ModularPlan, alias: Option[String]): Fragment = {
+ if (plan.rewritten) {
+ fragmentExtract_Rewritten(plan, alias)
+ } else {
+ fragmentExtract_NonRewritten(plan, alias)
+ }
+ }
+
+ // Rewritten portion of query plan
+ private def fragmentExtract_Rewritten(
+ plan: ModularPlan,
+ alias: Option[String]): Fragment = {
+ plan match {
+ case s1@modular.Select(_, _, _, _, _,
+ Seq(g@modular.GroupBy(_, _, _, _,
+ s2@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)), _, _, _, _)
+ if (!s1.skip && !g.skip && !s2.skip) =>
+ extractRewrittenOrNonRewrittenSelectGroupBySelect(s1, g, s2, alias)
+
+ case s1@modular.Select(_, _, _, _, _,
+ Seq(g@modular.GroupBy(_, _, _, _,
+ s2@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)), _, _, _, _)
+ if (s1.skip && g.skip && s2.skip) =>
+ extractRewrittenOrNonRewrittenSelectGroupBySelect(s1, g, s2, alias)
+
+ case s1@modular.Select(_, _, _, _, _, Seq(g@modular.GroupBy(_, _, _, _,
+ s2@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)), _, _, _, _)
+ if (!s1.skip && !g.skip && s2.skip) =>
+ extractRewrittenSelectGroupBy(s1, g, alias)
+
+ case s1@modular.Select(_, _, _, _, _, Seq(s2@modular.Select(_, _, _, _, _, _, _, _, _, _)),
+ _, _, _, _) if (!s1.skip && s2.skip) =>
+ extractRewrittenSelect(s1, alias)
+
+ case other => extractSimpleOperator(other, alias)
+ }
+ }
+
+ // Non-rewritten portion of query plan
+ private def fragmentExtract_NonRewritten(
+ plan: ModularPlan,
+ alias: Option[String]): Fragment = {
+ plan match {
+ case s1@modular.Select(_, _, _, _, _,
+ Seq(g@modular.GroupBy(_, _, _, _,
+ s2@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)), _, _, _, _)
+ if (s1.aliasMap.isEmpty && !g.rewritten) =>
+ extractRewrittenOrNonRewrittenSelectGroupBySelect(s1, g, s2, alias)
+
+ case g@modular.GroupBy(_, _, _, _, s2@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)
+ if (g.alias.isEmpty && !s2.rewritten) =>
+ val fragmentList = s2.children.zipWithIndex
+ .map { case (child, index) => fragmentExtract(child, s2.aliasMap.get(index)) }
+ val fList = s2.joinEdges.map {
+ e => {
+ (e.right, (fragmentList(e.right), Some(e.joinType), s2
+ .extractRightEvaluableConditions(s2.children(e.left), s2.children(e.right))))
+ }
+ }.toMap
+ val from = (0 to fragmentList.length - 1)
+ .map(index => fList.get(index).getOrElse((fragmentList(index), None, Nil)))
+ val excludesPredicate = from.flatMap(_._3).toSet
+ val (select, (groupByExprs, groupingSet)) = addGroupingSetIfNeeded(g, s2)
+ SPJGFragment(
+ select,
+ from,
+ s2.predicateList.filter { p => !excludesPredicate(p) },
+ (groupByExprs, groupingSet),
+ Nil,
+ alias,
+ (g.flags, g.flagSpec))
+
+ case g@modular.GroupBy(_, _, _, _, _, _, _, _) if (g.alias.nonEmpty) =>
+ val from = Seq((fragmentExtract(g.child, g.alias), None, Nil))
+ SPJGFragment(
+ g.outputList,
+ from,
+ Nil,
+ (g.predicateList, Seq.empty),
+ Nil,
+ alias,
+ (g.flags, g.flagSpec))
+
+ case other => extractSimpleOperator(other, alias)
+ }
+ }
+
+ // used in both rewritten and non-rewritten cases
+ // currently in rewritten cases we don't consider grouping set
+ private def extractRewrittenOrNonRewrittenSelectGroupBySelect(
+ s1: modular.Select,
+ g: modular.GroupBy,
+ s2: modular.Select,
+ alias: Option[String]): Fragment = {
+ val fragmentList = s2.children.zipWithIndex
+ .map { case (child, index) => fragmentExtract(child, s2.aliasMap.get(index)) }
+ val fList = s2.joinEdges.map {
+ e => {
+ (e.right, (fragmentList(e.right), Some(e.joinType), s2
+ .extractRightEvaluableConditions(s2.children(e.left), s2.children(e.right))))
+ }
+ }.toMap
+ val from = (0 to fragmentList.length - 1)
+ .map(index => fList.get(index).getOrElse((fragmentList(index), None, Nil)))
+ val excludesPredicate = from.flatMap(_._3).toSet
+ val aliasMap = AttributeMap(g.outputList.collect { case a: Alias => (a.toAttribute, a) })
+ val windowExprs = s1.windowSpec
+ .map { case Seq(expr) => expr.asInstanceOf[Seq[NamedExpression]] }
+ .foldLeft(Seq.empty.asInstanceOf[Seq[NamedExpression]])(_ ++ _)
+ val having = s1.predicateList.map {
+ case attr: Attribute => aliasMap.get(attr).map(_.child).getOrElse(attr)
+ case expr: Expression => expr.transform { case a: Alias => a.child }
+ case other => other
+ }
+
+ val (select_g, (groupByExprs, groupingSet)) = addGroupingSetIfNeeded(g, s2)
+
+ val gSet = AttributeSet(g.outputList.map(_.toAttribute))
+ val sSet = AttributeSet(s1.outputList.map(_.toAttribute))
+ val select = if (groupingSet.nonEmpty) {
+ if (gSet.equals(sSet) && windowExprs.isEmpty) {
+ select_g
+ } else {
+ throw new UnsupportedOperationException
+ }
+ } else {
+ // TODO: how to handle alias of attribute in MV
+ s1.outputList.map { attr => aliasMap.get(attr.toAttribute).getOrElse(attr) } ++ windowExprs
+ }
+
+ SPJGFragment(
+ select, // select
+ from, // from
+ s2.predicateList.filter { p => !excludesPredicate(p) }, // where
+ (groupByExprs, groupingSet), // group by
+ having, // having
+ alias,
+ (s1.flags, s1.flagSpec))
+ }
+
+ // used in rewritten cases only -- don't consider grouping set
+ private def extractRewrittenSelectGroupBy(
+ s1: modular.Select,
+ g: modular.GroupBy,
+ alias: Option[String]): Fragment = {
+ val fragment = fragmentExtract(g.child, g.alias)
+ val from = Seq((fragment, None, Nil))
+ val aliasMap = AttributeMap(g.outputList.collect { case a: Alias => (a.toAttribute, a) })
+ val windowExprs = s1.windowSpec
+ .map { case Seq(expr) => expr.asInstanceOf[Seq[NamedExpression]] }
+ .foldLeft(Seq.empty.asInstanceOf[Seq[NamedExpression]])(_ ++ _)
+ val having = s1.predicateList.map {
+ case attr: Attribute => aliasMap.get(attr).map(_.child).getOrElse(attr)
+ case expr: Expression => expr.transform { case a: Alias => a.child }
+ case other => other
+ }
+
+ val (select_g, (groupByExprs, groupingSet)) = (g.outputList, (g.predicateList, Seq.empty))
+
+ // TODO: how to handle alias of attribute in MV
+ val select = s1.outputList.map { attr => aliasMap.get(attr.toAttribute).getOrElse(attr) } ++
+ windowExprs
+ SPJGFragment(
+ select, // select
+ from, // from
+ Nil, // where
+ (groupByExprs, groupingSet), // group by
+ having, // having
+ alias,
+ (s1.flags, s1.flagSpec))
+ }
+
+ private def extractRewrittenSelect(s1: modular.Select, alias: Option[String]): Fragment = {
+ val fragment = fragmentExtract(s1.children(0), s1.aliasMap.get(0))
+ val from = Seq((fragment, None, Nil))
+ val windowExprs = s1.windowSpec
+ .map { case Seq(expr) => expr.asInstanceOf[Seq[NamedExpression]] }
+ .foldLeft(Seq.empty.asInstanceOf[Seq[NamedExpression]])(_ ++ _)
+ val select = s1.outputList.map(_.toAttribute)
+
+ SPJGFragment(
+ select, // select
+ from, // from
+ s1.predicateList, // where
+ (Nil, Nil), // group by
+ Nil, // having
+ alias,
+ (s1.flags, s1.flagSpec))
+ }
+
+ private def extractSimpleOperator(
+ operator: ModularPlan,
+ alias: Option[String]): Fragment = {
+ operator match {
+ case s@modular.Select(_, _, _, _, _, _, _, _, _, _) =>
+ val fragmentList = s.children.zipWithIndex
+ .map { case (child, index) => fragmentExtract(child, s.aliasMap.get(index)) }
+ val fList = s.joinEdges.map {
+ e => {
+ (e.right, (fragmentList(e.right), Some(e.joinType), s
+ .extractRightEvaluableConditions(s.children(e.left), s.children(e.right))))
+ }
+ }.toMap
+ val from = (0 to fragmentList.length - 1)
+ .map(index => fList.get(index).getOrElse((fragmentList(index), None, Nil)))
+ val excludesPredicate = from.flatMap(_._3).toSet
+ val windowExprs = s.windowSpec
+ .map { case Seq(expr) => expr.asInstanceOf[Seq[NamedExpression]] }
+ .foldLeft(Seq.empty.asInstanceOf[Seq[NamedExpression]])(_ ++ _)
+ val select = s.outputList ++ windowExprs
+
+ SPJGFragment(
+ select, // select
+ from, // from
+ s.predicateList.filter { p => !excludesPredicate(p) }, // where
+ (Nil, Nil), // group by
+ Nil, // having
+ alias,
+ (s.flags, s.flagSpec))
+
+ case u@modular.Union(_, _, _) =>
+ UNIONFragment(
+ u.children.zipWithIndex.map { case (child, index) => fragmentExtract(child, None) },
+ alias,
+ (u.flags, u.flagSpec))
+
+ case d@modular.ModularRelation(_, _, _, _, _) =>
+ if (d.databaseName != null && d.tableName != null) {
+ TABLEFragment(
+ Seq(d.databaseName, d.tableName), alias, (d.flags, d.rest))
+ } else {
+ TABLEFragment(Seq((d.output).toString()), alias, (d.flags, d.rest))
+ }
+
+ case h@modular.HarmonizedRelation(_) =>
+ fragmentExtract(h.source, alias)
+
+ case _ => UnsupportedFragment
+ }
+ }
+
+ private def addGroupingSetIfNeeded(g: modular.GroupBy, s: modular.Select) = {
+ if (g.flags.hasFlag(EXPAND)) {
+ assert(g.predicateList.length > 1)
+ val flagsNeedExprs = for { flag <- pickledListOrder if g.flags.hasFlag(flag) } yield flag
+ flagsNeedExprs.zip(g.flagSpec).collect {
+ case (EXPAND, Seq(projections_, output_, numOriginalOutput_)) =>
+ val output = output_.asInstanceOf[Seq[Attribute]]
+ val projections = projections_.asInstanceOf[Seq[Seq[Expression]]]
+ val numOriginalOutput = numOriginalOutput_.asInstanceOf[Int]
+
+ // The last column of Expand is always grouping ID
+ val gid = output.last
+
+ val groupByAttributes = g.predicateList.dropRight(1).map(_.asInstanceOf[Attribute])
+ // Assumption: project's projectList is composed of
+ // 1) the original output (Project's child.output),
+ // 2) the aliased group by expressions.
+ val expandedAttributes = s.output.drop(numOriginalOutput)
+ val groupByExprs = s.outputList.drop(numOriginalOutput).map(_.asInstanceOf[Alias].child)
+
+ // a map from group by attributes to the original group by expressions.
+ val groupByAttrMap = AttributeMap(groupByAttributes.zip(groupByExprs))
+ // a map from expanded attributes to the original group by expressions.
+ val expandedAttrMap = AttributeMap(expandedAttributes.zip(groupByExprs))
+
+ val groupingSet: Seq[Seq[Expression]] = projections.map { project =>
+ // Assumption: expand.projections is composed of
+ // 1) the original output (Project's child.output),
+ // 2) expanded attributes(or null literal)
+ // 3) gid, which is always the last one in each project in Expand
+ project.drop(numOriginalOutput).dropRight(1).collect {
+ case attr: Attribute if expandedAttrMap.contains(attr) => expandedAttrMap(attr)
+ }
+ }
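+ // Illustration (not part of the original patch): for GROUP BY a, b GROUPING SETS
+ // ((a), (a, b)), the two projections yield groupingSet == Seq(Seq(a), Seq(a, b));
+ // columns nulled out in a projection are Literals, so collect drops them from that set.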
+
+ val aggExprs = g.outputList.map { case aggExpr =>
+ val originalAggExpr = aggExpr.transformDown {
+ // grouping_id() is converted to VirtualColumn.groupingIdName by Analyzer. Revert
+ // it back.
+ case ar: AttributeReference if ar == gid => GroupingID(Nil)
+ case ar: AttributeReference if groupByAttrMap.contains(ar) => groupByAttrMap(ar)
+ case a@Cast(
+ BitwiseAnd(
+ ShiftRight(ar: AttributeReference, Literal(value: Any, IntegerType)),
+ Literal(1, IntegerType)), ByteType, None) if ar == gid =>
+ // for converting an expression to its original SQL format grouping(col)
+ val idx = groupByExprs.length - 1 - value.asInstanceOf[Int]
+ groupByExprs.lift(idx).map(Grouping).getOrElse(a)
+ }
+
+ originalAggExpr match {
+ // Ancestor operators may reference the output of this grouping set,
+ // and we use exprId to generate a unique name for each attribute, so we should
+ // make sure the transformed aggregate expression won't change the output,
+ // i.e. exprId and alias name should remain the same.
+ case ne: NamedExpression if ne.exprId == aggExpr.exprId => ne
+ case e => Alias(e, aggExpr.name)(exprId = aggExpr.exprId)
+ }
+ }
+ (aggExprs, (groupByExprs, groupingSet))
+
+ }.head
+ } else {
+ (g.outputList, (g.predicateList, Seq.empty))
+ }
+ }
+}
+
+object SQLBuildDSL extends SQLBuildDSL
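
A fragment is only rendered when every nested fragment is supported; unsupported shapes
surface as UnsupportedFragment rather than as invalid SQL. An illustration of the
supportedness rules above (constructing fragments by hand is for demonstration only; real
fragments come from fragmentExtract):

  import org.apache.carbondata.mv.plans.modular.Flags.NoFlags
  import org.apache.carbondata.mv.plans.util.SQLBuildDSL._

  // FROM default.fact_table f, with no SORT/LIMIT/EXPAND modifiers.
  val table = TABLEFragment(Seq("default", "fact_table"), Some("f"), (NoFlags, Seq.empty))
  assert(table.Supported)
  // A UNION with no children can never be printed as valid SQL.
  val emptyUnion = UNIONFragment(Nil, None, (NoFlags, Seq.empty))
  assert(emptyUnion.isUnsupported)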
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
new file mode 100644
index 0000000..4bc8b97
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.util
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.immutable
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+
+import org.apache.carbondata.mv.expressions.modular._
+import org.apache.carbondata.mv.plans._
+import org.apache.carbondata.mv.plans.modular.{Select, _}
+import org.apache.carbondata.mv.plans.util.SQLBuildDSL.Fragment
+
+// TODO: generalize this using SQLBuild to handle nested sub-query expression, using the
+// correspondence
+// SQLBuild <--> QueryRewrite
+class SQLBuilder private(
+ modularPlan: ModularPlan,
+ nextSubqueryId: AtomicLong,
+ subqueryPrefix: String) {
+
+ def this(modularPlan: ModularPlan) = {
+ this(
+ SQLBuilder.subqueryPreprocess(modularPlan, new AtomicLong()),
+ new AtomicLong(0),
+ "gen_subquery_")
+ }
+
+ // only allow one level scalar subquery
+ def this(modularPlan: ModularPlan, subqueryPrefix: String) = {
+ this(SQLBuilder.screenOutPlanWithSubquery(modularPlan), new AtomicLong(0), subqueryPrefix)
+ }
+
+ private def newSubqueryName(): String = s"${ subqueryPrefix }${ nextSubqueryId.getAndIncrement() }"
+
+ def fragmentExtract: Fragment = {
+ val finalPlan: ModularPlan = SQLizer.execute(modularPlan)
+ SQLBuildDSL.fragmentExtract(finalPlan, None)
+ }
+
+ object SQLizer extends RuleExecutor[ModularPlan] {
+ override protected def batches: Seq[Batch] = {
+ Seq(
+ Batch(
+ "Recover Scoping Info", Once,
+ // Clean up qualifiers due to erasure of sub-query aliases to use base tables as
+ // qualifiers
+ // This complements AddSubquery to handle corner case qMV3 in Tpcds_1_4_Querybatch.scala
+ CleanupQualifier,
+ // Insert sub queries on top of operators that need to appear after FROM clause.
+ AddSubquery
+ )
+ )
+ }
+
+ object CleanupQualifier extends Rule[ModularPlan] {
+ override def apply(tree: ModularPlan): ModularPlan = {
+ tree transformDown {
+ case s@modular.Select(_, _, _, _, _,
+ Seq(g@modular.GroupBy(_, _, _, _,
+ s2@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)), _, _, _, _)
+ if !s.rewritten && s2.children.forall { _.isInstanceOf[modular.LeafNode] } =>
+ val attrMap = AttributeMap(s2.outputList
+ .collect { case a: Alias if a.child.isInstanceOf[Attribute] => (a.toAttribute, a) })
+ cleanupQualifier(s, s2.aliasMap, s2.children, attrMap)
+
+ case g@modular.GroupBy(_, _, _, _,
+ s2@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)
+ if !g.rewritten && s2.children.forall { _.isInstanceOf[modular.LeafNode] } =>
+ val attrMap = AttributeMap(s2.outputList
+ .collect { case a: Alias if a.child.isInstanceOf[Attribute] => (a.toAttribute, a) })
+ cleanupQualifier(g, s2.aliasMap, s2.children, attrMap)
+
+ case s@modular.Select(_, _, _, _, _, _, _, _, _, _)
+ if !s.rewritten && s.children.forall { _.isInstanceOf[modular.LeafNode] } =>
+ cleanupQualifier(s, s.aliasMap, s.children, AttributeMap(Seq.empty[(Attribute, Alias)]))
+
+ }
+ }
+
+ private def cleanupQualifier(node: ModularPlan,
+ aliasMap: Map[Int, String],
+ children: Seq[ModularPlan],
+ attrMap: AttributeMap[Alias]): ModularPlan = {
+ node transform {
+ case plan if !plan.rewritten && !plan.isInstanceOf[modular.LeafNode] =>
+ plan.transformExpressions {
+ case ref: Attribute =>
+ val i = children.indexWhere(_.outputSet.contains(ref))
+ if (i > -1) {
+ // this is a workaround for Spark's qualifier behavior
+ if (aliasMap.nonEmpty && aliasMap(i).nonEmpty) {
+ AttributeReference(
+ ref.name,
+ ref.dataType)(exprId = ref.exprId, qualifier = Option(aliasMap(i)))
+ } else {
+ ref
+ }
+ } else {
+ attrMap.get(ref) match {
+ case Some(alias) => Alias(alias.child, alias.name)(exprId = alias.exprId)
+ case None => ref
+ }
+ }
+ }
+ }
+ }
+ }
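+
+ // A hedged illustration of the effect (the alias "fact" and attribute "a" are
+ // made up): once a sub-query alias like gen_subquery_0 has been erased, an
+ // attribute such as gen_subquery_0.a is re-qualified via aliasMap with its
+ // base-table alias, becoming fact.a.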
+
+ object AddSubquery extends Rule[ModularPlan] {
+ override def apply(tree: ModularPlan): ModularPlan = {
+ tree transformUp {
+ case s@modular.Select(_, _, _, _, _,
+ Seq(g@modular.GroupBy(_, _, _, _,
+ s2@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)), _, _, _, _)
+ if s2.children.forall { _.isInstanceOf[modular.LeafNode] } => s
+
+ case g@modular.GroupBy(_, _, _, _,
+ s2@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)
+ if s2.children.forall { _.isInstanceOf[modular.LeafNode] } => g
+
+ case s@modular.Select(_, _, _, _, _, _, _, _, _, _)
+ if !s.rewritten && !s.children.forall { _.isInstanceOf[modular.LeafNode] } =>
+ var list: List[(Int, String)] = List()
+ var newS = s.copy()
+ s.children.zipWithIndex.filterNot { _._1.isInstanceOf[modular.LeafNode] }.foreach {
+ case (child: ModularPlan, index) if (!s.aliasMap.contains(index)) =>
+ val subqueryName = newSubqueryName()
+ val windowAttributeSet = if (child.isInstanceOf[Select]) {
+ val windowExprs = child.asInstanceOf[Select].windowSpec
+ .map { case Seq(expr) => expr.asInstanceOf[Seq[NamedExpression]] }
+ .foldLeft(Seq.empty[NamedExpression])(_ ++ _)
+ SQLBuilder.collectAttributeSet(windowExprs)
+ } else {
+ AttributeSet.empty
+ }
+ val subqueryAttributeSet = child.outputSet ++ windowAttributeSet
+ // TODO: use aliases to avoid duplicate names with distinct exprIds
+ // val duplicateNames = collectDuplicateNames(subqueryAttributeSet).toSet
+ if (SQLBuilder.collectDuplicateNames(subqueryAttributeSet).nonEmpty) {
+ throw new UnsupportedOperationException(
+ s"duplicate name(s): ${ child.output.mkString(", ") }")
+ }
+ list = list :+ ((index, subqueryName))
+ newS = newS.transformExpressions {
+ case ref: Attribute if (subqueryAttributeSet.contains(ref)) =>
+ AttributeReference(ref.name, ref.dataType)(
+ exprId = ref.exprId,
+ qualifier = Some(subqueryName))
+ case alias: Alias if (subqueryAttributeSet.contains(alias.toAttribute)) =>
+ Alias(alias.child, alias.name)(
+ exprId = alias.exprId,
+ qualifier = Some(subqueryName))
+ }
+
+ case _ =>
+ }
+ if (list.nonEmpty) {
+ list = list ++ s.aliasMap.toSeq
+ newS.copy(aliasMap = list.toMap)
+ } else {
+ newS
+ }
+
+ case g@modular.GroupBy(_, _, _, _, _, _, _, _) if (!g.rewritten && g.alias.isEmpty) =>
+ val newG = if (g.outputList.isEmpty) {
+ val ol = g.predicateList.map { case a: Attribute => a }
+ g.copy(outputList = ol)
+ } else {
+ g
+ }
+ val subqueryName = newSubqueryName()
+ val subqueryAttributeSet = newG.child.outputSet
+ if (SQLBuilder.collectDuplicateNames(subqueryAttributeSet).nonEmpty) {
+ throw new UnsupportedOperationException(
+ s"duplicate name(s): ${ newG.child.output.mkString(", ") }")
+ }
+ newG.transformExpressions {
+ case ref: AttributeReference if (subqueryAttributeSet.contains(ref)) =>
+ AttributeReference(ref.name, ref.dataType)(
+ exprId = ref.exprId,
+ qualifier = Some(subqueryName))
+ case alias: Alias if (subqueryAttributeSet.contains(alias.toAttribute)) =>
+ Alias(alias.child, alias.name)(
+ exprId = alias.exprId,
+ qualifier = Some(subqueryName))
+ }.copy(alias = Some(subqueryName))
+ }
+ }
+ }
+ }
+
+}
+
+object SQLBuilder {
+ def collectAttributeSet(outputList: Seq[Expression]): AttributeSet = {
+ AttributeSet(outputList
+ .collect {
+ case a@Alias(_, _) => a.toAttribute
+ case a: Attribute => a
+ })
+ }
+
+ def collectDuplicateNames(s: AttributeSet): immutable.Iterable[String] = {
+ s.baseSet.map(_.a).groupBy(_.name).collect { case (x, ys) if ys.size > 1 => x }
+ }
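+
+ // A hedged example (exprIds made up): for an AttributeSet holding a#1, a#2 and
+ // b#3, the name "a" is shared by two distinct exprIds, so this returns
+ // Iterable("a").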
+
+ def subqueryPreprocess(plan: ModularPlan, nextScalarSubqueryId: AtomicLong): ModularPlan = {
+ def newSubqueryName(nextScalarSubqueryId: AtomicLong): String = {
+ s"gen_expression_${
+ nextScalarSubqueryId
+ .getAndIncrement()
+ }_"
+ }
+
+ plan.transformAllExpressions {
+ case s: ModularSubquery =>
+ if (s.children.isEmpty) {
+ val subqueryName = newSubqueryName(nextScalarSubqueryId)
+ SubqueryHolder(s"(${ s.plan.asOneLineSQL(subqueryName) })")
+ } else {
+ throw new UnsupportedOperationException(s"Expression $s can't converted to SQL")
+ }
+ case o => o
+ }
+ }
+
+ def screenOutPlanWithSubquery(plan: ModularPlan): ModularPlan = {
+ plan.transformAllExpressions {
+ case s: ModularSubquery =>
+ throw new UnsupportedOperationException(s"Nested scala subquery $s doesn't supported")
+ case e => e
+ }
+ }
+}
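
A minimal usage sketch of SQLBuilder (hedged: `plan` is a placeholder for a
ModularPlan produced elsewhere, e.g. by the MV modularization pipeline):

    import org.apache.carbondata.mv.plans.modular.ModularPlan
    import org.apache.carbondata.mv.plans.util.{SQLBuildDSL, SQLBuilder}

    val plan: ModularPlan = ???                 // placeholder plan
    val builder = new SQLBuilder(plan)          // preprocesses scalar subqueries
    val fragment: SQLBuildDSL.Fragment =
      builder.fragmentExtract                   // runs SQLizer, then extracts the fragment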
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala
new file mode 100644
index 0000000..c46124b
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.util
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.trees.TreeNode
+
+case class Signature(groupby: Boolean = true, datasets: Set[String] = Set.empty)
+
+abstract class SignatureRule[BaseType <: TreeNode[BaseType]] extends Logging {
+ def apply(subplan: BaseType, signatures: Seq[Option[Signature]]): Option[Signature]
+}
+
+abstract class SignatureGenerator[BaseType <: TreeNode[BaseType]] extends Logging {
+ protected val rule: SignatureRule[BaseType]
+
+ def generate(subplan: BaseType): Option[Signature] = {
+ generateUp(subplan)
+ }
+
+ protected def generateChildren(subplan: BaseType,
+ nextOperation: BaseType => Option[Signature]): Seq[Option[Signature]] = {
+ subplan.children.map { child =>
+ nextOperation(child.asInstanceOf[BaseType])
+ }
+ }
+
+ def generateUp(subplan: BaseType): Option[Signature] = {
+ val childSignatures = generateChildren(subplan, t => generateUp(t))
+ val lastSignature = rule(subplan, childSignatures)
+ lastSignature
+ }
+}
+
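
To make the rule/generator split concrete, here is a hedged sketch of a toy
SignatureRule wired into a SignatureGenerator over Spark logical plans; the
merging logic below is illustrative, not the rule CarbonData actually installs:

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    object ToySignatureGenerator extends SignatureGenerator[LogicalPlan] {
      override protected val rule: SignatureRule[LogicalPlan] =
        new SignatureRule[LogicalPlan] {
          override def apply(subplan: LogicalPlan,
              signatures: Seq[Option[Signature]]): Option[Signature] = {
            // Merge child signatures: AND the group-by flags, union the datasets.
            val children = signatures.flatten
            Some(Signature(
              groupby = children.forall(_.groupby),
              datasets = children.flatMap(_.datasets).toSet))
          }
        }
    }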
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/TableCluster.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/TableCluster.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/TableCluster.scala
new file mode 100644
index 0000000..021518c
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/TableCluster.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.util
+
+import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty, JsonRawValue}
+import com.google.common.base.Objects
+
+class TableCluster @JsonCreator()(@JsonProperty("fact") @JsonRawValue fact: Set[String],
+ @JsonProperty("dimension") @JsonRawValue dimension: Set[String]) {
+
+ def getFact(): Set[String] = {
+ fact
+ }
+
+ def getDimension(): Set[String] = {
+ dimension
+ }
+
+ override def toString: String = {
+ Objects.toStringHelper(this)
+ .add("fact", fact)
+ .add("dimension", dimension)
+ .toString
+ }
+
+}
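
A hedged usage sketch of TableCluster (the table names are made up):

    val cluster = new TableCluster(
      Set("default.sales_fact"),                        // fact side
      Set("default.date_dim", "default.customer_dim"))  // dimension side
    println(cluster.getFact())   // Set(default.sales_fact)
    println(cluster)             // e.g. TableCluster{fact=..., dimension=...}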
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/ModularPlanTest.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/ModularPlanTest.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/ModularPlanTest.scala
new file mode 100644
index 0000000..0c68a1f
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/ModularPlanTest.scala
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.testutil
+
+import java.io.File
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.test.util.{PlanTest, QueryTest}
+
+import org.apache.carbondata.mv.plans.modular
+import org.apache.carbondata.mv.plans.modular.{ModularPlan, OneRowTable, Select}
+import org.apache.carbondata.mv.plans.modular.Flags._
+
+/**
+ * Provides helper methods for comparing plans.
+ */
+abstract class ModularPlanTest extends QueryTest with PredicateHelper {
+
+ /**
+ * Since attribute references are given globally unique ids during analysis,
+ * we must normalize them to check if two different queries are identical.
+ */
+ protected def normalizeExprIds(plan: ModularPlan): plan.type = {
+ plan transformAllExpressions {
+ case s: ScalarSubquery =>
+ s.copy(exprId = ExprId(0))
+ case e: Exists =>
+ e.copy(exprId = ExprId(0))
+ case l: ListQuery =>
+ l.copy(exprId = ExprId(0))
+ case a: AttributeReference =>
+ AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(0))
+ case a: Alias =>
+ Alias(a.child, a.name)(exprId = ExprId(0))
+ case ae: AggregateExpression =>
+ ae.copy(resultId = ExprId(0))
+ }
+ }
+
+ /**
+ * Normalizes plans:
+ * - Filter conditions are normalized so that, for instance,
+ * ((expr 1 && expr 2) && expr 3), (expr 1 && expr 2 && expr 3) and
+ * (expr 3 && (expr 1 && expr 2)) all become equivalent.
+ * - The seed of a Sample is replaced by 0L.
+ * - Join conditions are re-sorted by hashCode.
+ */
+ protected def normalizePlan(plan: LogicalPlan): LogicalPlan = {
+ plan transform {
+ case filter@Filter(condition: Expression, child: LogicalPlan) =>
+ Filter(
+ splitConjunctivePredicates(condition).map(rewriteEqual(_)).sortBy(_.hashCode())
+ .reduce(And), child)
+ case sample: Sample =>
+ sample.copy(seed = 0L)(true)
+ case join@Join(left, right, joinType, condition) if condition.isDefined =>
+ val newCondition =
+ splitConjunctivePredicates(condition.get).map(rewriteEqual(_)).sortBy(_.hashCode())
+ .reduce(And)
+ Join(left, right, joinType, Some(newCondition))
+ }
+ }
+
+ /**
+ * Rewrite [[EqualTo]] and [[EqualNullSafe]] operator to keep order. The following cases will be
+ * equivalent:
+ * 1. (a = b), (b = a);
+ * 2. (a <=> b), (b <=> a).
+ */
+ private def rewriteEqual(condition: Expression): Expression = {
+ condition match {
+ case eq@EqualTo(l: Expression, r: Expression) =>
+ Seq(l, r).sortBy(_.hashCode()).reduce(EqualTo)
+ case eq@EqualNullSafe(l: Expression, r: Expression) =>
+ Seq(l, r).sortBy(_.hashCode()).reduce(EqualNullSafe)
+ case _ => condition // Don't reorder.
+ }
+ }
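+
+ // A hedged illustration: with rewriteEqual, the conditions (a = b) and (b = a)
+ // normalize to the same tree (operands ordered by hashCode), so normalizePlan
+ // makes plans differing only in operand order compare equal.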
+
+ //
+ // /** Fails the test if the two plans do not match */
+ // protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) {
+ // val normalized1 = normalizePlan(normalizeExprIds(plan1))
+ // val normalized2 = normalizePlan(normalizeExprIds(plan2))
+ // if (normalized1 != normalized2) {
+ // fail(
+ // s"""
+ // |== FAIL: Plans do not match ===
+ // |${sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n")}
+ // """.stripMargin)
+ // }
+ // }
+ //
+ // /** Fails the test if the two expressions do not match */
+ // protected def compareExpressions(e1: Expression, e2: Expression): Unit = {
+ // comparePlans(Filter(e1, OneRowRelation), Filter(e2, OneRowRelation))
+ // }
+ //
+ // /** Fails the test if the join order in the two plans do not match */
+ // protected def compareJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan) {
+ // val normalized1 = normalizePlan(normalizeExprIds(plan1))
+ // val normalized2 = normalizePlan(normalizeExprIds(plan2))
+ // if (!sameJoinPlan(normalized1, normalized2)) {
+ // fail(
+ // s"""
+ // |== FAIL: Plans do not match ===
+ // |${sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n")}
+ // """.stripMargin)
+ // }
+ // }
+ //
+ // /** Consider symmetry for joins when comparing plans. */
+ // private def sameJoinPlan(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = {
+ // (plan1, plan2) match {
+ // case (j1: Join, j2: Join) =>
+ // (sameJoinPlan(j1.left, j2.left) && sameJoinPlan(j1.right, j2.right)) ||
+ // (sameJoinPlan(j1.left, j2.right) && sameJoinPlan(j1.right, j2.left))
+ // case (p1: Project, p2: Project) =>
+ // p1.projectList == p2.projectList && sameJoinPlan(p1.child, p2.child)
+ // case _ =>
+ // plan1 == plan2
+ // }
+ // }
+ /** Fails the test if any corresponding pair of plans does not match */
+ protected def comparePlanCollections(planSet1: Seq[String], planSet2: Seq[String]) {
+ for ((plan1, plan2) <- planSet1 zip planSet2) {
+ compareMessages(plan1, plan2)
+ }
+ }
+
+ /**
+ * Fails the test if the two plans do not match. Only expression ids are
+ * normalized, which is enough for our test cases; for more general
+ * normalization see Spark's PlanTest.scala for logical plans.
+ */
+ protected def comparePlans(plan1: ModularPlan, plan2: ModularPlan) {
+ val normalized1 = normalizeExprIds(plan1)
+ val normalized2 = normalizeExprIds(plan2)
+ if (normalized1 != normalized2) {
+ fail(
+ s"""
+ |== FAIL: Plans do not match ===
+ |${ sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n") }
+ """.stripMargin)
+ }
+ }
+
+ /** Fails the test if the two expressions do not match */
+ protected def compareExpressions(e1: Seq[Expression], e2: Seq[Expression]): Unit = {
+ comparePlans(
+ Select(Nil, Nil, e1, Map.empty, Nil, Seq(OneRowTable), NoFlags, Seq.empty, Seq.empty),
+ modular.Select(Nil, Nil, e2, Map.empty, Nil, Seq(OneRowTable), NoFlags, Seq.empty, Seq.empty))
+ }
+
+ protected def compareMessages(msg1: String, msg2: String) {
+ if (msg1 != msg2) {
+ fail(
+ s"""
+ |== FAIL: Messages do not match ==
+ |${ sideBySide(msg1, msg2).mkString("\n") }
+ """.stripMargin)
+ }
+ }
+}
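
In a suite, these helpers typically assert that a rewritten modular plan matches
an expected one. A hedged sketch (both plans are placeholders):

    class RewriteSuite extends ModularPlanTest {
      test("rewritten plan matches expected plan") {
        val expected: ModularPlan = ???  // placeholder
        val actual: ModularPlan = ???    // placeholder
        comparePlans(expected, actual)   // fails with a side-by-side diff on mismatch
      }
    }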