You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/09 08:44:29 UTC

[11/12] carbondata git commit: [CARBONDATA-2242] Add Materialized View modules

[CARBONDATA-2242] Add Materialized View modules

Initial code for Materialized View DataMap.


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/02fd7873
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/02fd7873
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/02fd7873

Branch: refs/heads/master
Commit: 02fd78734cce070cdf9e3dec1a4adac15b789dd3
Parents: be600bc
Author: Jacky Li <ja...@qq.com>
Authored: Fri Mar 9 13:59:25 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Mar 9 16:42:18 2018 +0800

----------------------------------------------------------------------
 datamap/mv/core/pom.xml                         |   99 +
 .../org/apache/carbondata/mv/MQOSession.scala   |   86 +
 .../carbondata/mv/internal/SessionState.scala   |   56 +
 .../carbondata/mv/internal/SharedState.scala    |   65 +
 .../mv/rewrite/DefaultMatchMaker.scala          |  597 +++
 .../carbondata/mv/rewrite/MatchConditions.scala |   28 +
 .../carbondata/mv/rewrite/MatchMaker.scala      |   47 +
 .../carbondata/mv/rewrite/Navigator.scala       |  137 +
 .../carbondata/mv/rewrite/QueryRewrite.scala    |   53 +
 .../mv/rewrite/SummaryDatasetCatalog.scala      |  176 +
 .../apache/carbondata/mv/rewrite/Utils.scala    |  358 ++
 .../SelectSelectExactChildrenSuite.scala        |   76 +
 .../carbondata/mv/rewrite/TestSQLSuite.scala    |   99 +
 .../carbondata/mv/rewrite/Tpcds_1_4_Suite.scala |   90 +
 .../mv/rewrite/matching/TestSQLBatch.scala      |  214 +
 .../rewrite/matching/TestTPCDS_1_4_Batch.scala  | 2496 ++++++++++
 datamap/mv/plan/pom.xml                         |  158 +
 .../org/apache/carbondata/mv/dsl/package.scala  |  101 +
 .../mv/expressions/modular/subquery.scala       |  169 +
 .../mv/plans/modular/AggregatePushDown.scala    |  156 +
 .../carbondata/mv/plans/modular/Flags.scala     |   71 +
 .../mv/plans/modular/Harmonizer.scala           |  217 +
 .../mv/plans/modular/ModularPatterns.scala      |  237 +
 .../mv/plans/modular/ModularPlan.scala          |  205 +
 .../modular/ModularPlanSignatureGenerator.scala |   73 +
 .../mv/plans/modular/ModularRelation.scala      |  143 +
 .../mv/plans/modular/Modularizer.scala          |  117 +
 .../mv/plans/modular/basicOperators.scala       |   84 +
 .../mv/plans/modular/queryGraph.scala           |   24 +
 .../apache/carbondata/mv/plans/package.scala    |   55 +
 .../mv/plans/util/BirdcageOptimizer.scala       |  199 +
 .../plans/util/Logical2ModularExtractions.scala |  353 ++
 .../util/LogicalPlanSignatureGenerator.scala    |  101 +
 .../carbondata/mv/plans/util/Printers.scala     |  347 ++
 .../carbondata/mv/plans/util/SQLBuild.scala     |   31 +
 .../carbondata/mv/plans/util/SQLBuildDSL.scala  |  425 ++
 .../carbondata/mv/plans/util/SQLBuilder.scala   |  260 ++
 .../carbondata/mv/plans/util/Signature.scala    |   49 +
 .../carbondata/mv/plans/util/TableCluster.scala |   55 +
 .../mv/testutil/ModularPlanTest.scala           |  178 +
 .../mv/testutil/Tpcds_1_4_QueryBatch.scala      | 4293 ++++++++++++++++++
 .../mv/testutil/Tpcds_1_4_Tables.scala          |  845 ++++
 .../org/apache/carbondata/mv/TestSQLBatch.scala |  468 ++
 .../apache/carbondata/mv/TestSQLBatch2.scala    |  138 +
 .../mv/plans/ExtractJoinConditionsSuite.scala   |   67 +
 .../carbondata/mv/plans/IsSPJGHSuite.scala      |   59 +
 .../mv/plans/LogicalToModularPlanSuite.scala    |  196 +
 .../carbondata/mv/plans/ModularToSQLSuite.scala |  148 +
 .../carbondata/mv/plans/SignatureSuite.scala    |   97 +
 .../mv/plans/Tpcds_1_4_BenchmarkSuite.scala     |   88 +
 .../scala/org/apache/spark/sql/QueryTest.scala  |  352 ++
 pom.xml                                         |   22 +-
 tools/advisor/README                            |   64 +
 .../input/queries-2017-10-25.json.log.gz        |  Bin 0 -> 642 bytes
 .../input/queries-2017-11-02.json.log.gz        |  Bin 0 -> 1084 bytes
 tools/advisor/output/mv-candidate.sql           |   13 +
 tools/advisor/pom.xml                           |  151 +
 .../apache/carbondata/mv/tool/MVToolBase.scala  |  168 +
 .../manager/CommonSubexpressionManager.scala    |  498 ++
 .../manager/CommonSubexpressionRuleEngine.scala |   44 +
 .../manager/CostBasedMVRecommendation.scala     |  212 +
 .../preprocessor/QueryBatchPreprocessor.scala   |   75 +
 .../preprocessor/QueryBatchRuleEngine.scala     |   35 +
 ...1-5c81abd56c05e598_1733531031_data.0.parq.gz |  Bin 0 -> 462151 bytes
 ...6-1724efa8090227b0_1233867673_data.0.parq.gz |  Bin 0 -> 1797 bytes
 ...e-990a02877791fb85_1630871489_data.0.parq.gz |  Bin 0 -> 7730 bytes
 ...2-d483027fae4f4b9f_1568698772_data.0.parq.gz |  Bin 0 -> 2858 bytes
 .../resources/data/files/DIM_APN_IOT.dat.gz     |  Bin 0 -> 14006 bytes
 ...b-78c28edfa577c286_1251401173_data.0.parq.gz |  Bin 0 -> 16659914 bytes
 ...sdr_dyn_seq_custer_iot_all_hour_60min.dat.gz |  Bin 0 -> 6246209 bytes
 .../mv/tool/CostBasedCSEManagerSuite.scala      |  241 +
 .../apache/carbondata/mv/tool/MVToolSuite.scala |   68 +
 .../carbondata/mv/tool/QBProcessorSuite.scala   |   98 +
 .../mv/tool/constructing/TestSEQ_MVBatch.scala  |  244 +
 .../constructing/TestTPCDS_1_4_MVBatch.scala    | 1233 +++++
 .../spark/sql/hive/tpcds/TestHelper.scala       |   37 +
 .../apache/spark/sql/hive/tpcds/TestHive.scala  |  958 ++++
 .../sql/hive/tpcds/TestHiveSingleton.scala      |   36 +
 78 files changed, 19428 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/mv/core/pom.xml b/datamap/mv/core/pom.xml
new file mode 100644
index 0000000..7291567
--- /dev/null
+++ b/datamap/mv/core/pom.xml
@@ -0,0 +1,99 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.carbondata</groupId>
+    <artifactId>carbondata-parent</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <relativePath>../../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>carbondata-mv-core</artifactId>
+  <name>Apache CarbonData :: Materialized View Core</name>
+
+  <properties>
+    <dev.path>${basedir}/../../../dev</dev.path>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-mv-plan</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-spark2</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <testSourceDirectory>src/test/scala</testSourceDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-compiler-plugin</artifactId>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <skipTests>true</skipTests>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <junitxml>.</junitxml>
+          <filereports>CarbonTestSuite.txt</filereports>
+        </configuration>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.7</source>
+          <target>1.7</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/MQOSession.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/MQOSession.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/MQOSession.scala
new file mode 100644
index 0000000..f4e89f2
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/MQOSession.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv
+
+import java.io.Closeable
+import java.math.BigInteger
+
+import scala.collection.mutable
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.QueryExecution
+
+import org.apache.carbondata.mv.internal.{SessionState, SharedState}
+import org.apache.carbondata.mv.rewrite.QueryRewrite
+
+/**
+ * The entry point for working with multi-query optimization in Carbon. Allow the
+ * creation of CSEs (covering subexpression) as well as query rewrite
+ */
+class MQOSession private(
+    @transient val spark: SparkSession,
+    @transient private val existingSharedState: Option[SharedState])
+  extends Serializable with Closeable {
+
+  self =>
+
+  def this(spark: SparkSession) = {
+    this(spark, None)
+  }
+
+  /* ----------------------- *
+   |  Session-related state  |
+   * ----------------------- */
+
+  /**
+   * State shared across sessions, including the `SparkContext`, cached data, listener,
+   * and a catalog that interacts with external systems.
+   */
+  private[mv] lazy val sharedState: SharedState = {
+    existingSharedState.getOrElse(new SharedState(spark.sparkContext))
+  }
+
+  /**
+   * State isolated across sessions, including SQL configurations, temporary tables, registered
+   * functions, and everything else that accepts a [[org.apache.spark.sql.internal.SQLConf]].
+   */
+  @transient
+  private[mv] lazy val sessionState: SessionState = new SessionState(self)
+
+  @transient
+  lazy val tableFrequencyMap = new mutable.HashMap[String, Int]
+
+  @transient
+  lazy val consumersMap = new mutable.HashMap[BigInteger, mutable.Set[LogicalPlan]] with mutable
+  .MultiMap[BigInteger, LogicalPlan]
+
+  def rewrite(sqlText: String): QueryRewrite = {
+    val plan1 = spark.sql(sqlText).queryExecution.analyzed
+    sessionState.rewritePlan(plan1)
+  }
+
+  def rewrite(queryExecution: QueryExecution): QueryRewrite = {
+    val plan1 = queryExecution.analyzed
+    sessionState.rewritePlan(plan1)
+  }
+
+  override def close(): Unit = spark.close()
+
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/internal/SessionState.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/internal/SessionState.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/internal/SessionState.scala
new file mode 100644
index 0000000..07291d0
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/internal/SessionState.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.internal
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.carbondata.mv.MQOSession
+import org.apache.carbondata.mv.plans.modular.SimpleModularizer
+import org.apache.carbondata.mv.plans.util.BirdcageOptimizer
+import org.apache.carbondata.mv.rewrite.{DefaultMatchMaker, Navigator, QueryRewrite}
+
+/**
+ * A class that holds all session-specific state in a given [[MQOSession]].
+ */
+private[mv] class SessionState(mqoSession: MQOSession) {
+
+  // Note: These are all lazy vals because they depend on each other (e.g. conf) and we
+  // want subclasses to override some of the fields. Otherwise, we would get a lot of NPEs.
+
+  /**
+   * Internal catalog for managing table and database states.
+   */
+  lazy val catalog = mqoSession.sharedState.summaryDatasetCatalog
+
+  /**
+   * Modular query plan modularizer
+   */
+  lazy val modularizer = SimpleModularizer
+
+  /**
+   * Logical query plan optimizer.
+   */
+  lazy val optimizer = BirdcageOptimizer
+
+  lazy val matcher = DefaultMatchMaker
+
+  lazy val navigator: Navigator = new Navigator(catalog, mqoSession)
+
+  def rewritePlan(plan: LogicalPlan): QueryRewrite = new QueryRewrite(mqoSession, plan)
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/internal/SharedState.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/internal/SharedState.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/internal/SharedState.scala
new file mode 100644
index 0000000..afc4565
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/internal/SharedState.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.internal
+
+import java.io.File
+
+import scala.io.Source
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, SQLContext}
+import org.apache.spark.sql.execution.CacheManager
+
+import org.apache.carbondata.mv.rewrite.SummaryDatasetCatalog
+
+/**
+ * A class that holds all state shared across sessions in a given [[SQLContext]].
+ */
+private[mv] class SharedState(val sparkContext: SparkContext) {
+
+  @transient
+  lazy val summaryDatasetCatalog = new SummaryDatasetCatalog(sparkContext)
+
+  def clearSummaryDatasetCatalog(): Unit = summaryDatasetCatalog.clearSummaryDatasetCatalog()
+
+  def registerSummaryDataset(query: DataFrame): Unit = {
+    summaryDatasetCatalog.registerSummaryDataset(query)
+  }
+
+  def unregisterSummaryDataset(query: DataFrame): Unit = {
+    summaryDatasetCatalog.unregisterSummaryDataset(query)
+  }
+
+  def refreshSummaryDatasetCatalog(filePath: String): Unit = {
+    val file = new File(filePath)
+    val sqlContext = new SQLContext(sparkContext)
+    if (file.exists) {
+      clearSummaryDatasetCatalog()
+      for (line <- Source.fromFile(file).getLines) {
+        registerSummaryDataset(sqlContext.sql(line))
+      }
+    }
+  }
+
+  /**
+   * Class for caching query results reused in future executions.
+   */
+  val cacheManager: CacheManager = new CacheManager
+}
+
+object SharedState {}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
new file mode 100644
index 0000000..f549e6e
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -0,0 +1,597 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, PredicateHelper, _}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
+
+import org.apache.carbondata.mv.plans.modular
+import org.apache.carbondata.mv.plans.modular.{GroupBy, JoinEdge, Matchable, ModularPlan, Select}
+import org.apache.carbondata.mv.plans.modular.Flags._
+import org.apache.carbondata.mv.plans.util.SQLBuilder
+
+abstract class DefaultMatchMaker extends MatchMaker[ModularPlan]
+
+abstract class DefaultMatchPattern extends MatchPattern[ModularPlan] {
+
+  /** Name for this pattern, automatically inferred based on class name. */
+  val patternName: String = {
+    val className = getClass.getName
+    if (className endsWith "$") className.dropRight(1) else className
+  }
+
+  def factorOutSubsumer(
+      compensation: ModularPlan,
+      subsumer: Matchable,
+      subsumerName: Option[String]): ModularPlan = {
+    val aliasMap = AttributeMap(
+        subsumer.outputList.collect {
+          case a: Alias if a.child.isInstanceOf[Attribute] =>
+            (a.child.asInstanceOf[Attribute], a.toAttribute)
+          })
+
+    val compensation1 = compensation.transform {
+      case plan if !plan.skip =>
+        plan.transformExpressions {
+          case a: AttributeReference =>
+            aliasMap
+              .get(a)
+              .map { ref =>
+                AttributeReference(
+                  ref.name, ref.dataType)(
+                  exprId = ref.exprId,
+                  qualifier = a.qualifier)
+              }.getOrElse(a)
+          }
+    }
+
+    val subqueryAttributeSet = SQLBuilder.collectAttributeSet(subsumer.outputList)
+    if (SQLBuilder.collectDuplicateNames(subqueryAttributeSet).nonEmpty) {
+      new UnsupportedOperationException(
+        s"duplicate name(s): ${ subsumer.output.map(_.toString + ", ") }")
+    }
+    val compensationFinal = compensation1.transformExpressions {
+      case ref: Attribute if subqueryAttributeSet.contains(ref) =>
+        AttributeReference(ref.name, ref.dataType)(exprId = ref.exprId, qualifier = subsumerName)
+      case alias: Alias if subqueryAttributeSet.contains(alias.toAttribute) =>
+        Alias(alias.child, alias.name)(exprId = alias.exprId, qualifier = subsumerName)
+    }
+    compensationFinal
+  }
+}
+
+object DefaultMatchMaker extends DefaultMatchMaker {
+  lazy val patterns =
+    SelectSelectNoChildDelta ::
+    GroupbyGroupbyNoChildDelta ::
+    GroupbyGroupbySelectOnlyChildDelta ::
+    GroupbyGroupbyGroupbyChildDelta ::
+    SelectSelectSelectChildDelta ::
+    SelectSelectGroupbyChildDelta :: Nil
+}
+
+/**
+ * Convention:
+ * EmR: each subsumee's expression match some of subsumer's expression
+ * EdR: each subsumee's expression derive from some of subsumer's expression
+ * RmE: each subsumer's expression match some of subsumee's expression
+ * RdE: each subsumer's expression derive from some of subsumee's expression
+ */
+
+object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper {
+  private def isDerivable(
+      exprE: Expression,
+      exprListR: Seq[Expression],
+      subsumee: ModularPlan,
+      subsumer: ModularPlan,
+      compensation: Option[ModularPlan]): Boolean = {
+    if (subsumee.asInstanceOf[Select].predicateList.contains(exprE)) {
+      subsumer.asInstanceOf[Select].predicateList.exists(_.semanticEquals(exprE)) ||
+      canEvaluate(exprE, subsumer)
+    } else if (subsumee.asInstanceOf[Select].outputList.contains(exprE)) {
+      exprE match {
+        case a@Alias(_, _) =>
+          exprListR.exists(a1 => a1.isInstanceOf[Alias] &&
+                                 a1.asInstanceOf[Alias].child.semanticEquals(a.child))
+        case exp => exprListR.exists(_.semanticEquals(exp) || canEvaluate(exp, subsumer))
+      }
+    } else {
+      false
+    }
+  }
+
+  def apply(
+      subsumer: ModularPlan,
+      subsumee: ModularPlan,
+      compensation: Option[ModularPlan],
+      rewrite: QueryRewrite): Seq[ModularPlan] = {
+
+    (subsumer, subsumee, compensation) match {
+      case (
+          sel_1a @ modular.Select(_, _, _, _, _, _, _, _, _),
+          sel_1q @ modular.Select(_, _, _, _, _, _, _, _, _), None
+        ) if sel_1a.children.forall { _.isInstanceOf[modular.LeafNode] } &&
+             sel_1q.children.forall { _.isInstanceOf[modular.LeafNode] } =>
+
+        // assume children (including harmonized relation) of subsumer and subsumee
+        // are 1-1 correspondence.
+        // Change the following two conditions to more complicated ones if we want to
+        // consider things that combine extrajoin, rejoin, and harmonized relations
+        val isUniqueRmE = subsumer.children.filter { x => subsumee.children.count(_ == x) != 1 }
+        val isUniqueEmR = subsumee.children.filter { x => subsumer.children.count(_ == x) != 1 }
+
+        val extrajoin = sel_1a.children.filterNot { child => sel_1q.children.contains(child) }
+        val rejoin = sel_1q.children.filterNot { child => sel_1a.children.contains(child) }
+        val rejoinOutputList = rejoin.flatMap(_.output)
+
+        val isPredicateRmE = sel_1a.predicateList.forall(expr =>
+          sel_1q.predicateList.exists(_.semanticEquals(expr)))
+        val isPredicateEmdR = sel_1q.predicateList.forall(expr =>
+          isDerivable(expr, sel_1a.outputList ++ rejoinOutputList, sel_1q, sel_1a, None))
+        val isOutputEdR = sel_1q.outputList.forall(expr =>
+          isDerivable(expr, sel_1a.outputList ++ rejoinOutputList, sel_1q, sel_1a, None))
+
+        if (isUniqueRmE.isEmpty && isUniqueEmR.isEmpty && extrajoin.isEmpty && isPredicateRmE &&
+            isPredicateEmdR && isOutputEdR) {
+          val mappings = sel_1a.children.zipWithIndex.map {
+            case (childr, fromIdx) if sel_1q.children.contains(childr) =>
+              val toIndx = sel_1q.children.indexWhere(_ == childr)
+              (toIndx -> fromIdx)
+
+          }
+          val e2r = mappings.toMap
+          val r2e = e2r.map(_.swap)
+          val r2eJoinsMatch = sel_1a.joinEdges.forall { x =>
+              (r2e.get(x.left), r2e.get(x.right)) match {
+                case (Some(l), Some(r)) =>
+                  val mappedEdge = JoinEdge(l, r, x.joinType)
+                  val joinTypeEquivalent =
+                    if (sel_1q.joinEdges.contains(mappedEdge)) true
+                    else {
+                      x.joinType match {
+                        case Inner | FullOuter =>
+                          sel_1q.joinEdges.contains(JoinEdge(r, l, x.joinType))
+                        case _ => false
+                      }
+                    }
+                  if (joinTypeEquivalent) {
+                    val sel_1a_join = sel_1a.extractJoinConditions(
+                      sel_1a.children(x.left),
+                      sel_1a.children(x.right))
+                    val sel_1q_join = sel_1q.extractJoinConditions(
+                      sel_1q.children(mappedEdge.left),
+                      sel_1q.children(mappedEdge.right))
+                    sel_1a_join.forall(e => sel_1q_join.exists(e.semanticEquals(_))) &&
+                    sel_1q_join.forall(e => sel_1a_join.exists(e.semanticEquals(_)))
+                  } else false
+                case _ => false
+              }
+          }
+
+          val isPredicateEmR = sel_1q.predicateList.forall(expr =>
+            sel_1a.predicateList.exists(_.semanticEquals(expr)))
+          val isOutputEmR = sel_1q.outputList.forall(expr =>
+            sel_1a.outputList.exists(_.semanticEquals(expr)))
+          val isOutputRmE = sel_1a.outputList.forall(expr =>
+            sel_1q.outputList.exists(_.semanticEquals(expr)))
+
+          if (r2eJoinsMatch) {
+            if (isPredicateEmR && isOutputEmR && isOutputRmE && rejoin.isEmpty) {
+              Seq(sel_1a) // no compensation needed
+            } else {
+              val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
+              val tAliasMap = new collection.mutable.HashMap[Int, String]()
+
+              tChildren += sel_1a
+              tAliasMap += (tChildren.indexOf(sel_1a) -> rewrite.newSubsumerName())
+
+              sel_1q.children.zipWithIndex.foreach {
+                case (childe, idx) =>
+                  if (e2r.get(idx).isEmpty) {
+                    tChildren += childe
+                    sel_1q.aliasMap.get(idx).map(x => tAliasMap += (tChildren.indexOf(childe) -> x))
+                  }
+              }
+
+              val tJoinEdges = sel_1q.joinEdges.collect {
+                case JoinEdge(le, re, joinType) =>
+                  (e2r.get(le), e2r.get(re)) match {
+                    case (Some(lr), None) =>
+                      JoinEdge(
+                        0,
+                        tChildren.indexOf(sel_1q.children(re)),
+                        joinType)
+                    case (None, None) =>
+                      JoinEdge(
+                        tChildren.indexOf(sel_1q.children(le)),
+                        tChildren.indexOf(sel_1q.children(re)),
+                        joinType)
+                    case (None, Some(rr)) =>
+                      JoinEdge(
+                        tChildren.indexOf(sel_1q.children(le)),
+                        0,
+                        joinType)
+                    case _ =>
+                      null.asInstanceOf[JoinEdge]
+                  }
+              }
+              val tPredicateList = sel_1q.predicateList.filter { p =>
+                !sel_1a.predicateList.exists(_.semanticEquals(p)) }
+              val wip = sel_1q.copy(
+                predicateList = tPredicateList,
+                children = tChildren,
+                joinEdges = tJoinEdges.filter(_ != null),
+                aliasMap = tAliasMap.toMap)
+              val subsumerName = wip.aliasMap.get(0)
+              val done = factorOutSubsumer(wip, sel_1a, subsumerName)
+              Seq(done)
+            }
+          } else Nil
+        } else Nil
+
+      case (
+        sel_3a @ modular.Select(_, _, _, _, _, _, _, _, _),
+        sel_3q @ modular.Select(_, _, _, _, _, _, _, _, _), None)
+        if sel_3a.children.forall(_.isInstanceOf[GroupBy]) &&
+           sel_3q.children.forall(_.isInstanceOf[GroupBy]) =>
+        val isPredicateRmE = sel_3a.predicateList.isEmpty ||
+                             sel_3a.predicateList.forall(expr =>
+                               sel_3q.predicateList.exists(_.semanticEquals(expr)))
+        val isPredicateEmdR = sel_3q.predicateList.isEmpty ||
+                              sel_3q.predicateList.forall(expr =>
+                                sel_3a.predicateList.exists(_.semanticEquals(expr)) ||
+                                isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))
+        val isOutputEdR = sel_3q.outputList.forall(expr =>
+          isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))
+        val isSingleChild = sel_3a.children.length == 1 && sel_3q.children.length == 1
+
+        if (isPredicateRmE && isPredicateEmdR && isOutputEdR && isSingleChild) {
+          val isPredicateEmR = sel_3q.predicateList.isEmpty ||
+                               sel_3q.predicateList.forall(expr =>
+                                 sel_3a.predicateList.exists(_.semanticEquals(expr)))
+          val isOutputRmE = sel_3a.outputList.forall(expr =>
+            isDerivable(expr, sel_3q.outputList, sel_3a, sel_3q, None))
+          val isOutputEmR = sel_3q.outputList.forall(expr =>
+            isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))
+
+          if (isPredicateEmR && isOutputEmR && isOutputRmE) {
+            Seq(sel_3a)
+          } else if (isPredicateEmR && isOutputEmR) {
+            // no compensation needed
+            val sel_3q_exp = sel_3q.transformExpressions({
+              case a: Alias => sel_3a.outputList
+                .find { a1 =>
+                  a1.isInstanceOf[Alias] &&
+                  a1.asInstanceOf[Alias].child.semanticEquals(a.child)
+                }.map(_.toAttribute).get
+            })
+            val wip = sel_3q_exp.copy(
+              children = Seq(sel_3a),
+              aliasMap = Seq(0 -> rewrite.newSubsumerName()).toMap)
+            val subsumerName = wip.aliasMap.get(0)
+            val done = factorOutSubsumer(wip, sel_3a, subsumerName)
+            Seq(done)
+          } else {
+            Nil
+          }
+        } else Nil
+
+      case _ => Nil
+    }
+  }
+}
+
+object GroupbyGroupbyNoChildDelta extends DefaultMatchPattern {
+  def apply(
+      subsumer: ModularPlan,
+      subsumee: ModularPlan,
+      compensation: Option[ModularPlan],
+      rewrite: QueryRewrite): Seq[ModularPlan] = {
+    (subsumer, subsumee, compensation) match {
+      case (
+        gb_2a @ modular.GroupBy(_, _, _, _, _, _, _),
+        gb_2q @ modular.GroupBy(_, _, _, _, _, _, _),
+        None) =>
+        val isGroupingEmR = gb_2q.predicateList.forall(expr =>
+          gb_2a.predicateList.exists(_.semanticEquals(expr)))
+        val isGroupingRmE = gb_2a.predicateList.forall(expr =>
+          gb_2q.predicateList.exists(_.semanticEquals(expr)))
+        if (isGroupingEmR && isGroupingRmE) {
+          val isOutputEmR = gb_2q.outputList.forall {
+            case a @ Alias(_, _) => gb_2a.outputList.exists(a1 => a1.isInstanceOf[Alias] &&
+                                                                  a1.asInstanceOf[Alias].child
+                                                                    .semanticEquals(a.child))
+            case exp => gb_2a.outputList.exists(_.semanticEquals(exp))
+          }
+
+          if (isOutputEmR) {
+            Seq(gb_2a)
+          } else {
+            Nil
+          }
+        } else {
+          val aliasMap = AttributeMap(gb_2a.outputList.collect { case a: Alias =>
+            (a.toAttribute, a)})
+          if (isGroupingEmR) {
+            Utils.tryMatch(
+              gb_2a, gb_2q, aliasMap).flatMap {
+              case g: GroupBy =>
+                Some(g.copy(child = g.child.withNewChildren(
+                  g.child.children.map {
+                    case modular.Select(_, _, _, _, _, _, _, _, _) => gb_2a;
+                    case other => other
+                  })));
+              case _ => None}.map(Seq(_)).getOrElse(Nil)
+          } else {
+            Nil
+          }
+        }
+
+      case _ => Nil
+    }
+  }
+}
+
+object GroupbyGroupbySelectOnlyChildDelta extends DefaultMatchPattern with PredicateHelper {
+  private def isDerivable(
+      exprE: Expression,
+      exprListR: Seq[Expression],
+      subsumee: ModularPlan,
+      subsumer: ModularPlan,
+      compensation: Option[ModularPlan]) = {
+    if (subsumee.asInstanceOf[GroupBy].predicateList.contains(exprE)) {
+      if (exprListR.exists(_.semanticEquals(exprE)) || canEvaluate(exprE, exprListR)) true
+      else false
+    } else if (compensation.getOrElse(throw new RuntimeException("compensation cannot be None"))
+      .asInstanceOf[Select].predicateList.contains(exprE)) {
+      if (canEvaluate(exprE, exprListR) || exprListR.exists(_.semanticEquals(exprE))) true
+      else false
+    } else {
+      false
+    }
+  }
+
+  def apply(
+      subsumer: ModularPlan,
+      subsumee: ModularPlan,
+      compensation: Option[ModularPlan],
+      rewrite: QueryRewrite): Seq[ModularPlan] = {
+    val aggInputEinR = subsumee.expressions
+      .collect { case agg: aggregate.AggregateExpression => AttributeSet(Seq(agg))
+        .subsetOf(subsumer.outputSet)
+      }.forall(identity)
+    val compensationSelectOnly = !compensation.map { _.collect { case n => n.getClass } }
+      .exists(_.contains(modular.GroupBy))
+
+    (subsumer, subsumee, compensation, aggInputEinR, compensationSelectOnly) match {
+      case (
+        gb_2a @ modular.GroupBy(_, _, _, _, _, _, _),
+        gb_2q @ modular.GroupBy(_, _, _, _, _, _, _),
+        Some(sel_1c1 @ modular.Select(_, _, _, _, _, _, _, _, _)),
+        true,
+        true)
+        if !gb_2q.flags.hasFlag(EXPAND) && !gb_2a.flags.hasFlag(EXPAND) =>
+
+        val rejoinOutputList = sel_1c1.children.tail.flatMap(_.output)
+        val isGroupingEdR = gb_2q.predicateList.forall(expr =>
+          isDerivable(expr, gb_2a.predicateList ++ rejoinOutputList, gb_2q, gb_2a, compensation))
+        val needRegrouping = !gb_2a.predicateList.forall(gb_2q.predicateList.contains)
+        val canPullup = sel_1c1.predicateList.forall(expr =>
+          isDerivable(expr, gb_2a.predicateList ++ rejoinOutputList, gb_2q, gb_2a, compensation))
+        val isAggEmR = gb_2q.outputList.collect {
+          case agg: aggregate.AggregateExpression =>
+            gb_2a.outputList.exists(_.semanticEquals(agg))
+        }.forall(identity)
+
+        if (isGroupingEdR && ((!needRegrouping && isAggEmR) || needRegrouping) && canPullup) {
+          // pull up
+          val pullupOutputList = gb_2a.outputList.map(_.toAttribute) ++ rejoinOutputList
+          val sel_2c1 = sel_1c1.copy(
+            outputList = pullupOutputList,
+            inputList = pullupOutputList,
+            children = sel_1c1.children.map {
+              case s: Select => gb_2a
+              case other => other })
+
+          if (needRegrouping && rejoinOutputList.isEmpty) {
+            val aliasMap = AttributeMap(gb_2a.outputList.collect {
+              case a: Alias => (a.toAttribute, a) })
+            Utils.tryMatch(gb_2a, gb_2q, aliasMap).flatMap {
+              case g: GroupBy => Some(g.copy(child = sel_2c1));
+              case _ => None
+            }.map { wip =>
+              factorOutSubsumer(wip, gb_2a, sel_1c1.aliasMap.get(0))
+            }.map(Seq(_))
+             .getOrElse(Nil)
+          }
+          // TODO: implement regrouping with 1:N rejoin (rejoin tables being the "1" side)
+          // via catalog service
+          else if (!needRegrouping && isAggEmR) {
+            Seq(sel_2c1).map(wip => factorOutSubsumer(wip, gb_2a, sel_1c1.aliasMap.get(0)))
+          } else Nil
+        } else Nil
+
+      case _ => Nil
+    }
+  }
+}
+
+object GroupbyGroupbyGroupbyChildDelta extends DefaultMatchPattern {
+  def apply(
+      subsumer: ModularPlan,
+      subsumee: ModularPlan,
+      compensation: Option[ModularPlan],
+      rewrite: QueryRewrite): Seq[ModularPlan] = {
+    val groupbys = compensation.map { _.collect { case g: GroupBy => g } }.getOrElse(Nil).toSet
+
+    (subsumer, subsumee, groupbys.nonEmpty) match {
+      case (
+        modular.Select(_, _, _, _, _, _, _, _, _),
+        modular.Select(_, _, _, _, _, _, _, _, _),
+        true) =>
+        // TODO: implement me
+        Nil
+
+      case _ => Nil
+    }
+  }
+}
+
+
+object SelectSelectSelectChildDelta extends DefaultMatchPattern {
+  def apply(
+      subsumer: ModularPlan,
+      subsumee: ModularPlan,
+      compensation: Option[ModularPlan],
+      rewrite: QueryRewrite): Seq[ModularPlan] = {
+    val compensationSelectOnly =
+      !compensation
+        .map { _.collect { case n => n.getClass } }
+        .exists(_.contains(modular.GroupBy))
+
+    (subsumer, subsumee, compensationSelectOnly) match {
+      case (
+        modular.Select(_, _, _, _, _, _, _, _, _),
+        modular.Select(_, _, _, _, _, _, _, _, _),
+        true) =>
+        // TODO: implement me
+        Nil
+      case _ => Nil
+    }
+  }
+}
+
+object SelectSelectGroupbyChildDelta extends DefaultMatchPattern with PredicateHelper {
+  private def isDerivable(
+      exprE: Expression,
+      exprListR: Seq[Expression],
+      subsumee: ModularPlan,
+      subsumer: ModularPlan,
+      compensation: Option[ModularPlan]) = {
+    Utils.isDerivable(
+      exprE: Expression,
+      exprListR: Seq[Expression],
+      subsumee: ModularPlan,
+      subsumer: ModularPlan,
+      compensation: Option[ModularPlan])
+  }
+
+  def apply(
+      subsumer: ModularPlan,
+      subsumee: ModularPlan,
+      compensation: Option[ModularPlan],
+      rewrite: QueryRewrite): Seq[ModularPlan] = {
+    (subsumer, subsumee, compensation, subsumer.children, subsumee.children) match {
+      case (
+        sel_3a@modular.Select(
+        _, _, Nil, _, _,
+        Seq(gb_2a@modular.GroupBy(_, _, _, _, _, _, _)), _, _, _),
+        sel_3q@modular.Select(
+        _, _, _, _, _,
+        Seq(gb_2q@modular.GroupBy(_, _, _, _, _, _, _)), _, _, _),
+        Some(gb_2c@modular.GroupBy(_, _, _, _, _, _, _)),
+        rchild :: Nil,
+        echild :: Nil) =>
+        val tbls_sel_3a = sel_3a.collect { case tbl: modular.LeafNode => tbl }
+        val tbls_sel_3q = sel_3q.collect { case tbl: modular.LeafNode => tbl }
+
+        val extrajoin = tbls_sel_3a.filterNot(tbls_sel_3q.contains)
+        val rejoin = tbls_sel_3q.filterNot(tbls_sel_3a.contains)
+        val rejoinOutputList = rejoin.flatMap(_.output)
+
+        val isPredicateRmE = sel_3a.predicateList.forall(expr =>
+          sel_3q.predicateList.exists(_.semanticEquals(expr)) ||
+          gb_2c.predicateList.exists(_.semanticEquals(expr)))
+        val isPredicateEmdR = sel_3q.predicateList
+          .forall(expr =>
+            sel_3a.predicateList.exists(_.semanticEquals(expr)) ||
+            isDerivable(
+              expr,
+              sel_3a.outputList ++ rejoinOutputList,
+              sel_3q,
+              sel_3a,
+              compensation))
+        val isOutputEdR = sel_3q.outputList
+          .forall(expr =>
+            isDerivable(
+              expr,
+              sel_3a.outputList ++ rejoinOutputList,
+              sel_3q,
+              sel_3a,
+              compensation))
+
+        val canSELPullup = gb_2c.child.isInstanceOf[Select] &&
+                           gb_2c.child.asInstanceOf[Select].predicateList
+                             .forall(expr =>
+                               isDerivable(
+                                 expr,
+                                 sel_3a.outputList ++ rejoinOutputList,
+                                 sel_3q,
+                                 sel_3a,
+                                 compensation))
+        val canGBPullup = gb_2c.predicateList
+          .forall(expr =>
+            isDerivable(
+              expr,
+              sel_3a.outputList ++ rejoinOutputList,
+              sel_3q,
+              sel_3a,
+              compensation))
+
+        if (extrajoin.isEmpty && isPredicateRmE &&
+            isPredicateEmdR &&
+            isOutputEdR &&
+            canSELPullup &&
+            canGBPullup) {
+          gb_2c.child match {
+            case s: Select =>
+              val sel_3c1 = s.withNewChildren(
+                s.children.map {
+                  case gb: GroupBy => sel_3a.setSkip()
+                  case other => other })
+              val gb_3c2 = gb_2c.copy(child = sel_3c1)
+
+              val aliasMap_exp = AttributeMap(
+                gb_2c.outputList.collect {
+                  case a: Alias => (a.toAttribute, a) })
+              val sel_3q_exp = sel_3q.transformExpressions({
+                case attr: Attribute if aliasMap_exp.contains(attr) => aliasMap_exp(attr)
+              })
+
+              val mappings = sel_3q_exp.outputList zip gb_2c.outputList
+
+              val oList = for ((o1, o2) <- mappings) yield {
+                if (o1.name != o2.name) Alias(o2, o1.name)(exprId = o1.exprId) else o2
+              }
+
+              val wip = sel_3q_exp.copy(outputList = oList, children = Seq(gb_3c2))
+              val sel_3c3 = Some(factorOutSubsumer(wip, sel_3a, s.aliasMap.get(0)))
+              sel_3c3.map(Seq(_)).getOrElse(Nil)
+
+            case _ => Nil
+          }
+        } else {
+          Nil
+        }
+
+      case _ => Nil
+    }
+  }
+}
+
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala
new file mode 100644
index 0000000..2a4da27
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+// TODO: implement this to modularize DefaultMatchingFunctions
+object MatchConditions {
+}
+
+class MatchConditions(flags: Long) {
+  def hasFlag(flag: Long): Boolean = {
+    throw new UnsupportedOperationException
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala
new file mode 100644
index 0000000..2c5d8f4
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.trees.TreeNode
+
+abstract class MatchPattern[MatchingPlan <: TreeNode[MatchingPlan]] extends Logging {
+
+  def apply(
+      subsumer: MatchingPlan,
+      subsumee: MatchingPlan,
+      compensation: Option[MatchingPlan],
+      rewrite: QueryRewrite): Seq[MatchingPlan]
+
+}
+
+abstract class MatchMaker[MatchingPlan <: TreeNode[MatchingPlan]] {
+
+  /** Define a sequence of rules, to be overridden by the implementation. */
+  protected val patterns: Seq[MatchPattern[MatchingPlan]]
+
+  def execute(
+      subsumer: MatchingPlan,
+      subsumee: MatchingPlan,
+      compensation: Option[MatchingPlan],
+      rewrite: QueryRewrite): Iterator[MatchingPlan] = {
+    val iter = patterns.view.flatMap(_ (subsumer, subsumee, compensation, rewrite)).toIterator
+    iter
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
new file mode 100644
index 0000000..2a244b9
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSet}
+
+import org.apache.carbondata.mv.MQOSession
+import org.apache.carbondata.mv.expressions.modular._
+import org.apache.carbondata.mv.plans.modular
+import org.apache.carbondata.mv.plans.modular.{ModularPlan, Select}
+
+private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MQOSession) {
+
+  def rewriteWithSummaryDatasets(plan: ModularPlan, rewrite: QueryRewrite): ModularPlan = {
+    val replaced = plan.transformAllExpressions {
+      case s: ModularSubquery =>
+        if (s.children.isEmpty) {
+          ScalarModularSubquery(
+            rewriteWithSummaryDatasetsCore(s.plan, rewrite), s.children, s.exprId)
+        }
+        else throw new UnsupportedOperationException(s"Rewrite expression $s isn't supported")
+      case o => o
+    }
+    rewriteWithSummaryDatasetsCore(replaced, rewrite)
+  }
+
+  def rewriteWithSummaryDatasetsCore(plan: ModularPlan, rewrite: QueryRewrite): ModularPlan = {
+    plan transformDown {
+      case currentFragment =>
+        if (currentFragment.rewritten || !currentFragment.isSPJGH) currentFragment
+        else {
+          val compensation =
+            (for { dataset <- catalog.lookupFeasibleSummaryDatasets(currentFragment).toStream
+                   subsumer <- session.sessionState.modularizer.modularize(
+                     session.sessionState.optimizer.execute(dataset.plan)).map(_.harmonized)
+                   subsumee <- unifySubsumee(currentFragment)
+                   comp <- subsume(unifySubsumer2(unifySubsumer1(subsumer, subsumee), subsumee),
+                     subsumee, rewrite)
+                 } yield comp).headOption
+          compensation.map(_.setRewritten).getOrElse(currentFragment)
+        }
+    }
+  }
+
+  def subsume(
+      subsumer: ModularPlan,
+      subsumee: ModularPlan,
+      rewrite: QueryRewrite): Option[ModularPlan] = {
+    if (subsumer.getClass == subsumee.getClass) {
+      (subsumer.children, subsumee.children) match {
+        case (Nil, Nil) => None
+        case (r, e) if r.forall(_.isInstanceOf[modular.LeafNode]) &&
+                       e.forall(_.isInstanceOf[modular.LeafNode]) =>
+          val iter = session.sessionState.matcher.execute(subsumer, subsumee, None, rewrite)
+          if (iter.hasNext) Some(iter.next)
+          else None
+
+        case (rchild :: Nil, echild :: Nil) =>
+          val compensation = subsume(rchild, echild, rewrite)
+          val oiter = compensation.map {
+            case comp if comp.eq(rchild) =>
+              session.sessionState.matcher.execute(subsumer, subsumee, None, rewrite)
+            case _ =>
+              session.sessionState.matcher.execute(subsumer, subsumee, compensation, rewrite)
+          }
+          oiter.flatMap { case iter if iter.hasNext => Some(iter.next)
+                          case _ => None }
+
+        case _ => None
+      }
+    } else None
+  }
+
+  // add Select operator as placeholder on top of subsumee to facilitate matching
+  def unifySubsumee(subsumee: ModularPlan): Option[ModularPlan] = {
+    subsumee match {
+      case gb @ modular.GroupBy(_, _, _, _, modular.Select(_, _, _, _, _, _, _, _, _), _, _) =>
+        Some(
+          Select(gb.outputList, gb.outputList, Nil, Map.empty, Nil, gb :: Nil, gb.flags,
+            gb.flagSpec, Seq.empty))
+      case other => Some(other)
+    }
+  }
+
+  // add Select operator as placeholder on top of subsumer to facilitate matching
+  def unifySubsumer1(subsumer: ModularPlan, subsumee: ModularPlan): ModularPlan = {
+    (subsumer, subsumee) match {
+      case (r @
+        modular.GroupBy(_, _, _, _, modular.Select(_, _, _, _, _, _, _, _, _), _, _),
+        modular.Select(_, _, _, _, _,
+          Seq(modular.GroupBy(_, _, _, _, modular.Select(_, _, _, _, _, _, _, _, _), _, _)),
+          _, _, _)
+        ) =>
+        modular.Select(
+          r.outputList, r.outputList, Nil, Map.empty, Nil, r :: Nil, r.flags,
+          r.flagSpec, Seq.empty).setSkip()
+      case _ => subsumer.setSkip()
+    }
+  }
+
+  def unifySubsumer2(subsumer: ModularPlan, subsumee: ModularPlan): ModularPlan = {
+    val rtables = subsumer.collect { case n: modular.LeafNode => n }
+    val etables = subsumee.collect { case n: modular.LeafNode => n }
+    val pairs = for {
+      rtable <- rtables
+      etable <- etables
+      if (rtable == etable)
+    } yield (rtable, etable)
+
+    pairs.foldLeft(subsumer) {
+      case (curSubsumer, pair) =>
+        val nxtSubsumer = curSubsumer.transform { case pair._1 => pair._2 }
+        val attributeSet = AttributeSet(pair._1.output)
+        val rewrites = AttributeMap(pair._1.output.zip(pair._2.output))
+        nxtSubsumer.transformUp {
+          case p => p.transformExpressions {
+            case a: Attribute if attributeSet contains a => rewrites(a).withQualifier(a.qualifier)
+          }
+        }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala
new file mode 100644
index 0000000..5cb5cc6
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.carbondata.mv.MQOSession
+import org.apache.carbondata.mv.plans.modular.ModularPlan
+
+/**
+ * The primary workflow for rewriting relational queries using Spark libraries.
+ */
+class QueryRewrite private (
+    mqoSession: MQOSession,
+    logical: LogicalPlan,
+    nextSubqueryId: AtomicLong) {
+  self =>
+
+  def this(mqoSession: MQOSession, logical: LogicalPlan) =
+    this(mqoSession, logical, new AtomicLong(0))
+
+  def newSubsumerName(): String = s"gen_subsumer_${nextSubqueryId.getAndIncrement()}"
+
+  lazy val optimizedPlan: LogicalPlan =
+    mqoSession.sessionState.optimizer.execute(logical)
+
+  lazy val modularPlan: ModularPlan =
+    mqoSession.sessionState.modularizer.modularize(optimizedPlan).next().harmonized
+
+  lazy val withSummaryData: ModularPlan =
+    mqoSession.sessionState.navigator.rewriteWithSummaryDatasets(modularPlan, self)
+
+  lazy val toCompactSQL: String = withSummaryData.asCompactSQL
+
+  lazy val toOneLineSQL: String = withSummaryData.asOneLineSQL
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
new file mode 100644
index 0000000..812d09d
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.carbondata.mv.plans.modular.{ModularPlan, SimpleModularizer}
+import org.apache.carbondata.mv.plans.util.{BirdcageOptimizer, Signature}
+
+/** Holds a summary logical plan */
+private[mv] case class SummaryDataset(signature: Option[Signature], plan: LogicalPlan)
+
+private[mv] class SummaryDatasetCatalog(saprkContext: SparkContext) {
+
+  @transient
+  private val summaryDatasets = new scala.collection.mutable.ArrayBuffer[SummaryDataset]
+
+  @transient
+  private val registerLock = new ReentrantReadWriteLock
+
+  private val optimizer = BirdcageOptimizer
+
+  private val modularizer = SimpleModularizer
+
+  //  /** Returns true if the table is currently registered in-catalog. */
+  //  def isRegistered(tableName: String): Boolean =
+  //    lookupSummaryDataset(sqlContext.table(tableName)).nonEmpty
+  //
+  //  /** Registers the specified table in-memory catalog. */
+  //  def registerTable(tableName: String): Unit =
+  //    registerSummaryDataset(sqlContext.table(tableName), Some(tableName))
+  //
+  //  /** Removes the specified table from the in-memory catalog. */
+  //  def unregisterTable(tableName: String): Unit =
+  //   unregisterSummaryDataset(sqlContext.table(tableName))
+  //
+  //  override def unregisterAllTables(): Unit = {
+  //    clearSummaryDatasetCatalog()
+  //  }
+
+  /** Acquires a read lock on the catalog for the duration of `f`. */
+  private def readLock[A](f: => A): A = {
+    val lock = registerLock.readLock()
+    lock.lock()
+    try f finally {
+      lock.unlock()
+    }
+  }
+
+  /** Acquires a write lock on the catalog for the duration of `f`. */
+  private def writeLock[A](f: => A): A = {
+    val lock = registerLock.writeLock()
+    lock.lock()
+    try f finally {
+      lock.unlock()
+    }
+  }
+
+  /** Clears all summary tables. */
+  private[mv] def clearSummaryDatasetCatalog(): Unit = {
+    writeLock {
+      summaryDatasets.clear()
+    }
+  }
+
+  /** Checks if the catalog is empty. */
+  private[mv] def isEmpty: Boolean = {
+    readLock {
+      summaryDatasets.isEmpty
+    }
+  }
+
+  /**
+   * Registers the data produced by the logical representation of the given [[DataFrame]]. Unlike
+   * `RDD.cache()`, the default storage level is set to be `MEMORY_AND_DISK` because recomputing
+   * the in-memory columnar representation of the underlying table is expensive.
+   */
+  private[mv] def registerSummaryDataset(
+      query: DataFrame,
+      tableName: Option[String] = None): Unit = {
+    writeLock {
+      val planToRegister = query.queryExecution.analyzed
+      if (lookupSummaryDataset(planToRegister).nonEmpty) {
+        sys.error(s"Asked to register already registered.")
+      } else {
+        val modularPlan = modularizer.modularize(optimizer.execute(planToRegister)).next()
+          .harmonized
+        val signature = modularPlan.signature
+        summaryDatasets += SummaryDataset(signature, planToRegister)
+      }
+    }
+  }
+
+  /** Removes the given [[DataFrame]] from the catalog */
+  private[mv] def unregisterSummaryDataset(query: DataFrame): Unit = {
+    writeLock {
+      val planToRegister = query.queryExecution.analyzed
+      val dataIndex = summaryDatasets.indexWhere(sd => planToRegister.sameResult(sd.plan))
+      require(dataIndex >= 0, s"Table $query is not registered.")
+      summaryDatasets.remove(dataIndex)
+    }
+  }
+
+  /**
+   * Tries to remove the data set for the given [[DataFrame]] from the catalog if it's registered
+   */
+  private[mv] def tryUnregisterSummaryDataset(
+      query: DataFrame,
+      blocking: Boolean = true): Boolean = {
+    writeLock {
+      val planToRegister = query.queryExecution.analyzed
+      val dataIndex = summaryDatasets.indexWhere(sd => planToRegister.sameResult(sd.plan))
+      val found = dataIndex >= 0
+      if (found) {
+        summaryDatasets.remove(dataIndex)
+      }
+      found
+    }
+  }
+
+  /** Optionally returns registered data set for the given [[DataFrame]] */
+  private[mv] def lookupSummaryDataset(query: DataFrame): Option[SummaryDataset] = {
+    readLock {
+      lookupSummaryDataset(query.queryExecution.analyzed)
+    }
+  }
+
+  /** Returns feasible registered summary data sets for processing the given ModularPlan. */
+  private[mv] def lookupSummaryDataset(plan: LogicalPlan): Option[SummaryDataset] = {
+    readLock {
+      summaryDatasets.find(sd => plan.sameResult(sd.plan))
+    }
+  }
+
+  /** Returns feasible registered summary data sets for processing the given ModularPlan. */
+  private[mv] def lookupFeasibleSummaryDatasets(plan: ModularPlan): Seq[SummaryDataset] = {
+    readLock {
+      val sig = plan.signature
+      val feasible = summaryDatasets.filter { x =>
+        (x.signature, sig) match {
+          case (Some(sig1), Some(sig2)) =>
+            if (sig1.groupby && sig2.groupby && sig1.datasets.subsetOf(sig2.datasets)) {
+              true
+            } else if (!sig1.groupby && !sig2.groupby && sig1.datasets.subsetOf(sig2.datasets)) {
+              true
+            } else {
+              false
+            }
+
+          case _ => false
+        }
+      }
+      // heuristics: more tables involved in a summary data set => higher query reduction factor
+      feasible.sortWith(_.signature.get.datasets.size > _.signature.get.datasets.size)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
new file mode 100644
index 0000000..a04c85a
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Utils.scala
@@ -0,0 +1,358 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+
+import org.apache.carbondata.mv.plans.modular
+import org.apache.carbondata.mv.plans.modular.ModularPlan
+
+/**
+ * Utility functions used by mqo matcher to convert our plan to new aggregation code path
+ */
+private[rewrite] object Utils extends PredicateHelper {
+
+  // use for match qb_2a, qb_2q and sel_3a, sel_3q
+  private def doMatch(
+      operator_a: modular.Matchable,
+      operator_q: modular.Matchable,
+      alias_m: AttributeMap[Alias]): Option[modular.Matchable] = {
+    var matchable = true
+    val matched = operator_q.transformExpressions {
+      case cnt_q@AggregateExpression(Count(exprs_q), _, false, _) =>
+        operator_a.outputList.find {
+          case alias: Alias if alias_m.contains(alias.toAttribute) &&
+                               alias_m(alias.toAttribute).child.isInstanceOf[AggregateExpression] &&
+                               alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
+                                 .aggregateFunction.isInstanceOf[Count] =>
+            // case for groupby
+            val cnt_a = alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
+            val exprs_a = cnt_a.aggregateFunction.asInstanceOf[Count].children
+            if (cnt_a.isDistinct != cnt_q.isDistinct || exprs_q.length != exprs_a.length) {
+              false
+            } else {
+              exprs_a.sortBy(_.hashCode()).zip(exprs_q.sortBy(_.hashCode()))
+                .forall(p => p._1.semanticEquals(p._2))
+            }
+
+          case attr: Attribute if alias_m.contains(attr) &&
+                                  alias_m(attr).child.isInstanceOf[AggregateExpression] &&
+                                  alias_m(attr).child.asInstanceOf[AggregateExpression]
+                                    .aggregateFunction.isInstanceOf[Count] =>
+            val cnt_a = alias_m(attr).child.asInstanceOf[AggregateExpression]
+            val exprs_a = cnt_a.aggregateFunction.asInstanceOf[Count].children
+            if (cnt_a.isDistinct != cnt_q.isDistinct || exprs_q.length != exprs_a.length) {
+              false
+            } else {
+              exprs_a.sortBy(_.hashCode()).zip(exprs_q.sortBy(_.hashCode()))
+                .forall(p => p._1.semanticEquals(p._2))
+            }
+
+          case _ => false
+        }.map { cnt => AggregateExpression(
+            Sum(cnt.toAttribute),
+            cnt_q.mode,
+            isDistinct = false,
+            cnt_q.resultId)
+        }.getOrElse { matchable = false; cnt_q }
+
+      case sum_q@AggregateExpression(Sum(expr_q), _, false, _) =>
+        operator_a.outputList.find {
+          case alias: Alias if alias_m.contains(alias.toAttribute) &&
+                               alias_m(alias.toAttribute).child.isInstanceOf[AggregateExpression] &&
+                               alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
+                                 .aggregateFunction.isInstanceOf[Sum] =>
+            val sum_a = alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
+            val expr_a = sum_a.aggregateFunction.asInstanceOf[Sum].child
+            if (sum_a.isDistinct != sum_q.isDistinct) {
+              false
+            } else {
+              expr_a.semanticEquals(expr_q)
+            }
+
+          case attr: Attribute if alias_m.contains(attr) &&
+                                  alias_m(attr).child.isInstanceOf[AggregateExpression] &&
+                                  alias_m(attr).child.asInstanceOf[AggregateExpression]
+                                    .aggregateFunction.isInstanceOf[Sum] =>
+            val sum_a = alias_m(attr).child.asInstanceOf[AggregateExpression]
+            val expr_a = sum_a.aggregateFunction.asInstanceOf[Sum].child
+            if (sum_a.isDistinct != sum_q.isDistinct) {
+              false
+            } else {
+              expr_a.semanticEquals(expr_q)
+            }
+
+          case _ => false
+        }.map { sum => AggregateExpression(
+            Sum(sum.toAttribute),
+            sum_q.mode,
+            isDistinct = false,
+            sum_q.resultId)
+        }.getOrElse { matchable = false; sum_q }
+
+      case max_q@AggregateExpression(Max(expr_q), _, false, _) =>
+        operator_a.outputList.find {
+          case alias: Alias if alias_m.contains(alias.toAttribute) &&
+                               alias_m(alias.toAttribute).child.isInstanceOf[AggregateExpression] &&
+                               alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
+                                 .aggregateFunction.isInstanceOf[Max] =>
+            val max_a = alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
+            val expr_a = max_a.aggregateFunction.asInstanceOf[Max].child
+            if (max_a.isDistinct != max_q.isDistinct) {
+              false
+            } else {
+              expr_a.semanticEquals(expr_q)
+            }
+
+          case attr: Attribute if alias_m.contains(attr) &&
+                                  alias_m(attr).child.isInstanceOf[AggregateExpression] &&
+                                  alias_m(attr).child.asInstanceOf[AggregateExpression]
+                                    .aggregateFunction.isInstanceOf[Max] =>
+            val max_a = alias_m(attr).child.asInstanceOf[AggregateExpression]
+            val expr_a = max_a.aggregateFunction.asInstanceOf[Max].child
+            if (max_a.isDistinct != max_q.isDistinct) {
+              false
+            } else {
+              expr_a.semanticEquals(expr_q)
+            }
+
+          case _ => false
+        }.map { max => AggregateExpression(
+            Max(max.toAttribute),
+            max_q.mode,
+            isDistinct = false,
+            max_q.resultId)
+        }.getOrElse { matchable = false; max_q }
+
+      case min_q@AggregateExpression(Min(expr_q), _, false, _) =>
+        operator_a.outputList.find {
+          case alias: Alias if alias_m.contains(alias.toAttribute) &&
+                               alias_m(alias.toAttribute).child.isInstanceOf[AggregateExpression] &&
+                               alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
+                                 .aggregateFunction.isInstanceOf[Min] => {
+            val min_a = alias_m(alias.toAttribute).child.asInstanceOf[AggregateExpression]
+            val expr_a = min_a.aggregateFunction.asInstanceOf[Max].child
+            if (min_a.isDistinct != min_q.isDistinct) {
+              false
+            } else {
+              expr_a.semanticEquals(expr_q)
+            }
+          }
+          case attr: Attribute if alias_m.contains(attr) &&
+                                  alias_m(attr).child.isInstanceOf[AggregateExpression] &&
+                                  alias_m(attr).child.asInstanceOf[AggregateExpression]
+                                    .aggregateFunction.isInstanceOf[Min] => {
+            val min_a = alias_m(attr).child.asInstanceOf[AggregateExpression]
+            val expr_a = min_a.aggregateFunction.asInstanceOf[Max].child
+            if (min_a.isDistinct != min_q.isDistinct) {
+              false
+            } else {
+              expr_a.semanticEquals(expr_q)
+            }
+          }
+          case _ => false
+        }.map { min => AggregateExpression(
+            Min(min.toAttribute),
+            min_q.mode,
+            isDistinct = false,
+            min_q.resultId)
+        }.getOrElse { matchable = false; min_q }
+
+      case other: AggregateExpression =>
+        matchable = false
+        other
+
+      case expr: Expression if !expr.isInstanceOf[AggregateFunction] =>
+        operator_a.outputList.find {
+          case alias: Alias if alias_m.contains(alias.toAttribute) &&
+                               alias_m(alias.toAttribute).child.semanticEquals(expr) &&
+                               !alias_m(alias.toAttribute).child
+                                 .isInstanceOf[AggregateExpression] => true
+          case attr: Attribute if alias_m.contains(attr) &&
+                                  alias_m(attr).child.semanticEquals(expr) &&
+                                  !alias_m(attr).child.isInstanceOf[AggregateExpression] => true
+          case _ => false
+        }.map(_.toAttribute)
+         .getOrElse { expr }
+    }
+
+    if (matchable) {
+      Some(matched)
+    } else {
+      None
+    }
+  }
+
+  def tryMatch(a: modular.Matchable,
+      q: modular.Matchable,
+      m: AttributeMap[Alias]): Option[modular.Matchable] = {
+    if (a.getClass == q.getClass) {
+      doMatch(a, q, m)
+    } else {
+      None
+    }
+  }
+
+  /**
+   * (Subsumee) expression translation:
+   *
+   * The translation begins by creating a copy of the whole expression (step 1).  Then each input
+   * column is translated in turn.
+   * To translate an input column, we first find the child block that produces the input column
+   * and replace the input column with the
+   * associated output column expression (step 2).  The next step is to replace the translated
+   * expression with its equivalent output
+   * expression at the top of the child compensation (step 3).  Then, We recursively translate
+   * each new input column(except input
+   * columns produced by rejoin children) until we reach the bottom of the child compensation
+   * (step 4).  Finally, we find an
+   * equivalent output expression in subsumer (step 5).
+   *
+   * So given a subsumee expr, the translation follows the following path:
+   *
+   * top of subsumee --> child of subsumee --> top of compensation --> bottom of compensation -->
+   * top of subsumer
+   *
+   * To simplify this we assume in subsumer outputList of top select 1-1 corresponds to the
+   * outputList of groupby
+   * note that subsumer outputList is list of attributes and that of groupby is list of aliases
+   *
+   */
+  private def doTopSelectTranslation(exprE: Expression,
+      exprListR: Seq[Expression],
+      subsumee: ModularPlan,
+      subsumer: ModularPlan,
+      compensation: Option[ModularPlan]): Option[Expression] = {
+    (subsumer, subsumee, compensation) match {
+      // top selects whose children do not match exactly
+      // for simplicity, we assume outputList of subsumer is 1-1 corresponding to that of its
+      // immediately groupby child
+      case (
+        sel_3a@modular.Select(
+          _, _, _, _, _,
+          Seq(gb_2a@modular.GroupBy(
+            _, _, _, _, sel_2a@modular.Select(_, _, _, _, _, _, _, _, _), _, _)),
+          _, _, _),
+        sel_3q@modular.Select(
+          _, _, _, _, _, Seq(gb_2q@modular.GroupBy(_, _, _, _, _, _, _)), _, _, _),
+        Some(gb_2c@modular.GroupBy(
+          _, _, _, _, sel_2c@modular.Select(_, _, _, _, _, _, _, _, _), _, _))
+        ) =>
+        if (sel_3q.predicateList.contains(exprE)) {
+          val expr1E = exprE.transform {
+            case attr: Attribute =>
+              gb_2c.outputList.lift(
+                gb_2q.outputList.indexWhere {
+                  case alias: Alias if alias.toAttribute.semanticEquals(attr) => true;
+                  case other => false
+                  }).getOrElse { attr }
+          }
+          if (expr1E.eq(exprE)) {
+            None
+          } else {
+            Some(expr1E)
+          }
+        }
+        else if (sel_3q.outputList.contains(exprE)) {
+          exprE match {
+            case attr: Attribute => // this subexpression must in subsumee select output list
+              gb_2c.outputList.lift(
+                gb_2q.outputList.indexWhere {
+                  case a if a.toAttribute.semanticEquals(attr) => true;
+                  case other => false
+                  })
+
+            case alias: Alias =>
+              gb_2c.outputList.lift(
+                gb_2q.outputList.indexWhere {
+                  case a if a.toAttribute.semanticEquals(alias.toAttribute) => true;
+                  case other => false
+                  })
+
+            case _ => None
+          }
+        } else if (sel_2c.predicateList.contains(exprE)) {
+          if (sel_2a.predicateList.exists(_.semanticEquals(exprE)) ||
+              canEvaluate(exprE, subsumer)) {
+            Some(exprE)
+          } else {
+            None
+          }
+        } else if (gb_2c.predicateList.contains(exprE)) {
+          if (gb_2a.outputList.exists {
+                case a: Alias if a.child.semanticEquals(exprE) => true;
+                case _ => false
+              } || canEvaluate(exprE, subsumer)) {
+            Some(exprE)
+          } else {
+            None
+          }
+        } else if (sel_2a.predicateList.exists(_.semanticEquals(exprE)) ||
+                   canEvaluate(exprE, subsumer)) {
+          Some(exprE)
+        } else {
+          None
+        }
+
+      case _ => None // TODO: implement this
+    }
+  }
+
+  private def isSemanticEquivalent(translatedExpr: Expression, subsumer: ModularPlan) = {
+    subsumer match {
+      // if subsumer has where clause, even if expr can be translated into new expr based on
+      // subsumer, the two may not be semantic equivalent
+      // TODO: refine this
+      case modular.Select(
+        _, _, predicateList, _, _,
+        Seq(modular.GroupBy(_, _, _, _, _, _, _)), _, _, _)
+        if predicateList.nonEmpty => false
+      case _ => true
+    }
+  }
+
+  /**
+   * derivable = translatable + semantic equivalent
+   *
+   * The translation method described above is also the first step in deriving a subsumee
+   * expression Eexp from the subsumer's output columns.  After translating
+   * Eexp to E'exp, deriavability can be established by making sure that the subsumer
+   * computes at its output certain necessary subexpressions of E'exp (or even the entire
+   * E'exp).  The problem that arises, however, is to determine the parts of E'exp that
+   * can/should be computed by the subsumer.
+   *
+   * In general, translation causes an expression to expand by replacing individual input
+   * columns with equivalent subexpressions.  Derivation is the reverse operation, where
+   * pieces of the translated expression are collapsed as they are computed along the
+   * derivation path.
+   */
+
+  def isDerivable(exprE: Expression,
+      exprListR: Seq[Expression],
+      subsumee: ModularPlan,
+      subsumer: ModularPlan,
+      compensation: Option[ModularPlan]): Boolean = {
+    val exprE1 = doTopSelectTranslation(exprE, exprListR, subsumee, subsumer, compensation)
+    exprE1 match {
+      case Some(e) => isSemanticEquivalent(e, subsumer)
+      case _ => false
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/SelectSelectExactChildrenSuite.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/SelectSelectExactChildrenSuite.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/SelectSelectExactChildrenSuite.scala
new file mode 100644
index 0000000..0ee2475
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/SelectSelectExactChildrenSuite.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.catalyst.dsl.expressions._
+import org.apache.spark.sql.catalyst.plans.logical._
+import org.apache.spark.sql.test.util.PlanTest
+
+class SelectSelectExactChildrenSuite extends PlanTest {
+  
+  object Match extends DefaultMatchMaker {
+    val patterns = SelectSelectNoChildDelta :: Nil
+  }
+  
+  val testRelation1 = LocalRelation('tid.int,'fpgid.int,'flid.int,'date.timestamp,'faid.int,'price.double,'qty.int,'disc.string)
+  val testRelation2 = LocalRelation('lid.int,'city.string,'state.string,'country.string)
+  
+//  test("pro-typical lower select") {
+//    val fact = testRelation1.subquery('fact)
+//    val dim = testRelation2.subquery('dim)
+//    
+//    val lowerSTSelect =
+//      fact
+//        .select('faid,'flid,Year('date) as 'year)
+//        .analyze
+//    val lowerUQSelect =
+//      fact.join(dim)
+//          .where("fact.flid".attr === "dim.lid".attr && "dim.country".attr === "USA")
+//          .select('faid,'flid,Year('date) as 'year, 'state)
+//          .analyze
+//          
+//    val matched = Match.execute(lowerSTSelect.model,lowerUQSelect.model,None).next 
+//    
+//    val correctAnswer = 
+//      lowerSTSelect.join(dim)
+//          .where("fact.flid".attr === "dim.lid".attr && "dim.country".attr === "USA") 
+//          .select('faid,'flid,Year('date) as 'year, 'state)
+//          .analyze.model
+//    
+//    comparePlans(matched, correctAnswer)
+//  }
+  
+//  val testSummaryDataset =
+//    s"""
+//       |SELECT faid, flid, year_proj(date) as year, count(*) as cnt
+//       |FROM Fact
+//       |GROUP BY faid, flid, year_proj(date)
+//    """.stripMargin.trim
+//      
+//  val testUserQuery = 
+//    s"""
+//       |SELECT faid, state, year_proj(date) as year, count(*) as cnt
+//       |FROM Fact
+//       |  INNER JOIN Dim 
+//       |  ON Fact.flid = Dim.lid AND Dim.country = "USA"
+//       |GROUP BY Fact.faid,Fact.state,year_proj(Fact.date)
+//       |HAVING count(*) > 2
+//    """.stripMargin.trim
+    
+  
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/02fd7873/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala
new file mode 100644
index 0000000..86b9dce
--- /dev/null
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestSQLSuite.scala
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.util._
+import org.apache.spark.sql.test.util.PlanTest
+import org.scalatest.BeforeAndAfter
+
+import org.apache.carbondata.mv.MQOSession
+
+class TestSQLSuite extends PlanTest with BeforeAndAfter {
+
+  import org.apache.carbondata.mv.rewrite.matching.TestSQLBatch._
+
+  val spark = SparkSession.builder().master("local").enableHiveSupport().getOrCreate()
+  val testHive = new org.apache.spark.sql.hive.test.TestHiveContext(spark.sparkContext, false)
+  val hiveClient = testHive.sparkSession.metadataHive
+
+  test("protypical mqo rewrite test") {
+
+    hiveClient.runSqlHive(
+      s"""
+         |CREATE TABLE Fact (
+         |  `tid`     int,
+         |  `fpgid`   int,
+         |  `flid`    int,
+         |  `date`    timestamp,
+         |  `faid`    int,
+         |  `price`   double,
+         |  `qty`     int,
+         |  `disc`    string
+         |)
+         |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+         |STORED AS TEXTFILE
+        """.stripMargin.trim
+    )
+
+    hiveClient.runSqlHive(
+      s"""
+         |CREATE TABLE Dim (
+         |  `lid`     int,
+         |  `city`    string,
+         |  `state`   string,
+         |  `country` string
+         |)
+         |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+         |STORED AS TEXTFILE
+        """.stripMargin.trim
+    )
+
+    hiveClient.runSqlHive(
+      s"""
+         |CREATE TABLE Item (
+         |  `i_item_id`     int,
+         |  `i_item_sk`     int
+         |)
+         |ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+         |STORED AS TEXTFILE
+        """.stripMargin.trim
+    )
+
+    val dest = "case_10"
+
+    sampleTestCases.foreach { testcase =>
+      if (testcase._1 == dest) {
+        val mqoSession = new MQOSession(testHive.sparkSession)
+        val summary = testHive.sparkSession.sql(testcase._2)
+        mqoSession.sharedState.registerSummaryDataset(summary)
+        val rewrittenSQL = mqoSession.rewrite(testcase._3).toCompactSQL.trim
+
+        if (!rewrittenSQL.equals(testcase._4)) {
+          println(
+            s"""
+               |=== FAIL: SQLs do not match ===
+               |${sideBySide(rewrittenSQL, testcase._4).mkString("\n")}
+              """.stripMargin)
+        }
+      }
+
+    }
+  }
+  testHive.sparkSession.cloneSession()
+}
\ No newline at end of file