You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/09 08:44:25 UTC

[07/12] carbondata git commit: [CARBONDATA-2242] Add Materialized View modules

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
new file mode 100644
index 0000000..81d88da
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Logical2ModularExtractions.scala
@@ -0,0 +1,353 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.util
+
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference,
+  AttributeSet, Expression, NamedExpression, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+import org.apache.carbondata.mv.plans.modular.Flags._
+import org.apache.carbondata.mv.plans.modular.JoinEdge
+
+/**
+ * SelectModule is extracted from logical plan of SPJG query.  All join conditions
+ * filter, and project operators in a single Aggregate-less subtree of logical plan
+ * are collected.
+ *
+ * The returned values for this match are as follows:
+ *  - Conditions for equi-join
+ *  - Conditions for filter
+ *  - Project list for project
+ *
+ */
+object ExtractSelectModule extends PredicateHelper {
+  type ReturnType = (Seq[NamedExpression], Seq[Expression], Seq[Expression], Map[Int, String],
+    Seq[JoinEdge], Seq[LogicalPlan], FlagSet, Seq[Seq[Any]], Seq[Seq[Any]])
+
+  def unapply(plan: LogicalPlan): Option[ReturnType] = {
+    val (outputs, inputs, predicates, joinedges, children, isSelect, _, flags, fspecs, wspecs) =
+      collectProjectsFiltersJoinsAndSort(plan)
+    if (!isSelect) {
+      None
+    } else {
+      Some(
+        outputs,
+        inputs,
+        predicates,
+        collectChildAliasMappings(AttributeSet(outputs) ++ AttributeSet(predicates), children),
+        joinedges,
+        children,
+        flags,
+        fspecs,
+        wspecs)
+    }
+  }
+
+  def collectProjectsFiltersJoinsAndSort(plan: LogicalPlan): (Seq[NamedExpression],
+    Seq[Expression], Seq[Expression], Seq[JoinEdge], Seq[LogicalPlan], Boolean, Map[Attribute,
+    Expression], FlagSet, Seq[Seq[Any]], Seq[Seq[Any]]) = {
+    plan match {
+      case Project(fields, child) =>
+        val (_, inputs, predicates, joinedges, children, _, aliases, flags, fspecs, wspecs) =
+          collectProjectsFiltersJoinsAndSort(child)
+        val substitutedFields = fields.map(substitute(aliases)).asInstanceOf[Seq[NamedExpression]]
+        (substitutedFields, inputs, predicates, joinedges, children, true, collectAliases(
+          substitutedFields), flags, fspecs, wspecs)
+
+      case Filter(condition, child) =>
+        val (outputs, inputs, predicates, joinedges, children, _, aliases, flags, fspecs, wspecs)
+        = collectProjectsFiltersJoinsAndSort(child)
+        val substitutedCondition = substitute(aliases)(condition)
+        (outputs, inputs, predicates.flatMap(splitConjunctivePredicates) ++
+                          splitConjunctivePredicates(substitutedCondition), joinedges, children,
+          true, aliases, flags, fspecs, wspecs)
+
+      case Sort(order, global, child) =>
+        val (outputs, inputs, predicates, joinedges, children, _, aliases, flags, fspecs, wspecs)
+        = collectProjectsFiltersJoinsAndSort(child)
+        val substitutedOrder = order.map(substitute(aliases))
+        (outputs, inputs, predicates, joinedges, children, true, aliases, if (global) {
+          flags.setFlag(SORT).setFlag(GLOBAL)
+        } else {
+          flags.setFlag(SORT)
+        }, Seq(Seq(order)) ++ fspecs, wspecs)
+
+      case Join(left, right, joinType, condition) =>
+        val (loutputs, linputs, lpredicates, ljoinedges, lchildren, _, laliases, lflags, lfspecs,
+        lwspecs) = collectProjectsFiltersJoinsAndSort(left)
+        val (routputs, rinputs, rpredicates, rjoinedges, rchildren, _, raliases, rflags, rfspecs,
+        rwspecs) = collectProjectsFiltersJoinsAndSort(right)
+        val (lcondition, rcondition, ccondition) = split(condition, lchildren, rchildren)
+        val joinEdge = collectJoinEdge(ccondition, lchildren, rchildren, joinType)
+        val adjustedJoinEdges = rjoinedges
+          .map(e => JoinEdge(e.left + lchildren.size, e.right + lchildren.size, e.joinType))
+        val output: Seq[Attribute] = {
+          joinType match {
+            case LeftSemi =>
+              left.output
+            case LeftOuter =>
+              left.output ++ right.output.map(_.withNullability(true))
+            case RightOuter =>
+              left.output.map(_.withNullability(true)) ++ right.output
+            case FullOuter =>
+              left.output.map(_.withNullability(true)) ++ right.output.map(_.withNullability(true))
+            case LeftAnti =>
+              left.output
+            case _ =>
+              left.output ++ right.output
+          }
+        }
+        if (lfspecs.isEmpty && rfspecs.isEmpty && lflags == NoFlags && rflags == NoFlags &&
+            lwspecs.isEmpty && rwspecs.isEmpty) {
+          (output, (linputs ++ rinputs), lpredicates.flatMap(splitConjunctivePredicates) ++
+                                         rpredicates.flatMap(splitConjunctivePredicates) ++
+                                         lcondition ++ rcondition ++ ccondition, ljoinedges ++
+                                                                                 joinEdge ++
+                                                                                 adjustedJoinEdges,
+            lchildren ++ rchildren, true, laliases ++ raliases, NoFlags, Seq.empty, Seq.empty)
+        } else {
+          throw new UnsupportedOperationException(
+            s"unsupported join: \n left child ${ left } " +
+            s"\n right child ${ right }")
+        }
+
+      case other =>
+        (other.output, other.output, Nil, Nil, Seq(other), false, Map.empty, NoFlags, Seq.empty, Seq
+          .empty)
+    }
+  }
+
+  def collectAliases(fields: Seq[Expression]): Map[Attribute, Expression] = {
+    fields.collect {
+      case a@Alias(child, _) => a.toAttribute -> child
+    }.toMap
+  }
+
+  def substitute(aliases: Map[Attribute, Expression])(expr: Expression): Expression = {
+    expr.transform {
+      case a@Alias(ref: AttributeReference, name) =>
+        aliases.get(ref).map(Alias(_, name)(a.exprId, a.qualifier)).getOrElse(a)
+
+      case a: AttributeReference =>
+        aliases.get(a).map(Alias(_, a.name)(a.exprId, a.qualifier)).getOrElse(a)
+    }
+  }
+
+  def collectChildAliasMappings(attributeSet: AttributeSet, children: Seq[LogicalPlan]
+  ): Map[Int, String] = {
+    val aq = attributeSet.filter(_.qualifier.nonEmpty)
+    children.zipWithIndex.flatMap {
+      case (child, i) =>
+        aq.find(child.outputSet.contains(_)).map(_.qualifier).flatten.map((i, _))
+    }.toMap
+  }
+
+  def split(condition: Option[Expression],
+      lchildren: Seq[LogicalPlan],
+      rchildren: Seq[LogicalPlan]): (Seq[Expression], Seq[Expression], Seq[Expression]) = {
+    val left = lchildren.map(_.outputSet).foldLeft(AttributeSet(Set.empty))(_ ++ _)
+    val right = rchildren.map(_.outputSet).foldLeft(AttributeSet(Set.empty))(_ ++ _)
+    val conditions = condition.map(splitConjunctivePredicates).getOrElse(Nil)
+    val (leftEvaluationCondition, rest) = conditions.partition(_.references subsetOf left)
+    val (rightEvaluationCondition, commonCondition) = rest.partition(_.references subsetOf right)
+    (leftEvaluationCondition, rightEvaluationCondition, commonCondition)
+  }
+
+  /*
+   * collectJoinEdge only valid when condition are common condition of above split, left and
+   * right children correspond
+   * to respective two children parameters of above split
+   *
+   */
+  def collectJoinEdge(condition: Seq[Expression],
+      lchildren: Seq[LogicalPlan],
+      rchildren: Seq[LogicalPlan],
+      joinType: JoinType): Seq[JoinEdge] = {
+    val common = condition.map(_.references).foldLeft(AttributeSet(Set.empty))(_ ++ _)
+    val lIdxSeq = lchildren
+      .collect { case x if x.outputSet.intersect(common).nonEmpty => lchildren.indexOf(x) }
+    val rIdxSeq = rchildren
+      .collect { case x if x.outputSet.intersect(common).nonEmpty => rchildren.indexOf(x) +
+                                                                     lchildren.size
+      }
+    for (l <- lIdxSeq; r <- rIdxSeq) yield {
+      JoinEdge(l, r, joinType)
+    }
+  }
+
+}
+
+object ExtractSelectModuleForWindow extends PredicateHelper {
+  type ReturnType = (Seq[NamedExpression], Seq[Expression], Seq[Expression], Map[Int, String],
+    Seq[JoinEdge], Seq[LogicalPlan], FlagSet, Seq[Seq[Any]], Seq[Seq[Any]])
+
+  def unapply(plan: LogicalPlan): Option[ReturnType] = {
+    collectSelectFromWindowChild(plan)
+  }
+
+  def collectSelectFromWindowChild(plan: LogicalPlan): Option[(Seq[NamedExpression],
+    Seq[Expression], Seq[Expression], Map[Int, String], Seq[JoinEdge], Seq[LogicalPlan], FlagSet,
+    Seq[Seq[Any]], Seq[Seq[Any]])] = {
+    plan match {
+      case agg@Aggregate(_, _, _) =>
+        Some(
+          agg.aggregateExpressions,
+          agg.child.output,
+          Seq.empty,
+          Map.empty,
+          Seq.empty,
+          Seq(agg),
+          NoFlags,
+          Seq.empty,
+          Seq.empty)
+      case ExtractSelectModule(
+      output,
+      input,
+      predicate,
+      aliasmap,
+      joinedge,
+      children,
+      flags,
+      fspec,
+      wspec) =>
+        Some(output, input, predicate, aliasmap, joinedge, children, flags, fspec, wspec)
+      case Window(exprs, _, _, child) =>
+        val ret: Option[(Seq[NamedExpression], Seq[Expression], Seq[Expression], Map[Int,
+          String], Seq[JoinEdge], Seq[LogicalPlan], FlagSet, Seq[Seq[Any]], Seq[Seq[Any]])] =
+          collectSelectFromWindowChild(
+            child)
+        ret.map(r => (r._1, r._2, r._3, r._4, r._5, r._6, r._7, r._8, Seq(Seq(exprs)) ++ r._9))
+      case other => None
+    }
+  }
+}
+
+/**
+ * GroupByModule is extracted from the Aggregate node of logical plan.
+ * The groupingExpressions, aggregateExpressions are collected.
+ *
+ * The returned values for this match are as follows:
+ *  - Grouping attributes for the Aggregate node.
+ *  - Aggregates for the Aggregate node.
+ *  - Project list for project
+ *
+ */
+
+object ExtractGroupByModule extends PredicateHelper {
+  type ReturnType = (Seq[NamedExpression], Seq[Expression], Seq[Expression], Option[String],
+    LogicalPlan, FlagSet, Seq[Seq[Any]])
+
+  def unapply(plan: LogicalPlan): Option[ReturnType] = {
+    plan match {
+      case a@logical.Aggregate(_, _, e@Expand(_, _, p: Project)) if isGroupingSet(a, e, p) =>
+        // Assumption: Aggregate's groupingExpressions is composed of
+        // 1) the grouping attributes
+        // 2) gid, which is always the last one
+        val g = a.groupingExpressions.map(_.asInstanceOf[Attribute])
+        val numOriginalOutput = e.output.size - g.size
+        Some(
+          a.aggregateExpressions,
+          e.output,
+          a.groupingExpressions,
+          None,
+          p,
+          NoFlags.setFlag(EXPAND),
+          Seq(Seq(e.projections, e.output, numOriginalOutput)))
+      case logical.Aggregate(groupingExpressions, aggregateExpressions, child) =>
+        Some(
+          aggregateExpressions,
+          child.output,
+          groupingExpressions,
+          None,
+          child,
+          NoFlags,
+          Seq.empty)
+      case other => None
+    }
+  }
+
+  private def isGroupingSet(a: Aggregate, e: Expand, p: Project): Boolean = {
+    assert(a.child == e && e.child == p)
+
+    if (a.groupingExpressions.forall(_.isInstanceOf[Attribute])) {
+      val g = a.groupingExpressions.map(_.asInstanceOf[Attribute])
+      sameOutput(
+        e.output.drop(e.output.size - g.size),
+        a.groupingExpressions.map(_.asInstanceOf[Attribute]))
+    } else {
+      false
+    }
+  }
+
+  private def sameOutput(output1: Seq[Attribute], output2: Seq[Attribute]): Boolean = {
+    output1.size == output2.size &&
+    output1.zip(output2).forall(pair => pair._1.semanticEquals(pair._2))
+  }
+}
+
+object ExtractUnionModule extends PredicateHelper {
+  type ReturnType = (Seq[LogicalPlan], FlagSet, Seq[Seq[Any]])
+
+  def unapply(plan: LogicalPlan): Option[ReturnType] = {
+    plan match {
+      case u: Union =>
+        val children = collectUnionChildren(u)
+        Some(children, NoFlags, Seq.empty)
+      case _ => None
+    }
+  }
+
+  private def collectUnionChildren(plan: LogicalPlan): List[LogicalPlan] = {
+    plan match {
+      case Union(children) => children.toList match {
+        case head :: Nil => collectUnionChildren(head)
+        case head :: tail => collectUnionChildren(head) ++ collectUnionChildren(Union(tail))
+        case Nil => Nil
+      }
+      case other => other :: Nil
+    }
+  }
+}
+
+object ExtractTableModule extends PredicateHelper {
+  type ReturnType = (String, String, Seq[NamedExpression], Seq[LogicalPlan], FlagSet, Seq[Seq[Any]])
+
+  def unapply(plan: LogicalPlan): Option[ReturnType] = {
+    plan match {
+      // uncomment for cloudera1 version
+//      case m: CatalogRelation =>
+//        Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags,
+//          Seq.empty)
+//       uncomment for apache version
+      case m: HiveTableRelation =>
+            Some(m.tableMeta.database, m.tableMeta.identifier.table, m.output, Nil, NoFlags,
+              Seq.empty)
+      case l: LogicalRelation =>
+        val tableIdentifier = l.catalogTable.map(_.identifier)
+        val database = tableIdentifier.map(_.database).flatten.getOrElse(null)
+        val table = tableIdentifier.map(_.table).getOrElse(null)
+        Some(database, table, l.output, Nil, NoFlags, Seq.empty)
+      case l: LocalRelation => // used for unit test
+        Some(null, null, l.output, Nil, NoFlags, Seq.empty)
+      case _ => None
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/LogicalPlanSignatureGenerator.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/LogicalPlanSignatureGenerator.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/LogicalPlanSignatureGenerator.scala
new file mode 100644
index 0000000..0c5661e
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/LogicalPlanSignatureGenerator.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.util
+
+import org.apache.spark.sql.catalyst.catalog._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+import org.apache.carbondata.mv.plans._
+
+object CheckSPJG {
+
+  def isSPJG(subplan: LogicalPlan): Boolean = {
+    subplan match {
+      case a: Aggregate =>
+        a.child.collect {
+          case Join(_, _, _, _) | Project(_, _) | Filter(_, _) |
+//               CatalogRelation(_, _, _) |
+               LogicalRelation(_, _, _) | LocalRelation(_, _) => true
+          case _ => false
+        }.forall(identity)
+      case _ => false
+    }
+  }
+}
+
+object LogicalPlanSignatureGenerator extends SignatureGenerator[LogicalPlan] {
+  lazy val rule: SignatureRule[LogicalPlan] = LogicalPlanRule
+
+  override def generate(plan: LogicalPlan): Option[Signature] = {
+    if ( plan.isSPJG ) {
+      super.generate(plan)
+    } else {
+      None
+    }
+  }
+}
+
+object LogicalPlanRule extends SignatureRule[LogicalPlan] {
+
+  def apply(plan: LogicalPlan, childSignatures: Seq[Option[Signature]]): Option[Signature] = {
+
+    plan match {
+      case LogicalRelation(_, _, _) =>
+        // TODO: implement this (link to BaseRelation)
+        None
+//      case CatalogRelation(tableMeta, _, _) =>
+//        Some(Signature(false,
+//          Set(Seq(tableMeta.database, tableMeta.identifier.table).mkString("."))))
+      case l: LocalRelation =>
+        // LocalRelation is for unit test cases
+        Some(Signature(groupby = false, Set(l.toString())))
+      case Filter(_, _) =>
+        if (childSignatures.length == 1 && !childSignatures(0).getOrElse(Signature()).groupby) {
+          // if (!childSignatures(0).getOrElse(Signature()).groupby) {
+          childSignatures(0)
+          // }
+        } else {
+          None
+        }
+      case Project(_, _) =>
+        if ( childSignatures.length == 1 && !childSignatures(0).getOrElse(Signature()).groupby ) {
+          childSignatures(0)
+        } else {
+          None
+        }
+      case Join(_, _, _, _) =>
+        if ( childSignatures.length == 2 &&
+             !childSignatures(0).getOrElse(Signature()).groupby &&
+             !childSignatures(1).getOrElse(Signature()).groupby ) {
+          Some(Signature(false,
+            childSignatures(0).getOrElse(Signature()).datasets
+              .union(childSignatures(1).getOrElse(Signature()).datasets)))
+        } else {
+          None
+        }
+      case Aggregate(_, _, _) =>
+        if ( childSignatures.length == 1 && !childSignatures(0).getOrElse(Signature()).groupby ) {
+          Some(Signature(true, childSignatures(0).getOrElse(Signature()).datasets))
+        } else {
+          None
+        }
+      case _ => None
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
new file mode 100644
index 0000000..b7641d5
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Printers.scala
@@ -0,0 +1,347 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.util
+
+import java.io.{OutputStream, PrintWriter, StringWriter}
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, _}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.util.quoteIdentifier
+
+import org.apache.carbondata.mv.plans.modular.Flags._
+import org.apache.carbondata.mv.plans.util.SQLBuildDSL._
+
+// scalastyle:off println
+trait Printers {
+
+  abstract class FragmentPrinter(out: PrintWriter) {
+    protected var indentMargin = 0
+    protected val indentStep = 2
+    protected var indentString = "                                        " // 40
+
+    def indent(): Unit = indentMargin += indentStep
+
+    def undent(): Unit = indentMargin -= indentStep
+
+    def println(): Unit = {
+      out.println()
+      while (indentMargin > indentString.length()) {
+        indentString += indentString
+      }
+      if (indentMargin > 0) {
+        out.write(indentString, 0, indentMargin)
+      }
+    }
+
+    def printSeq[a](ls: List[a])(printelem: a => Unit)(printsep: => Unit) {
+      ls match {
+        case List() =>
+        case List(x) => printelem(x)
+        case x :: rest => printelem(x); printsep; printSeq(rest)(printelem)(printsep)
+      }
+    }
+
+    def printColumn(ts: List[Fragment], start: String, sep: String, end: String) {
+      print(start)
+      indent
+      println()
+      printSeq(ts) { print(_) } { print(sep); println() }
+      undent
+      println()
+      print(end)
+    }
+
+    def printRow(ts: List[Fragment], start: String, sep: String, end: String) {
+      print(start)
+      printSeq(ts) { print(_) } { print(sep) }
+      print(end)
+    }
+
+    def printRow(ts: List[Fragment], sep: String) { printRow(ts, "", sep, "") }
+
+    def printFragment(frag: Fragment): Unit
+
+    def print(args: Any*): Unit = {
+      args foreach {
+        case frag: Fragment =>
+          printFragment(frag)
+        case arg =>
+          out.print(if (arg == null) {
+            "null"
+          } else {
+            arg.toString
+          })
+      }
+    }
+  }
+
+  /**
+   * A sql fragment printer which is stingy about vertical whitespace and unnecessary
+   * punctuation.
+   */
+  class SQLFragmentCompactPrinter(out: PrintWriter) extends FragmentPrinter(out) {
+
+    object Position extends Enumeration {
+      type Position = Value
+      val Select, From, GroupBy, Where, Sort, Limit = Value
+    }
+
+    import Position._
+
+    def printSelect(select: Seq[NamedExpression], flags: FlagSet): Unit = {
+      if (flags.hasFlag(DISTINCT)) {
+        print("SELECT DISTINCT %s "
+          .format(select.map(_.sql) mkString ", "))
+      } else {
+        print("SELECT %s ".format(select.map(_.sql) mkString ", "))
+      }
+    }
+
+    def printFromElem(f: (SQLBuildDSL.Fragment, Option[JoinType], Seq[Expression])): Unit = {
+      f._2 map { t => print("%s JOIN ".format(t.sql)) }
+      printFragment(f._1)
+      if (f._3.nonEmpty) {
+        print(" ON %s".format(f._3.map(_.sql).mkString(" AND ")))
+      }
+    }
+
+    def printFrom(
+        from: Seq[(SQLBuildDSL.Fragment, Option[JoinType], Seq[Expression])]): Seq[Unit] = {
+      def fromIndented(
+          x: (SQLBuildDSL.Fragment, Option[JoinType], Seq[Expression])): Unit = {
+        indent
+        println()
+        printFromElem(x)
+        undent
+      }
+
+      print("FROM")
+      from map fromIndented
+    }
+
+    def printWhere(where: Seq[Expression]): Unit = {
+      if (where != Nil) {
+        print("WHERE")
+        //        if (where.nonEmpty) {
+        //          val condition = where(0)
+        //          val dt = condition.dataType
+        //          val t = condition.sql
+        //          print(t)
+        //        }
+        indent
+        println()
+        print("%s".format(where.map(_.sql).mkString(" AND ")))
+        undent
+      }
+    }
+
+    def printGroupby(groupby: (Seq[Expression], Seq[Seq[Expression]])): Unit = {
+
+      if (groupby._1.nonEmpty) {
+        print("GROUP BY %s".format(groupby._1.map(_.sql).mkString(", ")))
+        if (groupby._2.nonEmpty) {
+          print(" GROUPING SETS(%s)"
+            .format(groupby._2.map(e => s"(${ e.map(_.sql).mkString(", ") })").mkString(", ")))
+        }
+      }
+    }
+
+    def printHaving(having: Seq[Expression]): Unit = {
+      if (having != Nil) {
+        print("HAVING %s".format(having.map(_.sql).mkString(" AND ")))
+      }
+    }
+
+    def printUnion(sqls: Seq[Fragment]): Unit = {
+      sqls match {
+        case sql1 :: sql2 :: rest =>
+          printFragment(sql1)
+          println()
+          print("UNION ALL")
+          println()
+          printUnion(sql2 :: rest)
+        case sql :: Nil => printFragment(sql)
+      }
+    }
+
+    def printTable(name: Seq[String]): Unit = {
+      print("%s".format(name.last))
+    }
+
+    trait ExprSeq extends Seq[SortOrder]
+
+    def printExprModifiers(modifiers: (FlagSet, Seq[Seq[Any]]), pos: Position): Unit = {
+      val flagsNeedExprs = for {flag <- pickledListOrder
+                                if (modifiers._1.hasFlag(flag))} yield {
+        (flag)
+      }
+
+      flagsNeedExprs.zip(modifiers._2).foreach {
+        case (SORT, Seq(order)) =>
+          if (pos == Sort) { // TODO: how to verify type of order:  && order
+            // .isInstanceOf[Seq[SortOrder]]) {
+            println()
+            if (modifiers._1.hasFlag(GLOBAL)) {
+              print("ORDER BY %s"
+                .format(order.asInstanceOf[Seq[SortOrder]].map {
+                  // walk around for a sparkSQL bug
+                  s => {
+                         s.child match {
+                           case a: Alias =>
+                             val qualifierPrefix = a.qualifier
+                               .map(_ + ".").getOrElse("")
+                             s"$qualifierPrefix${
+                               quoteIdentifier(a
+                                 .name)
+                             }"
+
+                           case other => other.sql
+                         }
+                       } + " " + s.direction.sql + " " + s.nullOrdering.sql
+                }.mkString(", ")))
+            } else {
+              print("SORT BY %s".format(order.asInstanceOf[Seq[SortOrder]].map {
+                // walk around for a sparkSQL bug
+                s => {
+                       s.child match {
+                         case a: Alias =>
+                           val qualifierPrefix = a.qualifier.map(_ + ".")
+                             .getOrElse("")
+                           s"$qualifierPrefix${ quoteIdentifier(a.name) }"
+
+                         case other => other.sql
+                       }
+                     } + " " + s.direction.sql + " " + s.nullOrdering.sql
+              }.mkString(", ")))
+            }
+          }
+        case (LIMIT, Seq(limitExpr)) =>
+          if (pos == Limit && limitExpr.isInstanceOf[Expression]) {
+            println()
+            print(s"LIMIT ${ limitExpr.asInstanceOf[Expression].sql }")
+          }
+        case (EXPAND, Seq(_, _)) =>
+        case _ =>
+          throw new UnsupportedOperationException(s"unsupported modifiers: ${
+            flagToString(modifiers
+              ._1)
+          }")
+      }
+    }
+
+    override def printFragment(frag: Fragment): Unit = {
+      require(
+        frag.Supported,
+        "Fragment is not supported.  Current frag:\n" + frag)
+
+      frag match {
+        case spjg@SPJGFragment(select, from, where, groupby, having, alias, modifiers) =>
+          if (alias.nonEmpty) {
+            print("(")
+          }
+          printSelect(select, modifiers._1)
+          if (from.nonEmpty) {
+            println()
+          }
+          printFrom(from)
+          if (where.nonEmpty) {
+            println()
+          }
+          printWhere(where)
+          if (groupby._1.nonEmpty) {
+            println()
+          }
+          printGroupby(groupby)
+          if (having.nonEmpty) {
+            println()
+          }
+          printHaving(having)
+          if (modifiers._1.hasFlag(SORT)) {
+            printExprModifiers(modifiers, Sort)
+          }
+          if (modifiers._1.hasFlag(LIMIT)) {
+            printExprModifiers(modifiers, Limit)
+          }
+          alias.map { case a: String => print(") %s ".format(a)) }
+        case union@UNIONFragment(sqls, alias, modifiers) =>
+          if (alias.nonEmpty) {
+            print("(")
+          }
+          printUnion(sqls)
+          if (modifiers._1.hasFlag(LIMIT)) {
+            printExprModifiers(modifiers, Limit)
+          }
+          alias.map { case a: String => print(") %s ".format(a)) }
+        case table@TABLEFragment(name, alias, modifiers) =>
+          printTable(name)
+          alias.map { case a: String =>
+            if (!(a == name.last)) {
+              print(" %s ".format(a))
+            }
+          }
+        case _ =>
+          throw new UnsupportedOperationException(s"unsupported plan $frag")
+      }
+    }
+  }
+
+  /**
+   * A sql fragment printer which is one single line for a fragment.
+   */
+  class SQLFragmentOneLinePrinter(out: PrintWriter) extends SQLFragmentCompactPrinter(out) {
+    protected val sep = "  "
+
+    override def println() { print(sep) }
+  }
+
+  /** @group Printers */
+  protected def render(what: Any, mkPrinter: PrintWriter => FragmentPrinter): String = {
+    val buffer = new StringWriter()
+    val writer = new PrintWriter(buffer)
+    var printer = mkPrinter(writer)
+
+    printer.print(what)
+    writer.flush()
+    buffer.toString
+  }
+
+  def asCompactString(t: Fragment): String = render(t, newSQLFragmentCompactPrinter)
+
+  def asOneLineString(t: Fragment): String = render(t, newSQLFragmentOneLinePrinter)
+
+  def newSQLFragmentCompactPrinter(writer: PrintWriter): SQLFragmentCompactPrinter = {
+    new SQLFragmentCompactPrinter(
+      writer)
+  }
+
+  def newSQLFragmentCompactPrinter(stream: OutputStream): SQLFragmentCompactPrinter = {
+    newSQLFragmentCompactPrinter(
+      new PrintWriter(stream))
+  }
+
+  def newSQLFragmentOneLinePrinter(writer: PrintWriter): SQLFragmentOneLinePrinter = {
+    new SQLFragmentOneLinePrinter(
+      writer)
+  }
+
+  def newSQLFragmentOneLinePrinter(stream: OutputStream): SQLFragmentOneLinePrinter = {
+    newSQLFragmentOneLinePrinter(
+      new PrintWriter(stream))
+  }
+}
+// scalastyle:on println

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuild.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuild.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuild.scala
new file mode 100644
index 0000000..8c52d28
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuild.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.util
+
+import java.util.concurrent.atomic.AtomicLong
+
+class SQLBuild private (
+    nextSubqueryId: AtomicLong,
+    subqueryPrefix: String) {
+  self =>
+
+  def this(subqueryPrefix: String) =
+    this(new AtomicLong(0), subqueryPrefix)
+
+  def newSubqueryName(): String = s"${subqueryPrefix}${nextSubqueryId.getAndIncrement()}"
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala
new file mode 100644
index 0000000..f8e0b82
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuildDSL.scala
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.util
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, BitwiseAnd, Cast, Expression, Grouping, GroupingID, Literal, NamedExpression, ShiftRight}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.types.{ByteType, IntegerType}
+
+import org.apache.carbondata.mv.plans.modular
+import org.apache.carbondata.mv.plans.modular.Flags._
+import org.apache.carbondata.mv.plans.modular.ModularPlan
+
+trait SQLBuildDSL {
+
+  abstract class Fragment extends Product {
+    // this class corresponding to TreeNode of SparkSQL or Tree of Scala compiler
+    def canEqual(that: Any): Boolean = {
+      throw new UnsupportedOperationException
+    }
+
+    def productArity: Int = throw new UnsupportedOperationException
+
+    def productElement(n: Int): Any = throw new UnsupportedOperationException
+
+    // TODO: add library needed for SQL building
+    def isUnsupported: Boolean
+
+    def Supported: Boolean
+
+    // def pos: Position
+    def alias: Option[String]
+  }
+
+  case class SPJGFragment(
+      select: Seq[NamedExpression] = Nil, // TODO: more general, change to Seq[Either[Fragment,
+      // NamedExpression]]
+      from: Seq[(Fragment, Option[JoinType], Seq[Expression])] = Nil,
+      where: Seq[Expression] = Nil,
+      groupby: (Seq[Expression], Seq[Seq[Expression]]) = (Nil, Nil),
+      having: Seq[Expression] = Nil,
+      alias: Option[String] = None,
+      modifiers: (FlagSet, Seq[Seq[Any]])) extends Fragment {
+    override def isUnsupported: Boolean = {
+      select == Nil || from == Nil ||
+      from.map(_._1.isUnsupported).foldLeft(false)(_ || _)
+    }
+
+    override def Supported: Boolean = !isUnsupported
+  }
+
+  //  TODO: find a scheme to break up fragmentExtract() using unapply
+  //
+  //  object SPJGFragment {
+  //    type ReturnType = (Seq[NamedExpression],Seq[(Fragment, Option[JoinType],Seq[Expression])
+  // ],Seq[Expression],Seq[Expression],Seq[Expression],Option[String])
+  //
+  //    def unapply(plan: ModularPlan): Option[ReturnType] = fragmentExtract(plan, None)
+  //  }
+
+  case class TABLEFragment(
+      table: Seq[String] = Nil,
+      alias: Option[String] = None,
+      modifiers: (FlagSet, Seq[Seq[Any]])) extends Fragment {
+    override def isUnsupported: Boolean = table == Nil
+
+    override def Supported: Boolean = !isUnsupported
+  }
+
+  case class UNIONFragment(
+      union: Seq[Fragment] = Nil,
+      alias: Option[String] = None,
+      modifiers: (FlagSet, Seq[Seq[Any]])) extends Fragment {
+    override def isUnsupported: Boolean = {
+      union == Nil ||
+      union.map(_.isUnsupported).foldLeft(false)(_ || _)
+    }
+
+    override def Supported: Boolean = !isUnsupported
+  }
+
+  val UnsupportedFragment: Fragment = new Fragment {
+    def isUnsupported = true
+
+    def Supported = false
+
+    def alias = None
+  }
+
+  /**
+   * Turns a bunch of string segments into a single string and separate each segment by a space.
+   * The segments are trimmed so only a single space appears in the separation.
+   * For example, `build("a", " b ", " c")` becomes "a b c".
+   */
+  protected def build(segments: String*): String = {
+    segments.map(_.trim).filter(_.nonEmpty).mkString(" ")
+  }
+
+  def fragmentExtract(plan: ModularPlan, alias: Option[String]): Fragment = {
+    if (plan.rewritten) {
+      fragmentExtract_Rewritten(plan, alias)
+    } else {
+      fragmentExtract_NonRewritten(plan, alias)
+    }
+  }
+
+  // Rewritten portion of query plan
+  private def fragmentExtract_Rewritten(
+      plan: ModularPlan,
+      alias: Option[String]): Fragment = {
+    plan match {
+      case s1@modular.Select(_, _, _, _, _,
+        Seq(g@modular.GroupBy(_, _, _, _, s2@modular.Select(_, _, _, _, _, _, _, _, _), _, _)),
+        _, _, _) if (!s1.skip && !g.skip && !s2.skip) =>
+        extractRewrittenOrNonRewrittenSelectGroupBySelect(s1, g, s2, alias)
+
+      case s1@modular.Select(_, _, _, _, _,
+        Seq(g@modular.GroupBy(_, _, _, _, s2@modular.Select(_, _, _, _, _, _, _, _, _), _, _)),
+        _, _, _) if (s1.skip && g.skip && s2.skip) =>
+        extractRewrittenOrNonRewrittenSelectGroupBySelect(s1, g, s2, alias)
+
+      case s1@modular.Select(_, _, _, _, _,
+        Seq(g@modular.GroupBy(_, _, _, _, s2@modular.Select(_, _, _, _, _, _, _, _, _), _, _)),
+        _, _, _) if (!s1.skip && !g.skip && s2.skip) =>
+        extractRewrittenSelectGroupBy(s1, g, alias)
+
+      case s1@modular.Select(_, _, _, _, _, Seq(s2@modular.Select(_, _, _, _, _, _, _, _, _)),
+        _, _, _) if (!s1.skip && s2.skip) =>
+        extractRewrittenSelect(s1, alias)
+
+      case other => extractSimpleOperator(other, alias)
+    }
+  }
+
+  // Non-rewritten portion of query plan
+  private def fragmentExtract_NonRewritten(
+      plan: ModularPlan,
+      alias: Option[String]): Fragment = {
+    plan match {
+      case s1@modular.Select(_, _, _, _, _,
+        Seq(g@modular.GroupBy(_, _, _, _, s2@modular.Select(_, _, _, _, _, _, _, _, _), _, _)),
+        _, _, _) if (s1.aliasMap.isEmpty && !g.rewritten) =>
+        extractRewrittenOrNonRewrittenSelectGroupBySelect(s1, g, s2, alias)
+
+      case g@modular.GroupBy(_, _, _, _, s2@modular.Select(_, _, _, _, _, _, _, _, _), _, _)
+        if (g.alias.isEmpty && !s2.rewritten) =>
+        val fragmentList = s2.children.zipWithIndex
+          .map { case (child, index) => fragmentExtract(child, s2.aliasMap.get(index)) }
+        val fList = s2.joinEdges.map {
+          e => {
+            (e.right, (fragmentList(e.right), Some(e.joinType), s2
+              .extractRightEvaluableConditions(s2.children(e.left), s2.children(e.right))))
+          }
+        }.toMap
+        val from = (0 to fragmentList.length - 1)
+          .map(index => fList.get(index).getOrElse((fragmentList(index), None, Nil)))
+        val excludesPredicate = from.flatMap(_._3).toSet
+        val (select, (groupByExprs, groupingSet)) = addGroupingSetIfNeeded(g, s2)
+        SPJGFragment(
+          select,
+          from,
+          s2.predicateList.filter { p => !excludesPredicate(p) },
+          (groupByExprs, groupingSet),
+          Nil,
+          alias,
+          (g.flags, g.flagSpec))
+
+      case g@modular.GroupBy(_, _, _, _, _, _, _) if (g.alias.nonEmpty) =>
+        val from = Seq((fragmentExtract(g.child, g.alias), None, Nil))
+        SPJGFragment(
+          g.outputList,
+          from,
+          Nil,
+          (g.predicateList, Seq.empty),
+          Nil,
+          alias,
+          (g.flags, g.flagSpec))
+
+      case other => extractSimpleOperator(other, alias)
+    }
+  }
+
+  // used in both rewritten and non-rewritten cases
+  // currently in rewritten cases we don't consider grouping set
+  private def extractRewrittenOrNonRewrittenSelectGroupBySelect(
+      s1: modular.Select,
+      g: modular.GroupBy,
+      s2: modular.Select,
+      alias: Option[String]): Fragment = {
+    val fragmentList = s2.children.zipWithIndex
+      .map { case (child, index) => fragmentExtract(child, s2.aliasMap.get(index)) }
+    val fList = s2.joinEdges.map {
+      e => {
+        (e.right, (fragmentList(e.right), Some(e.joinType), s2
+          .extractRightEvaluableConditions(s2.children(e.left), s2.children(e.right))))
+      }
+    }.toMap
+    val from = (0 to fragmentList.length - 1)
+      .map(index => fList.get(index).getOrElse((fragmentList(index), None, Nil)))
+    val excludesPredicate = from.flatMap(_._3).toSet
+    val aliasMap = AttributeMap(g.outputList.collect { case a: Alias => (a.toAttribute, a) })
+    val windowExprs = s1.windowSpec
+      .map { case Seq(expr) => expr.asInstanceOf[Seq[NamedExpression]] }
+      .foldLeft(Seq.empty.asInstanceOf[Seq[NamedExpression]])(_ ++ _)
+    val having = s1.predicateList
+      .map { case attr: Attribute => aliasMap.get(attr).map(_.child)
+        .getOrElse(attr);
+      case expr: Expression => expr.transform { case a: Alias => a.child };
+      case other => other
+      }
+
+    val (select_g, (groupByExprs, groupingSet)) = addGroupingSetIfNeeded(g, s2)
+
+    val gSet = AttributeSet(g.outputList.map(_.toAttribute))
+    val sSet = AttributeSet(s1.outputList.map(_.toAttribute))
+    val select = if (groupingSet.nonEmpty) {
+      if (gSet.equals(sSet) && windowExprs.isEmpty) {
+        select_g
+      } else {
+        throw new UnsupportedOperationException
+      }
+    } else
+    // TODO:  how to handle alias of attribute in MV
+    {
+      s1.outputList.map { attr => aliasMap.get(attr.toAttribute).getOrElse(attr) } ++ windowExprs
+    }
+
+    SPJGFragment(
+      select, // select
+      from, // from
+      s2.predicateList.filter { p => !excludesPredicate(p) }, // where
+      (groupByExprs, groupingSet), // group by
+      having, // having
+      alias,
+      (s1.flags, s1.flagSpec))
+  }
+
+  // used in rewritten cases only -- don't consider grouping set
+  private def extractRewrittenSelectGroupBy(
+      s1: modular.Select,
+      g: modular.GroupBy,
+      alias: Option[String]): Fragment = {
+    val fragment = fragmentExtract(g.child, g.alias)
+    val from = Seq((fragment, None, Nil))
+    val aliasMap = AttributeMap(g.outputList.collect { case a: Alias => (a.toAttribute, a) })
+    val windowExprs = s1.windowSpec
+      .map { case Seq(expr) => expr.asInstanceOf[Seq[NamedExpression]] }
+      .foldLeft(Seq.empty.asInstanceOf[Seq[NamedExpression]])(_ ++ _)
+    val having = s1.predicateList
+      .map { case attr: Attribute => aliasMap.get(attr).map(_.child)
+        .getOrElse(attr);
+      case expr: Expression => expr.transform { case a: Alias => a.child };
+      case other => other
+      }
+
+    val (select_g, (groupByExprs, groupingSet)) = (g.outputList, (g.predicateList, Seq.empty))
+
+    // TODO:  how to handle alias of attribute in MV
+    val select = s1.outputList.map { attr => aliasMap.get(attr.toAttribute).getOrElse(attr) } ++
+                 windowExprs
+    SPJGFragment(
+      select, // select
+      from, // from
+      Nil, // where
+      (groupByExprs, groupingSet), // group by
+      having, // having
+      alias,
+      (s1.flags, s1.flagSpec))
+  }
+
+  private def extractRewrittenSelect(s1: modular.Select, alias: Option[String]): Fragment = {
+    val fragment = fragmentExtract(s1.children(0), s1.aliasMap.get(0))
+    val from = Seq((fragment, None, Nil))
+    val windowExprs = s1.windowSpec
+      .map { case Seq(expr) => expr.asInstanceOf[Seq[NamedExpression]] }
+      .foldLeft(Seq.empty.asInstanceOf[Seq[NamedExpression]])(_ ++ _)
+    val select = s1.outputList.map(_.toAttribute)
+
+    SPJGFragment(
+      select, // select
+      from, // from
+      s1.predicateList, // where
+      (Nil, Nil), // group by
+      Nil, // having
+      alias,
+      (s1.flags, s1.flagSpec))
+  }
+
+  private def extractSimpleOperator(
+      operator: ModularPlan,
+      alias: Option[String]): Fragment = {
+    operator match {
+      case s@modular.Select(_, _, _, _, _, _, _, _, _) =>
+        val fragmentList = s.children.zipWithIndex
+          .map { case (child, index) => fragmentExtract(child, s.aliasMap.get(index)) }
+        val fList = s.joinEdges.map {
+          e => {
+            (e.right, (fragmentList(e.right), Some(e.joinType), s
+              .extractRightEvaluableConditions(s.children(e.left), s.children(e.right))))
+          }
+        }.toMap
+        val from = (0 to fragmentList.length - 1)
+          .map(index => fList.get(index).getOrElse((fragmentList(index), None, Nil)))
+        val excludesPredicate = from.flatMap(_._3).toSet
+        val windowExprs = s.windowSpec
+          .map { case Seq(expr) => expr.asInstanceOf[Seq[NamedExpression]] }
+          .foldLeft(Seq.empty.asInstanceOf[Seq[NamedExpression]])(_ ++ _)
+        val select = s.outputList ++ windowExprs
+
+        SPJGFragment(
+          select, // select
+          from, // from
+          s.predicateList.filter { p => !excludesPredicate(p) }, // where
+          (Nil, Nil), // group by
+          Nil, // having
+          alias,
+          (s.flags, s.flagSpec))
+
+      case u@modular.Union(_, _, _) =>
+        UNIONFragment(
+          u.children.zipWithIndex.map { case (child, index) => fragmentExtract(child, None) },
+          alias,
+          (u.flags, u.flagSpec))
+
+      case d@modular.ModularRelation(_, _, _, _, _) =>
+        if (d.databaseName != null && d.tableName != null) {
+          TABLEFragment(
+            Seq(d.databaseName, d.tableName), alias, (d.flags, d.rest))
+        } else {
+          TABLEFragment(Seq((d.output).toString()), alias, (d.flags, d.rest))
+        }
+
+      case h@modular.HarmonizedRelation(_) =>
+        fragmentExtract(h.source, alias)
+
+      case _ => UnsupportedFragment
+    }
+  }
+
+  private def addGroupingSetIfNeeded(g: modular.GroupBy, s: modular.Select) = {
+    if (g.flags.hasFlag(EXPAND)) {
+      assert(g.predicateList.length > 1)
+      val flagsNeedExprs =
+        for {flag <- pickledListOrder if (g.flags.hasFlag(flag))} yield {
+          flag
+        }
+      flagsNeedExprs.zip(g.flagSpec).collect {
+        case (EXPAND, Seq(projections_, output_, numOriginalOutput_)) =>
+          val output = output_.asInstanceOf[Seq[Attribute]]
+          val projections = projections_.asInstanceOf[Seq[Seq[Expression]]]
+          val numOriginalOutput = numOriginalOutput_.asInstanceOf[Int]
+
+          // The last column of Expand is always grouping ID
+          val gid = output.last
+
+          val groupByAttributes = g.predicateList.dropRight(1).map(_.asInstanceOf[Attribute])
+          // Assumption: project's projectList is composed of
+          // 1) the original output (Project's child.output),
+          // 2) the aliased group by expressions.
+          val expandedAttributes = s.output.drop(numOriginalOutput)
+          val groupByExprs = s.outputList.drop(numOriginalOutput).map(_.asInstanceOf[Alias].child)
+
+          // a map from group by attributes to the original group by expressions.
+          val groupByAttrMap = AttributeMap(groupByAttributes.zip(groupByExprs))
+          // a map from expanded attributes to the original group by expressions.
+          val expandedAttrMap = AttributeMap(expandedAttributes.zip(groupByExprs))
+
+          val groupingSet: Seq[Seq[Expression]] = projections.map { project =>
+            // Assumption: expand.projections is composed of
+            // 1) the original output (Project's child.output),
+            // 2) expanded attributes(or null literal)
+            // 3) gid, which is always the last one in each project in Expand
+            project.drop(numOriginalOutput).dropRight(1).collect {
+              case attr: Attribute if expandedAttrMap.contains(attr) => expandedAttrMap(attr)
+            }
+          }
+
+          val aggExprs = g.outputList.map { case aggExpr =>
+            val originalAggExpr = aggExpr.transformDown {
+              // grouping_id() is converted to VirtualColumn.groupingIdName by Analyzer. Revert
+              // it back.
+              case ar: AttributeReference if ar == gid => GroupingID(Nil)
+              case ar: AttributeReference if groupByAttrMap.contains(ar) => groupByAttrMap(ar)
+              case a@Cast(
+              BitwiseAnd(
+              ShiftRight(ar: AttributeReference, Literal(value: Any, IntegerType)),
+              Literal(1, IntegerType)), ByteType, None) if ar == gid =>
+                // for converting an expression to its original SQL format grouping(col)
+                val idx = groupByExprs.length - 1 - value.asInstanceOf[Int]
+                groupByExprs.lift(idx).map(Grouping).getOrElse(a)
+            }
+
+            originalAggExpr match {
+              // Ancestor operators may reference the output of this grouping set,
+              // and we use exprId to generate a unique name for each attribute, so we should
+              // make sure the transformed aggregate expression won't change the output,
+              // i.e. exprId and alias name should remain the same.
+              case ne: NamedExpression if ne.exprId == aggExpr.exprId => ne
+              case e => Alias(e, aggExpr.name)(exprId = aggExpr.exprId)
+            }
+          }
+          (aggExprs, (groupByExprs, groupingSet))
+
+      }.head
+    } else {
+      (g.outputList, (g.predicateList, Seq.empty))
+    }
+  }
+}
+
+object SQLBuildDSL extends SQLBuildDSL

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
new file mode 100644
index 0000000..244dadf
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/SQLBuilder.scala
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.util
+
+import java.util.concurrent.atomic.AtomicLong
+
+import scala.collection.immutable
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
+
+import org.apache.carbondata.mv.expressions.modular._
+import org.apache.carbondata.mv.plans._
+import org.apache.carbondata.mv.plans.modular.{Select, _}
+import org.apache.carbondata.mv.plans.util.SQLBuildDSL.Fragment
+
+// TODO: generalize this using SQLBuild to handle nested sub-query expression, using the
+// correspondence
+//      SQLBuild <--> QueryRewrite
+class SQLBuilder private(
+    modularPlan: ModularPlan,
+    nextSubqueryId: AtomicLong,
+    subqueryPrefix: String) {
+
+  def this(modularPlan: ModularPlan) = {
+    this(
+      SQLBuilder.subqueryPreprocess(modularPlan, new AtomicLong()),
+      new AtomicLong(0),
+      "gen_subquery_")
+  }
+
+  // only allow one level scalar subquery
+  def this(modularPlan: ModularPlan, subqueryPrefix: String) = {
+    this(SQLBuilder.screenOutPlanWithSubquery(modularPlan), new AtomicLong(0), subqueryPrefix)
+  }
+
+  private def newSubqueryName(): String = {
+    s"${ subqueryPrefix }${
+      nextSubqueryId.getAndIncrement
+      ()
+    }"
+  }
+
+  def fragmentExtract: Fragment = {
+    val finalPlan: ModularPlan = SQLizer.execute(modularPlan)
+    SQLBuildDSL.fragmentExtract(finalPlan, None)
+  }
+
+  object SQLizer extends RuleExecutor[ModularPlan] {
+    override protected def batches: Seq[Batch] = {
+      Seq(
+        Batch(
+          "Recover Scoping Info", Once,
+          // Clean up qualifiers due to erasure of sub-query aliases to use base tables as
+          // qualifiers
+          // This complements AddSubquery to handle corner case qMV3 in Tpcds_1_4_Querybatch.scala
+          CleanupQualifier,
+          // Insert sub queries on top of operators that need to appear after FROM clause.
+          AddSubquery
+        )
+      )
+    }
+
+    object CleanupQualifier extends Rule[ModularPlan] {
+      override def apply(tree: ModularPlan): ModularPlan = {
+        tree transformDown {
+          case s@modular.Select(_, _, _, _, _,
+            Seq(g@modular.GroupBy(_, _, _, _, s2@modular.Select(_, _, _, _, _, _, _, _, _), _, _)),
+            _, _, _) if !s.rewritten && s2.children.forall { _.isInstanceOf[modular.LeafNode] } =>
+            val attrMap = AttributeMap(s2.outputList
+              .collect { case a: Alias if a.child.isInstanceOf[Attribute] => (a.toAttribute, a) })
+            cleanupQualifier(s, s2.aliasMap, s2.children, attrMap)
+
+          case g@modular.GroupBy(_, _, _, _,
+            s2@modular.Select(_, _, _, _, _, _, _, _, _), _, _)
+            if !g.rewritten && s2.children.forall { _.isInstanceOf[modular.LeafNode] } =>
+            val attrMap = AttributeMap(s2.outputList
+              .collect { case a: Alias if a.child.isInstanceOf[Attribute] => (a.toAttribute, a) })
+            cleanupQualifier(g, s2.aliasMap, s2.children, attrMap)
+
+          case s@modular.Select(_, _, _, _, _, _, _, _, _)
+            if !s.rewritten && s.children.forall { _.isInstanceOf[modular.LeafNode] } =>
+            cleanupQualifier(s, s.aliasMap, s.children, AttributeMap(Seq.empty[(Attribute, Alias)]))
+
+        }
+      }
+
+      private def cleanupQualifier(node: ModularPlan,
+          aliasMap: Map[Int, String],
+          children: Seq[ModularPlan],
+          attrMap: AttributeMap[Alias]): ModularPlan = {
+        node transform {
+          case plan if !plan.rewritten && !plan.isInstanceOf[modular.LeafNode] =>
+            plan.transformExpressions {
+              case ref: Attribute =>
+                val i = children.indexWhere(_.outputSet.contains(ref))
+                if (i > -1) {
+                  // this is a walk around for mystery of spark qualifier
+                  if (aliasMap.nonEmpty && aliasMap(i).nonEmpty) {
+                    AttributeReference(
+                      ref.name,
+                      ref.dataType)(exprId = ref.exprId, qualifier = Option(aliasMap(i)))
+                  } else {
+                    ref
+                  }
+                } else {
+                  attrMap.get(ref) match {
+                    case Some(alias) => Alias(alias.child, alias.name)(exprId = alias.exprId)
+                    case None => ref
+                  }
+                }
+            }
+        }
+      }
+    }
+
+    object AddSubquery extends Rule[ModularPlan] {
+      override def apply(tree: ModularPlan): ModularPlan = {
+        tree transformUp {
+          case s@modular.Select(_, _, _, _, _,
+            Seq(g@modular.GroupBy(_, _, _, _, s2@modular.Select(_, _, _, _, _, _, _, _, _), _, _)),
+            _, _, _) if s2.children.forall { _.isInstanceOf[modular.LeafNode] } => s
+
+          case g@modular.GroupBy(_, _, _, _, s2@modular.Select(_, _, _, _, _, _, _, _, _), _, _)
+            if s2.children.forall { _.isInstanceOf[modular.LeafNode] } => g
+
+          case s@modular.Select(_, _, _, _, _, _, _, _, _)
+            if !s.rewritten && !s.children.forall { _.isInstanceOf[modular.LeafNode] } =>
+            var list: List[(Int, String)] = List()
+            var newS = s.copy()
+            s.children.zipWithIndex.filterNot { _._1.isInstanceOf[modular.LeafNode] }.foreach {
+              case (child: ModularPlan, index) if (!s.aliasMap.contains(index)) =>
+                val subqueryName = newSubqueryName()
+                val windowAttributeSet = if (child.isInstanceOf[Select]) {
+                  val windowExprs = child.asInstanceOf[Select].windowSpec
+                    .map { case Seq(expr) => expr.asInstanceOf[Seq[NamedExpression]] }
+                    .foldLeft(Seq.empty.asInstanceOf[Seq[NamedExpression]])(_ ++ _)
+                  SQLBuilder.collectAttributeSet(windowExprs)
+                } else {
+                  AttributeSet.empty
+                }
+                val subqueryAttributeSet = child.outputSet ++ windowAttributeSet
+                //              TODO: how to use alias to avoid duplicate names with distinct
+                // exprIds
+                //              val duplicateNames = collectDuplicateNames(subqueryAttributeSet)
+                // .toSet
+                if (SQLBuilder.collectDuplicateNames(subqueryAttributeSet).nonEmpty) {
+                  new UnsupportedOperationException(s"duplicate name(s): ${
+                    child.output
+                      .map(_.toString + ", ")
+                  }")
+                }
+                list = list :+ ((index, subqueryName))
+                newS = newS.transformExpressions {
+                  case ref: Attribute if (subqueryAttributeSet.contains(ref)) =>
+                    AttributeReference(ref.name, ref.dataType)(
+                      exprId = ref.exprId,
+                      qualifier = Some(subqueryName))
+                  case alias: Alias if (subqueryAttributeSet.contains(alias.toAttribute)) =>
+                    Alias(alias.child, alias.name)(
+                      exprId = alias.exprId,
+                      qualifier = Some(subqueryName))
+                }
+
+              case _ =>
+            }
+            if (list.nonEmpty) {
+              list = list ++ s.aliasMap.toSeq
+              newS.copy(aliasMap = list.toMap)
+            } else {
+              newS
+            }
+
+          case g@modular.GroupBy(_, _, _, _, _, _, _) if (!g.rewritten && g.alias.isEmpty) =>
+            val newG = if (g.outputList.isEmpty) {
+              val ol = g.predicateList.map { case a: Attribute => a }
+              g.copy(outputList = ol)
+            } else {
+              g
+            }
+            val subqueryName = newSubqueryName()
+            val subqueryAttributeSet = newG.child.outputSet
+            if (SQLBuilder.collectDuplicateNames(subqueryAttributeSet).nonEmpty) {
+              new UnsupportedOperationException(s"duplicate name(s): ${
+                newG.child.output.map(_.toString + ", ")
+              }")
+            }
+            newG.transformExpressions {
+              case ref: AttributeReference if (subqueryAttributeSet.contains(ref)) =>
+                AttributeReference(ref.name, ref.dataType)(
+                  exprId = ref.exprId,
+                  qualifier = Some(subqueryName))
+              case alias: Alias if (subqueryAttributeSet.contains(alias.toAttribute)) =>
+                Alias(alias.child, alias.name)(
+                  exprId = alias.exprId,
+                  qualifier = Some(subqueryName))
+            }.copy(alias = Some(subqueryName))
+        }
+      }
+    }
+  }
+
+}
+
+object SQLBuilder {
+  def collectAttributeSet(outputList: Seq[Expression]): AttributeSet = {
+    AttributeSet(outputList
+      .collect {
+        case a@Alias(_, _) => a.toAttribute
+        case a: Attribute => a
+      })
+  }
+
+  def collectDuplicateNames(s: AttributeSet): immutable.Iterable[String] = {
+    s.baseSet.map(_.a).groupBy(_.name).collect { case (x, ys) if ys.size > 1 => x }
+  }
+
+  def subqueryPreprocess(plan: ModularPlan, nextScalarSubqueryId: AtomicLong): ModularPlan = {
+    def newSubqueryName(nextScalarSubqueryId: AtomicLong): String = {
+      s"gen_expression_${
+        nextScalarSubqueryId
+          .getAndIncrement()
+      }_"
+    }
+
+    plan.transformAllExpressions {
+      case s: ModularSubquery =>
+        if (s.children.isEmpty) {
+          val subqueryName = newSubqueryName(nextScalarSubqueryId)
+          SubqueryHolder(s"(${ s.plan.asOneLineSQL(subqueryName) })")
+        } else {
+          throw new UnsupportedOperationException(s"Expression $s can't converted to SQL")
+        }
+      case o => o
+    }
+  }
+
+  def screenOutPlanWithSubquery(plan: ModularPlan): ModularPlan = {
+    plan.transformAllExpressions {
+      case s: ModularSubquery =>
+        throw new UnsupportedOperationException(s"Nested scala subquery $s doesn't supported")
+      case e => e
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala
new file mode 100644
index 0000000..c46124b
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/Signature.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.util
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.trees.TreeNode
+
+case class Signature(groupby: Boolean = true, datasets: Set[String] = Set.empty)
+
+abstract class SignatureRule[BaseType <: TreeNode[BaseType]] extends Logging {
+  def apply(subplan: BaseType, signatures: Seq[Option[Signature]]): Option[Signature]
+}
+
+abstract class SignatureGenerator[BaseType <: TreeNode[BaseType]] extends Logging {
+  protected val rule: SignatureRule[BaseType]
+
+  def generate(subplan: BaseType): Option[Signature] = {
+    generateUp(subplan)
+  }
+
+  protected def generateChildren(subplan: BaseType, nextOperation: BaseType => Option[Signature]
+  ): Seq[Option[Signature]] = {
+    subplan.children.map { child =>
+      nextOperation(child.asInstanceOf[BaseType])
+    }
+  }
+
+  def generateUp(subplan: BaseType): Option[Signature] = {
+    val childSignatures = generateChildren(subplan, t => generateUp(t))
+    val lastSignature = rule(subplan, childSignatures)
+    lastSignature
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/TableCluster.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/TableCluster.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/TableCluster.scala
new file mode 100644
index 0000000..021518c
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/util/TableCluster.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.util
+
+import com.fasterxml.jackson.annotation.{JsonCreator, JsonProperty, JsonRawValue}
+import com.google.common.base.Objects
+
+class TableCluster @JsonCreator()(@JsonProperty("fact") @JsonRawValue fact: Set[String],
+    @JsonProperty("dimension") @JsonRawValue dimension: Set[String]) {
+
+  //  @JsonProperty
+  def getFact(): Set[String] = {
+    fact
+  }
+
+  //
+  //  @JsonProperty
+  def getDimension(): Set[String] = {
+    dimension
+  }
+
+  @Override
+  override def toString: String = {
+    Objects.toStringHelper(this)
+      .add("fact", fact)
+      .add("dimension", dimension)
+      .toString
+  }
+
+  /*
+  @Override
+  def toString = {
+    MoreObjects.toStringHelper(this)
+    .add("fact", fact)
+    .add("dimension", dimension)
+    .toString
+  }
+  *
+  */
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/ModularPlanTest.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/ModularPlanTest.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/ModularPlanTest.scala
new file mode 100644
index 0000000..09354ae
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/testutil/ModularPlanTest.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.testutil
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.test.util.PlanTest
+
+import org.apache.carbondata.mv.plans.modular
+import org.apache.carbondata.mv.plans.modular.{ModularPlan, OneRowTable, Select}
+import org.apache.carbondata.mv.plans.modular.Flags._
+
+/**
+ * Provides helper methods for comparing plans.
+ */
+abstract class ModularPlanTest extends PlanTest with PredicateHelper {
+
+  /**
+   * Since attribute references are given globally unique ids during analysis,
+   * we must normalize them to check if two different queries are identical.
+   */
+  protected def normalizeExprIds(plan: ModularPlan): plan.type = {
+    plan transformAllExpressions {
+      case s: ScalarSubquery =>
+        s.copy(exprId = ExprId(0))
+      case e: Exists =>
+        e.copy(exprId = ExprId(0))
+      case l: ListQuery =>
+        l.copy(exprId = ExprId(0))
+      case a: AttributeReference =>
+        AttributeReference(a.name, a.dataType, a.nullable)(exprId = ExprId(0))
+      case a: Alias =>
+        Alias(a.child, a.name)(exprId = ExprId(0))
+      case ae: AggregateExpression =>
+        ae.copy(resultId = ExprId(0))
+    }
+  }
+
+  /**
+   * Normalizes plans:
+   * - Filter the filter conditions that appear in a plan. For instance,
+   * ((expr 1 && expr 2) && expr 3), (expr 1 && expr 2 && expr 3), (expr 3 && (expr 1 && expr 2)
+   *   etc., will all now be equivalent.
+   * - Sample the seed will replaced by 0L.
+   * - Join conditions will be resorted by hashCode.
+   */
+  protected def normalizePlan(plan: LogicalPlan): LogicalPlan = {
+    plan transform {
+      case filter@Filter(condition: Expression, child: LogicalPlan) =>
+        Filter(
+          splitConjunctivePredicates(condition).map(rewriteEqual(_)).sortBy(_.hashCode())
+            .reduce(And), child)
+      case sample: Sample =>
+        sample.copy(seed = 0L)(true)
+      case join@Join(left, right, joinType, condition) if condition.isDefined =>
+        val newCondition =
+          splitConjunctivePredicates(condition.get).map(rewriteEqual(_)).sortBy(_.hashCode())
+            .reduce(And)
+        Join(left, right, joinType, Some(newCondition))
+    }
+  }
+
+  /**
+   * Rewrite [[EqualTo]] and [[EqualNullSafe]] operator to keep order. The following cases will be
+   * equivalent:
+   * 1. (a = b), (b = a);
+   * 2. (a <=> b), (b <=> a).
+   */
+  private def rewriteEqual(condition: Expression): Expression = {
+    condition match {
+      case eq@EqualTo(l: Expression, r: Expression) =>
+        Seq(l, r).sortBy(_.hashCode()).reduce(EqualTo)
+      case eq@EqualNullSafe(l: Expression, r: Expression) =>
+        Seq(l, r).sortBy(_.hashCode()).reduce(EqualNullSafe)
+      case _ => condition // Don't reorder.
+    }
+  }
+
+  //
+  //  /** Fails the test if the two plans do not match */
+  //  protected def comparePlans(plan1: LogicalPlan, plan2: LogicalPlan) {
+  //    val normalized1 = normalizePlan(normalizeExprIds(plan1))
+  //    val normalized2 = normalizePlan(normalizeExprIds(plan2))
+  //    if (normalized1 != normalized2) {
+  //      fail(
+  //        s"""
+  //          |== FAIL: Plans do not match ===
+  //          |${sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n")}
+  //         """.stripMargin)
+  //    }
+  //  }
+  //
+  //  /** Fails the test if the two expressions do not match */
+  //  protected def compareExpressions(e1: Expression, e2: Expression): Unit = {
+  //    comparePlans(Filter(e1, OneRowRelation), Filter(e2, OneRowRelation))
+  //  }
+  //
+  //  /** Fails the test if the join order in the two plans do not match */
+  //  protected def compareJoinOrder(plan1: LogicalPlan, plan2: LogicalPlan) {
+  //    val normalized1 = normalizePlan(normalizeExprIds(plan1))
+  //    val normalized2 = normalizePlan(normalizeExprIds(plan2))
+  //    if (!sameJoinPlan(normalized1, normalized2)) {
+  //      fail(
+  //        s"""
+  //           |== FAIL: Plans do not match ===
+  //           |${sideBySide(normalized1.treeString, normalized2.treeString).mkString("\n")}
+  //         """.stripMargin)
+  //    }
+  //  }
+  //
+  //  /** Consider symmetry for joins when comparing plans. */
+  //  private def sameJoinPlan(plan1: LogicalPlan, plan2: LogicalPlan): Boolean = {
+  //    (plan1, plan2) match {
+  //      case (j1: Join, j2: Join) =>
+  //        (sameJoinPlan(j1.left, j2.left) && sameJoinPlan(j1.right, j2.right)) ||
+  //          (sameJoinPlan(j1.left, j2.right) && sameJoinPlan(j1.right, j2.left))
+  //      case (p1: Project, p2: Project) =>
+  //        p1.projectList == p2.projectList && sameJoinPlan(p1.child, p2.child)
+  //      case _ =>
+  //        plan1 == plan2
+  //    }
+  //  }
+  /** Fails the test if the corresponding pairs of plans do not match */
+  protected def comparePlanCollections(planSet1: Seq[String], planSet2: Seq[String]) {
+    for ((plan1, plan2) <- planSet1 zip planSet2) {
+      compareMessages(plan1, plan2)
+    }
+  }
+
+  /** Fails the test if the two plans do not match */
+  /** Only expressionIds are normalized.  This is enough for our test cases */
+  /** For more general normalization, see Spark PlanTest.scala for Logical Plan */
+  protected def comparePlans(plan1: ModularPlan, plan2: ModularPlan) {
+    val normalized1 = normalizeExprIds(plan1)
+    val normalized2 = normalizeExprIds(plan2)
+    if (normalized1 != normalized2) {
+      fail(
+        s"""
+           |== FAIL: Plans do not match ===
+           |${ sideBySide(normalized1.treeString, normalized1.treeString).mkString("\n") }
+         """.stripMargin)
+    }
+  }
+
+  /** Fails the test if the two expressions do not match */
+  protected def compareExpressions(e1: Seq[Expression], e2: Seq[Expression]): Unit = {
+    comparePlans(
+      Select(Nil, Nil, e1, Map.empty, Nil, Seq(OneRowTable), NoFlags, Seq.empty, Seq.empty), modular
+        .Select(Nil, Nil, e2, Map.empty, Nil, Seq(OneRowTable), NoFlags, Seq.empty, Seq.empty))
+  }
+
+  protected def compareMessages(msg1: String, msg2: String) {
+    if (msg1 != msg2) {
+      fail(
+        s"""
+           |== FAIL: Messages do not match ==
+           |${ sideBySide(msg1, msg2).mkString("\n") }
+           """.stripMargin)
+    }
+  }
+}