Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/05/22 16:44:46 UTC

[04/50] [abbrv] carbondata git commit: [CARBONDATA-2475] Support Modular Core for Materialized View DataMap for query matching and rewriting

[CARBONDATA-2475] Support Modular Core for Materialized View DataMap for query matching and rewriting

Support Modular Core for Materialized View DataMap

This closes #2302
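
For context, a minimal sketch of the end-to-end flow this commit enables (the DDL
syntax, table and column names below are illustrative assumptions, not part of this
change):

    // assuming a CarbonData-enabled SparkSession `spark` with a `sales` table:
    // create an MV datamap over an aggregation; the MVAnalyzerRule added here can
    // then match a semantically equal query and rewrite it to read the MV table.
    spark.sql(
      """CREATE DATAMAP sales_agg USING 'mv' AS
        |SELECT country, SUM(amount) FROM sales GROUP BY country""".stripMargin)
    spark.sql("SELECT country, SUM(amount) FROM sales GROUP BY country").show()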


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/bf73e9fe
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/bf73e9fe
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/bf73e9fe

Branch: refs/heads/branch-1.4
Commit: bf73e9fe77523e23be46e7597e2c990e855401e5
Parents: d14c403
Author: ravipesala <ra...@gmail.com>
Authored: Sat May 12 22:49:19 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Sun May 13 17:08:19 2018 +0800

----------------------------------------------------------------------
 datamap/mv/core/pom.xml                         |  169 ++
 .../carbondata/mv/datamap/MVAnalyzerRule.scala  |  105 +
 .../mv/datamap/MVDataMapProvider.scala          |  125 +
 .../apache/carbondata/mv/datamap/MVHelper.scala |  377 +++
 .../apache/carbondata/mv/datamap/MVState.scala  |   55 +
 .../mv/rewrite/DefaultMatchMaker.scala          |  647 +++++
 .../carbondata/mv/rewrite/MatchConditions.scala |   28 +
 .../carbondata/mv/rewrite/MatchMaker.scala      |   47 +
 .../carbondata/mv/rewrite/Navigator.scala       |  196 ++
 .../carbondata/mv/rewrite/QueryRewrite.scala    |   53 +
 .../mv/rewrite/SummaryDatasetCatalog.scala      |  168 ++
 .../apache/carbondata/mv/rewrite/Utils.scala    |  358 +++
 .../mv/rewrite/MVCreateTestCase.scala           |  676 +++++
 .../mv/rewrite/MVSampleTestCase.scala           |  167 ++
 .../carbondata/mv/rewrite/MVTPCDSTestCase.scala |  146 +
 .../carbondata/mv/rewrite/MVTpchTestCase.scala  |  247 ++
 .../SelectSelectExactChildrenSuite.scala        |   76 +
 .../carbondata/mv/rewrite/Tpcds_1_4_Suite.scala |   80 +
 .../mv/rewrite/matching/TestSQLBatch.scala      |  214 ++
 .../rewrite/matching/TestTPCDS_1_4_Batch.scala  | 2496 ++++++++++++++++++
 20 files changed, 6430 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/mv/core/pom.xml b/datamap/mv/core/pom.xml
new file mode 100644
index 0000000..99a8e22
--- /dev/null
+++ b/datamap/mv/core/pom.xml
@@ -0,0 +1,169 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.carbondata</groupId>
+    <artifactId>carbondata-parent</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <relativePath>../../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>carbondata-mv-core</artifactId>
+  <name>Apache CarbonData :: Materialized View Core</name>
+
+  <properties>
+    <dev.path>${basedir}/../../../dev</dev.path>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-mv-plan</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-spark2</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <testSourceDirectory>src/test/scala</testSourceDirectory>
+    <plugins>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.18</version>
+        <!-- Note config is repeated in scalatest config -->
+        <configuration>
+          <skip>false</skip>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
+          <systemProperties>
+            <java.awt.headless>true</java.awt.headless>
+          </systemProperties>
+          <testFailureIgnore>false</testFailureIgnore>
+          <failIfNoTests>false</failIfNoTests>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <version>2.17</version>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.15.2</version>
+        <executions>
+          <execution>
+            <id>compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <phase>compile</phase>
+          </execution>
+          <execution>
+            <id>testCompile</id>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+            <phase>test</phase>
+          </execution>
+          <execution>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>1.4.1</version>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>com.ning.maven.plugins</groupId>
+        <artifactId>maven-duplicate-finder-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <version>1.0</version>
+        <!-- Note config is repeated in surefire config -->
+        <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <junitxml>.</junitxml>
+          <testFailureIgnore>false</testFailureIgnore>
+          <filereports>CarbonTestSuite.txt</filereports>
+          <argLine>-ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+          </argLine>
+          <stderr />
+          <environmentVariables>
+          </environmentVariables>
+          <systemProperties>
+            <java.awt.headless>true</java.awt.headless>
+          </systemProperties>
+        </configuration>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+  <profiles>
+  <profile>
+    <id>sdvtest</id>
+    <properties>
+      <maven.test.skip>true</maven.test.skip>
+    </properties>
+  </profile>
+  </profiles>
+</project>
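
As a usage note (not part of this change), the new module and the modules it depends
on can be built and tested in isolation with standard Maven reactor options:

    mvn -pl datamap/mv/core -am test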

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
new file mode 100644
index 0000000..4e93f15
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVAnalyzerRule.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.mv.datamap
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.analysis.{UnresolvedAlias, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.expressions.{Alias, ScalaUDF}
+import org.apache.spark.sql.catalyst.plans.logical.{Command, DeserializeToObject, LogicalPlan}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.datamap.DataMapManager
+import org.apache.carbondata.mv.rewrite.{SummaryDataset, SummaryDatasetCatalog}
+
+/**
+ * Analyzer rule to rewrite the query for MV datamap
+ *
+ * @param sparkSession
+ */
+class MVAnalyzerRule(sparkSession: SparkSession) extends Rule[LogicalPlan] {
+
+  // TODO Find a better way to get the provider.
+  private val dataMapProvider =
+    DataMapManager.get().getDataMapProvider(null,
+      new DataMapSchema("", DataMapClassProvider.MV.getShortName), sparkSession)
+
+  private val LOGGER = LogServiceFactory.getLogService(classOf[MVAnalyzerRule].getName)
+
+  override def apply(plan: LogicalPlan): LogicalPlan = {
+    var needAnalysis = true
+    plan.transformAllExpressions {
+      // first check if any preAgg scala UDF is present in the plan; if so, the call comes
+      // from the create-preaggregate-table flow, so the query plan need not be transformed
+      // TODO Add different UDF name
+      case al@Alias(udf: ScalaUDF, name) if name.equalsIgnoreCase("preAgg") =>
+        needAnalysis = false
+        al
+      // in case of a query, if any unresolved alias is present then wait for the plan to be
+      // resolved; return the same plan, since we can transform it only once everything is resolved
+      case unresolveAlias@UnresolvedAlias(_, _) =>
+        needAnalysis = false
+        unresolveAlias
+      case attr@UnresolvedAttribute(_) =>
+        needAnalysis = false
+        attr
+    }
+    val catalog = DataMapStoreManager.getInstance().getDataMapCatalog(dataMapProvider,
+      DataMapClassProvider.MV.getShortName).asInstanceOf[SummaryDatasetCatalog]
+    if (needAnalysis && catalog != null && isValidPlan(plan, catalog)) {
+      val modularPlan = catalog.mVState.rewritePlan(plan).withSummaryData
+      if (modularPlan.find (_.rewritten).isDefined) {
+        val compactSQL = modularPlan.asCompactSQL
+        LOGGER.audit(s"\n$compactSQL\n")
+        val analyzed = sparkSession.sql(compactSQL).queryExecution.analyzed
+        analyzed
+      } else {
+        plan
+      }
+    } else {
+      plan
+    }
+  }
+
+  def isValidPlan(plan: LogicalPlan, catalog: SummaryDatasetCatalog): Boolean = {
+    !plan.isInstanceOf[Command] && !isDataMapExists(plan, catalog.listAllSchema()) &&
+    !plan.isInstanceOf[DeserializeToObject]
+  }
+  /**
+   * Check whether the query plan already refers to a datamap table.
+   *
+   * @param plan
+   * @param mvs
+   * @return
+   */
+  def isDataMapExists(plan: LogicalPlan, mvs: Array[SummaryDataset]): Boolean = {
+    val catalogs = plan collect {
+      case l: LogicalRelation => l.catalogTable
+    }
+    catalogs.isEmpty || catalogs.exists { c =>
+      mvs.exists { mv =>
+        val identifier = mv.dataMapSchema.getRelationIdentifier
+        identifier.getTableName.equals(c.get.identifier.table) &&
+        identifier.getDatabaseName.equals(c.get.database)
+      }
+    }
+  }
+}
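
A minimal sketch of exercising the rule above by hand (in a real session the rule is
injected into the analyzer; the `sales` table and local master are illustrative
assumptions, not part of this commit):

    import org.apache.spark.sql.SparkSession
    import org.apache.carbondata.mv.datamap.MVAnalyzerRule

    object MVRuleSmokeTest {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("mv-rule").getOrCreate()
        val rule = new MVAnalyzerRule(spark)
        // apply() returns a rewritten plan when a registered MV matches,
        // otherwise it returns the input plan unchanged
        val analyzed = spark.sql(
          "SELECT country, SUM(amount) FROM sales GROUP BY country").queryExecution.analyzed
        println(rule.apply(analyzed).treeString)
      }
    }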

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
new file mode 100644
index 0000000..2aba23e
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVDataMapProvider.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.mv.datamap
+
+import java.io.IOException
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.SubqueryAlias
+import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
+import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
+import org.apache.spark.sql.execution.datasources.FindDataSourceTable
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+import org.apache.spark.sql.util.SparkSQLUtil
+
+import org.apache.carbondata.common.annotations.InterfaceAudience
+import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
+import org.apache.carbondata.core.datamap.{DataMapCatalog, DataMapProvider, DataMapStoreManager}
+import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory}
+import org.apache.carbondata.core.indexstore.Blocklet
+import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
+import org.apache.carbondata.mv.rewrite.{SummaryDataset, SummaryDatasetCatalog}
+
+@InterfaceAudience.Internal
+class MVDataMapProvider(
+    mainTable: CarbonTable,
+    sparkSession: SparkSession,
+    dataMapSchema: DataMapSchema)
+  extends DataMapProvider(mainTable, dataMapSchema) {
+  protected var dropTableCommand: CarbonDropTableCommand = null
+
+  @throws[MalformedDataMapCommandException]
+  @throws[IOException]
+  override def initMeta(ctasSqlStatement: String): Unit = {
+    if (ctasSqlStatement == null) {
+      throw new MalformedDataMapCommandException(
+        "select statement is mandatory")
+    }
+    MVHelper.createMVDataMap(sparkSession, dataMapSchema, ctasSqlStatement, true)
+    DataMapStoreManager.getInstance.registerDataMapCatalog(this, dataMapSchema)
+  }
+
+  override def initData(): Unit = {
+  }
+
+  @throws[IOException]
+  override def cleanMeta(): Unit = {
+    dropTableCommand = new CarbonDropTableCommand(true,
+      Some(dataMapSchema.getRelationIdentifier.getDatabaseName),
+      dataMapSchema.getRelationIdentifier.getTableName,
+      true)
+    dropTableCommand.processMetadata(sparkSession)
+    DataMapStoreManager.getInstance.unRegisterDataMapCatalog(dataMapSchema)
+    DataMapStoreManager.getInstance().dropDataMapSchema(dataMapSchema.getDataMapName)
+  }
+
+  override def cleanData(): Unit = {
+    if (dropTableCommand != null) {
+      dropTableCommand.processData(sparkSession)
+    }
+  }
+
+  @throws[IOException]
+  override def rebuild(): Unit = {
+    val ctasQuery = dataMapSchema.getCtasQuery
+    if (ctasQuery != null) {
+      val identifier = dataMapSchema.getRelationIdentifier
+      val logicalPlan =
+        new FindDataSourceTable(sparkSession).apply(
+          sparkSession.sessionState.catalog.lookupRelation(
+          TableIdentifier(identifier.getTableName,
+            Some(identifier.getDatabaseName)))) match {
+          case s: SubqueryAlias => s.child
+          case other => other
+        }
+      val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(ctasQuery)
+      val queryPlan = SparkSQLUtil.execute(
+        sparkSession.sql(updatedQuery).queryExecution.analyzed,
+        sparkSession).drop("preAgg")
+      val header = logicalPlan.output.map(_.name).mkString(",")
+      val loadCommand = CarbonLoadDataCommand(
+        databaseNameOp = Some(identifier.getDatabaseName),
+        tableName = identifier.getTableName,
+        factPathFromUser = null,
+        dimFilesPath = Seq(),
+        options = scala.collection.immutable.Map("fileheader" -> header),
+        isOverwriteTable = true,
+        inputSqlString = null,
+        dataFrame = Some(queryPlan),
+        updateModel = None,
+        tableInfoOp = None,
+        internalOptions = Map.empty,
+        partition = Map.empty)
+
+      SparkSQLUtil.execute(loadCommand, sparkSession)
+    }
+  }
+
+  @throws[IOException]
+  override def incrementalBuild(
+      segmentIds: Array[String]): Unit = {
+    throw new UnsupportedOperationException
+  }
+
+  override def createDataMapCatalog : DataMapCatalog[SummaryDataset] =
+    new SummaryDatasetCatalog(sparkSession)
+
+  override def getDataMapFactory: DataMapFactory[_ <: DataMap[_ <: Blocklet]] = {
+    throw new UnsupportedOperationException
+  }
+}
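
A sketch of the provider lifecycle defined above (construction from a CarbonTable and
DataMapSchema is elided and assumed available; the select statement is an illustrative
assumption):

    import org.apache.carbondata.mv.datamap.MVDataMapProvider

    def mvLifecycle(provider: MVDataMapProvider): Unit = {
      // creates the MV table from the CTAS query and registers its catalog
      provider.initMeta("SELECT country, SUM(amount) FROM sales GROUP BY country")
      provider.rebuild()   // full load: re-runs the CTAS query, overwriting the MV table
      provider.cleanMeta() // drops MV table metadata and unregisters the catalog
      provider.cleanData() // physically removes the dropped MV table's data
    }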

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
new file mode 100644
index 0000000..0f9362f
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVHelper.scala
@@ -0,0 +1,377 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.mv.datamap
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, AttributeSet, Expression, NamedExpression, ScalaUDF}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project}
+import org.apache.spark.sql.execution.command.{Field, TableModel, TableNewProcessor}
+import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+
+import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, DataMapSchemaStorageProvider, RelationIdentifier}
+import org.apache.carbondata.mv.plans.modular
+import org.apache.carbondata.mv.plans.modular.{GroupBy, Matchable, ModularPlan, Select}
+import org.apache.carbondata.mv.rewrite.{DefaultMatchMaker, QueryRewrite}
+import org.apache.carbondata.spark.util.CommonUtil
+
+/**
+ * Utility for MV datamap operations.
+ */
+object MVHelper {
+
+  def createMVDataMap(sparkSession: SparkSession,
+      dataMapSchema: DataMapSchema,
+      queryString: String,
+      ifNotExistsSet: Boolean = false): Unit = {
+    val dmProperties = dataMapSchema.getProperties.asScala
+    val updatedQuery = new CarbonSpark2SqlParser().addPreAggFunction(queryString)
+    val logicalPlan = sparkSession.sql(updatedQuery).drop("preAgg").queryExecution.analyzed
+    val fields = logicalPlan.output.map { attr =>
+      val name = updateColumnName(attr)
+      val rawSchema = '`' + name + '`' + ' ' + attr.dataType.typeName
+      if (attr.dataType.typeName.startsWith("decimal")) {
+        val (precision, scale) = CommonUtil.getScaleAndPrecision(attr.dataType.catalogString)
+        Field(column = name,
+          dataType = Some(attr.dataType.typeName),
+          name = Some(name),
+          children = None,
+          precision = precision,
+          scale = scale,
+          rawSchema = rawSchema)
+      } else {
+        Field(column = name,
+          dataType = Some(attr.dataType.typeName),
+          name = Some(name),
+          children = None,
+          rawSchema = rawSchema)
+      }
+    }
+    val tableProperties = mutable.Map[String, String]()
+    dmProperties.foreach(t => tableProperties.put(t._1, t._2))
+
+    val selectTables = getTables(logicalPlan)
+
+    // TODO inherit the table properties like sort order, sort scope and block size from parent
+    // tables to mv datamap table
+    // TODO Use a proper DB
+    val tableIdentifier =
+      TableIdentifier(dataMapSchema.getDataMapName + "_table",
+        selectTables.head.identifier.database)
+    // prepare the table model from the collected fields and properties
+    val tableModel: TableModel = new CarbonSpark2SqlParser().prepareTableModel(
+      ifNotExistPresent = ifNotExistsSet,
+      new CarbonSpark2SqlParser().convertDbNameToLowerCase(tableIdentifier.database),
+      tableIdentifier.table.toLowerCase,
+      fields,
+      Seq(),
+      tableProperties,
+      None,
+      isAlterFlow = false,
+      None)
+
+    val tablePath = if (dmProperties.contains("path")) {
+      dmProperties("path")
+    } else {
+      CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession)
+    }
+    CarbonCreateTableCommand(TableNewProcessor(tableModel),
+      tableModel.ifNotExistsSet, Some(tablePath), isVisible = false).run(sparkSession)
+
+    dataMapSchema.setCtasQuery(queryString)
+    dataMapSchema
+      .setRelationIdentifier(new RelationIdentifier(tableIdentifier.database.get,
+        tableIdentifier.table,
+        ""))
+
+    val parentIdents = selectTables.map { table =>
+      new RelationIdentifier(table.database, table.identifier.table, "")
+    }
+    dataMapSchema.setParentTables(new util.ArrayList[RelationIdentifier](parentIdents.asJava))
+    DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema)
+  }
+
+  def updateColumnName(attr: Attribute): String = {
+    val name = attr.name.replace("(", "_").replace(")", "").replace(" ", "_").replace("=", "")
+    attr.qualifier.map(qualifier => qualifier + "_" + name).getOrElse(name)
+  }
+
+  def getTables(logicalPlan: LogicalPlan): Seq[CatalogTable] = {
+    logicalPlan.collect {
+      case l: LogicalRelation => l.catalogTable.get
+    }
+  }
+
+  def dropDummFuc(plan: LogicalPlan): LogicalPlan = {
+    plan transform {
+      case p@Project(exps, child) =>
+        Project(dropDummyExp(exps), child)
+      case Aggregate(grp, aggExp, child) =>
+        Aggregate(
+          grp,
+          dropDummyExp(aggExp),
+          child)
+    }
+  }
+
+  private def dropDummyExp(exps: Seq[NamedExpression]) = {
+    exps.map {
+      case al@Alias(udf: ScalaUDF, name) if name.equalsIgnoreCase("preAgg") => None
+      case attr: AttributeReference if attr.name.equalsIgnoreCase("preAgg") => None
+      case other => Some(other)
+    }.filter(_.isDefined).map(_.get)
+  }
+
+  def getAttributeMap(subsumer: Seq[NamedExpression],
+      subsume: Seq[NamedExpression]): Map[AttributeKey, NamedExpression] = {
+    if (subsumer.length == subsume.length) {
+      subsume.zip(subsumer).flatMap { case (left, right) =>
+        var tuples = left collect {
+          case attr: AttributeReference =>
+            (AttributeKey(attr), createAttrReference(right, attr.name))
+        }
+        left match {
+          case a: Alias =>
+            tuples = Seq((AttributeKey(a.child), createAttrReference(right, a.name))) ++ tuples
+          case _ =>
+        }
+        Seq((AttributeKey(left), createAttrReference(right, left.name))) ++ tuples
+      }.toMap
+    } else {
+      throw new UnsupportedOperationException("Cannot create mapping with unequal sizes")
+    }
+  }
+
+  def createAttrReference(ref: NamedExpression, name: String): Alias = {
+    Alias(ref, name)(exprId = ref.exprId, qualifier = None)
+  }
+
+  case class AttributeKey(exp: Expression) {
+
+    override def equals(other: Any): Boolean = other match {
+      case attrKey: AttributeKey =>
+        exp.semanticEquals(attrKey.exp)
+      case _ => false
+    }
+
+    override def hashCode: Int = exp.hashCode
+
+  }
+
+  /**
+   * Updates the given expressions to refer to the subsumer's output expressions, as needed
+   * to match the datamap table relation.
+   *
+   * @param expressions        expressions to be updated
+   * @param attrMap            map from subsumee expressions to subsumer output expressions
+   * @param aliasName          table alias name
+   * @return Updated expressions
+   */
+  def updateSubsumeAttrs(
+      expressions: Seq[Expression],
+      attrMap: Map[AttributeKey, NamedExpression],
+      aliasName: Option[String],
+      keepAlias: Boolean = false): Seq[Expression] = {
+
+    def getAttribute(exp: Expression) = {
+      exp match {
+        case Alias(agg: AggregateExpression, name) =>
+          agg.aggregateFunction.collect {
+            case attr: AttributeReference =>
+              AttributeReference(attr.name, attr.dataType, attr.nullable, attr
+                .metadata)(attr.exprId,
+                aliasName,
+                attr.isGenerated)
+          }.head
+        case Alias(child, name) =>
+          child
+        case other => other
+      }
+    }
+
+    expressions.map {
+        case alias@Alias(agg: AggregateExpression, name) =>
+          attrMap.get(AttributeKey(alias)).map { exp =>
+            Alias(getAttribute(exp), name)(alias.exprId,
+              alias.qualifier,
+              alias.explicitMetadata,
+              alias.isGenerated)
+          }.getOrElse(alias)
+
+        case attr: AttributeReference =>
+          val uattr = attrMap.get(AttributeKey(attr)).map{a =>
+            if (keepAlias) {
+              AttributeReference(a.name, a.dataType, a.nullable, a.metadata)(a.exprId,
+                attr.qualifier,
+                a.isGenerated)
+            } else {
+              a
+            }
+          }.getOrElse(attr)
+          uattr
+        case expression: Expression =>
+          val uattr = attrMap.getOrElse(AttributeKey(expression), expression)
+          uattr
+    }
+  }
+
+  def updateOutPutList(
+      subsumerOutputList: Seq[NamedExpression],
+      dataMapRltn: Select,
+      aliasMap: Map[AttributeKey, NamedExpression],
+      keepAlias: Boolean): Seq[NamedExpression] = {
+    val outputSel =
+      updateSubsumeAttrs(
+        subsumerOutputList,
+        aliasMap,
+        Some(dataMapRltn.aliasMap.values.head),
+        keepAlias).asInstanceOf[Seq[NamedExpression]]
+    outputSel.zip(subsumerOutputList).map{ case (l, r) =>
+      l match {
+        case attr: AttributeReference =>
+          Alias(attr, r.name)(r.exprId, None)
+        case a@Alias(attr: AttributeReference, name) =>
+          Alias(attr, r.name)(r.exprId, None)
+        case other => other
+      }
+    }
+
+  }
+
+  def updateSelectPredicates(
+      predicates: Seq[Expression],
+      attrMap: Map[AttributeKey, NamedExpression],
+      keepAlias: Boolean): Seq[Expression] = {
+    predicates.map { exp =>
+      exp transform {
+        case attr: AttributeReference =>
+          val uattr = attrMap.get(AttributeKey(attr)).map{a =>
+            if (keepAlias) {
+              AttributeReference(a.name, a.dataType, a.nullable, a.metadata)(a.exprId,
+                attr.qualifier,
+                a.isGenerated)
+            } else {
+              a
+            }
+          }.getOrElse(attr)
+          uattr
+      }
+    }
+  }
+
+  /**
+   * Update the modular plan as per the datamap table relation inside it.
+   *
+   * @param subsumer plan to be updated
+   * @return Updated modular plan.
+   */
+  def updateDataMap(subsumer: ModularPlan, rewrite: QueryRewrite): ModularPlan = {
+    subsumer match {
+      case s: Select if s.dataMapTableRelation.isDefined =>
+        val relation = s.dataMapTableRelation.get.asInstanceOf[Select]
+        val mappings = s.outputList zip relation.outputList
+        val oList = for ((o1, o2) <- mappings) yield {
+          if (o1.name != o2.name) Alias(o2, o1.name)(exprId = o1.exprId) else o2
+        }
+        relation.copy(outputList = oList).setRewritten()
+      case g: GroupBy if g.dataMapTableRelation.isDefined =>
+        val relation = g.dataMapTableRelation.get.asInstanceOf[Select]
+        val in = relation.asInstanceOf[Select].outputList
+        val mappings = g.outputList zip relation.outputList
+        val oList = for ((left, right) <- mappings) yield {
+          left match {
+            case Alias(agg@AggregateExpression(fun@Sum(child), _, _, _), name) =>
+              val uFun = fun.copy(child = right)
+              Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = left.exprId)
+            case Alias(agg@AggregateExpression(fun@Max(child), _, _, _), name) =>
+              val uFun = fun.copy(child = right)
+              Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = left.exprId)
+            case Alias(agg@AggregateExpression(fun@Min(child), _, _, _), name) =>
+              val uFun = fun.copy(child = right)
+              Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = left.exprId)
+            case Alias(agg@AggregateExpression(fun@Count(Seq(child)), _, _, _), name) =>
+              val uFun = Sum(right)
+              Alias(agg.copy(aggregateFunction = uFun), left.name)(exprId = left.exprId)
+            case _ =>
+              if (left.name != right.name) Alias(right, left.name)(exprId = left.exprId) else right
+          }
+        }
+        val updatedPredicates = g.predicateList.map { f =>
+          mappings.find{ case (k, y) =>
+            k match {
+              case a: Alias if f.isInstanceOf[Alias] =>
+                a.child.semanticEquals(f.children.head)
+              case a: Alias => a.child.semanticEquals(f)
+              case other => other.semanticEquals(f)
+            }
+          } match {
+            case Some(r) => r._2
+            case _ => f
+          }
+        }
+        g.copy(outputList = oList,
+          inputList = in,
+          predicateList = updatedPredicates,
+          child = relation,
+          dataMapTableRelation = None).setRewritten()
+
+      case select: Select =>
+        select.children match {
+          case Seq(s: Select) if s.dataMapTableRelation.isDefined =>
+            val relation = s.dataMapTableRelation.get.asInstanceOf[Select]
+            val child = updateDataMap(s, rewrite).asInstanceOf[Select]
+            val aliasMap = getAttributeMap(relation.outputList, s.outputList)
+            val outputSel =
+              updateOutPutList(select.outputList, relation, aliasMap, keepAlias = true)
+            val pred = updateSelectPredicates(select.predicateList, aliasMap, true)
+            select.copy(outputList = outputSel,
+              inputList = child.outputList,
+              predicateList = pred,
+              children = Seq(child)).setRewritten()
+
+          case Seq(g: GroupBy) if g.dataMapTableRelation.isDefined =>
+            val relation = g.dataMapTableRelation.get.asInstanceOf[Select]
+            val aliasMap = getAttributeMap(relation.outputList, g.outputList)
+
+            val outputSel =
+              updateOutPutList(select.outputList, relation, aliasMap, keepAlias = false)
+            val child = updateDataMap(g, rewrite).asInstanceOf[Matchable]
+            // TODO Remove the unnecessary columns from selection.
+            // Only keep columns which are required by parent.
+            val inputSel = child.outputList
+            select.copy(
+              outputList = outputSel,
+              inputList = inputSel,
+              children = Seq(child)).setRewritten()
+
+          case _ => select
+        }
+
+      case other => other
+    }
+  }
+}
+
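The column-name mangling performed by updateColumnName above can be shown standalone
(qualifier prefixing omitted; the inputs are made-up examples):

    def mangle(name: String): String =
      name.replace("(", "_").replace(")", "").replace(" ", "_").replace("=", "")

    assert(mangle("sum(price)") == "sum_price") // aggregate output becomes a legal column name
    assert(mangle("a = b") == "a__b")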

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVState.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVState.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVState.scala
new file mode 100644
index 0000000..412d547
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/datamap/MVState.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.datamap
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.carbondata.mv.plans.modular.SimpleModularizer
+import org.apache.carbondata.mv.plans.util.BirdcageOptimizer
+import org.apache.carbondata.mv.rewrite.{DefaultMatchMaker, Navigator, QueryRewrite, SummaryDatasetCatalog}
+
+/**
+ * A class that holds all session-specific state.
+ */
+private[mv] class MVState(summaryDatasetCatalog: SummaryDatasetCatalog) {
+
+  // Note: These are all lazy vals because they depend on each other and we want
+  // subclasses to be able to override some of the fields. Otherwise, we would get a lot of NPEs.
+
+  /**
+   * Modularizer that converts optimized logical plans into modular plans.
+   */
+  lazy val modularizer = SimpleModularizer
+
+  /**
+   * Logical query plan optimizer.
+   */
+  lazy val optimizer = BirdcageOptimizer
+
+  lazy val matcher = DefaultMatchMaker
+
+  lazy val navigator: Navigator = new Navigator(summaryDatasetCatalog, this)
+
+  /**
+   * Rewrite the logical query plan to MV plan if applicable.
+   * @param plan
+   * @return
+   */
+  def rewritePlan(plan: LogicalPlan): QueryRewrite = new QueryRewrite(this, plan)
+
+}
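
The flow MVAnalyzerRule drives through this state can be sketched as follows (MVState
is private[mv], so this compiles only inside the org.apache.carbondata.mv package; a
sketch, not part of the commit):

    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    def rewriteToSql(state: MVState, plan: LogicalPlan): Option[String] = {
      // modularize and optimize the plan, substituting a matching MV if one exists
      val modularPlan = state.rewritePlan(plan).withSummaryData
      if (modularPlan.find(_.rewritten).isDefined) Some(modularPlan.asCompactSQL) else None
    }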

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
new file mode 100644
index 0000000..899c36c
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/DefaultMatchMaker.scala
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, AttributeSet, Expression, PredicateHelper, _}
+import org.apache.spark.sql.catalyst.plans.{FullOuter, Inner}
+
+import org.apache.carbondata.mv.datamap.MVHelper
+import org.apache.carbondata.mv.plans.modular
+import org.apache.carbondata.mv.plans.modular._
+import org.apache.carbondata.mv.plans.modular.Flags._
+import org.apache.carbondata.mv.plans.util.SQLBuilder
+
+abstract class DefaultMatchMaker extends MatchMaker[ModularPlan]
+
+abstract class DefaultMatchPattern extends MatchPattern[ModularPlan] {
+
+  /** Name for this pattern, automatically inferred based on class name. */
+  val patternName: String = {
+    val className = getClass.getName
+    if (className endsWith "$") className.dropRight(1) else className
+  }
+
+  def factorOutSubsumer(
+      compensation: ModularPlan,
+      subsumer: Matchable,
+      aliasMapMain: Map[Int, String]): ModularPlan = {
+
+    // Create an aliasMap from each aliased underlying attribute to the alias's output attribute
+    val aliasMap = AttributeMap(
+        subsumer.outputList.collect {
+          case a: Alias if a.child.isInstanceOf[Attribute] =>
+            (a.child.asInstanceOf[Attribute], a.toAttribute)
+          })
+
+    // Check and replace all alias references with subsumer alias map references.
+    val compensation1 = compensation.transform {
+      case plan if !plan.skip && plan != subsumer =>
+        plan.transformExpressions {
+          case a: AttributeReference =>
+            aliasMap
+              .get(a)
+              .map { ref =>
+                AttributeReference(
+                  ref.name, ref.dataType)(
+                  exprId = ref.exprId,
+                  qualifier = a.qualifier)
+              }.getOrElse(a)
+          }
+    }
+
+    val subqueryAttributeSet = SQLBuilder.collectAttributeSet(subsumer.outputList)
+    if (SQLBuilder.collectDuplicateNames(subqueryAttributeSet).nonEmpty) {
+      throw new UnsupportedOperationException(
+        s"duplicate name(s): ${ subsumer.output.mkString(", ") }")
+    }
+    if (aliasMapMain.size == 1) {
+      val subsumerName: Option[String] = aliasMapMain.get(0)
+      // Replace all compensation1 attributes with references to the subsumer attribute set
+      val compensationFinal = compensation1.transformExpressions {
+        case ref: Attribute if subqueryAttributeSet.contains(ref) =>
+          AttributeReference(ref.name, ref.dataType)(exprId = ref.exprId, qualifier = subsumerName)
+        case alias: Alias if subqueryAttributeSet.contains(alias.toAttribute) =>
+          Alias(alias.child, alias.name)(exprId = alias.exprId, qualifier = subsumerName)
+      }
+      compensationFinal
+    } else {
+      compensation1
+    }
+  }
+}
+
+object DefaultMatchMaker extends DefaultMatchMaker {
+  lazy val patterns =
+    SelectSelectNoChildDelta ::
+    GroupbyGroupbyNoChildDelta ::
+    GroupbyGroupbySelectOnlyChildDelta ::
+    GroupbyGroupbyGroupbyChildDelta ::
+    SelectSelectSelectChildDelta ::
+    SelectSelectGroupbyChildDelta :: Nil
+}
+
+/**
+ * Convention:
+ * EmR: each of the subsumee's expressions matches some of the subsumer's expressions
+ * EdR: each of the subsumee's expressions derives from some of the subsumer's expressions
+ * RmE: each of the subsumer's expressions matches some of the subsumee's expressions
+ * RdE: each of the subsumer's expressions derives from some of the subsumee's expressions
+ */
+
+object SelectSelectNoChildDelta extends DefaultMatchPattern with PredicateHelper {
+  private def isDerivable(
+      exprE: Expression,
+      exprListR: Seq[Expression],
+      subsumee: ModularPlan,
+      subsumer: ModularPlan,
+      compensation: Option[ModularPlan]): Boolean = {
+    if (subsumee.asInstanceOf[Select].predicateList.contains(exprE)) {
+      subsumer.asInstanceOf[Select].predicateList.exists(_.semanticEquals(exprE)) ||
+      canEvaluate(exprE, subsumer)
+    } else if (subsumee.asInstanceOf[Select].outputList.contains(exprE)) {
+      exprE match {
+        case a@Alias(_, _) =>
+          exprListR.exists(a1 => a1.isInstanceOf[Alias] &&
+                                 a1.asInstanceOf[Alias].child.semanticEquals(a.child)) ||
+          exprListR.exists(_.semanticEquals(exprE) || canEvaluate(exprE, subsumer))
+        case exp => exprListR.exists(_.semanticEquals(exp) || canEvaluate(exp, subsumer))
+      }
+    } else {
+      false
+    }
+  }
+
+  def apply(
+      subsumer: ModularPlan,
+      subsumee: ModularPlan,
+      compensation: Option[ModularPlan],
+      rewrite: QueryRewrite): Seq[ModularPlan] = {
+
+    (subsumer, subsumee, compensation) match {
+      case (
+          sel_1a @ modular.Select(_, _, _, _, _, _, _, _, _, _),
+          sel_1q @ modular.Select(_, _, _, _, _, _, _, _, _, _), None
+        ) if sel_1a.children.forall { _.isInstanceOf[modular.LeafNode] } &&
+             sel_1q.children.forall { _.isInstanceOf[modular.LeafNode] } =>
+
+        // assume children (including harmonized relation) of subsumer and subsumee
+        // are in 1-1 correspondence.
+        // Change the following two conditions to more complicated ones if we want to
+        // consider things that combine extrajoin, rejoin, and harmonized relations
+        val isUniqueRmE = subsumer.children.filter { x => subsumee.children.count(_ == x) != 1 }
+        val isUniqueEmR = subsumee.children.filter { x => subsumer.children.count(_ == x) != 1 }
+
+        val extrajoin = sel_1a.children.filterNot { child => sel_1q.children.contains(child) }
+        val rejoin = sel_1q.children.filterNot { child => sel_1a.children.contains(child) }
+        val rejoinOutputList = rejoin.flatMap(_.output)
+
+        val isPredicateRmE = sel_1a.predicateList.forall(expr =>
+          sel_1q.predicateList.exists(_.semanticEquals(expr)))
+        val isPredicateEmdR = sel_1q.predicateList.forall(expr =>
+          isDerivable(expr, sel_1a.outputList ++ rejoinOutputList, sel_1q, sel_1a, None))
+        val isOutputEdR = sel_1q.outputList.forall(expr =>
+          isDerivable(expr, sel_1a.outputList ++ rejoinOutputList, sel_1q, sel_1a, None))
+
+        if (isUniqueRmE.isEmpty && isUniqueEmR.isEmpty && extrajoin.isEmpty && isPredicateRmE &&
+            isPredicateEmdR && isOutputEdR) {
+          val mappings = sel_1a.children.zipWithIndex.map {
+            case (childr, fromIdx) if sel_1q.children.contains(childr) =>
+              val toIndx = sel_1q.children.indexWhere(_ == childr)
+              (toIndx -> fromIdx)
+
+          }
+          val e2r = mappings.toMap
+          val r2e = e2r.map(_.swap)
+          val r2eJoinsMatch = sel_1a.joinEdges.forall { x =>
+              (r2e.get(x.left), r2e.get(x.right)) match {
+                case (Some(l), Some(r)) =>
+                  val mappedEdge = JoinEdge(l, r, x.joinType)
+                  val joinTypeEquivalent =
+                    if (sel_1q.joinEdges.contains(mappedEdge)) true
+                    else {
+                      x.joinType match {
+                        case Inner | FullOuter =>
+                          sel_1q.joinEdges.contains(JoinEdge(r, l, x.joinType))
+                        case _ => false
+                      }
+                    }
+                  if (joinTypeEquivalent) {
+                    val sel_1a_join = sel_1a.extractJoinConditions(
+                      sel_1a.children(x.left),
+                      sel_1a.children(x.right))
+                    val sel_1q_join = sel_1q.extractJoinConditions(
+                      sel_1q.children(mappedEdge.left),
+                      sel_1q.children(mappedEdge.right))
+                    sel_1a_join.forall(e => sel_1q_join.exists(e.semanticEquals(_))) &&
+                    sel_1q_join.forall(e => sel_1a_join.exists(e.semanticEquals(_)))
+                  } else false
+                case _ => false
+              }
+          }
+
+          val isPredicateEmR = sel_1q.predicateList.forall(expr =>
+            sel_1a.predicateList.exists(_.semanticEquals(expr)))
+          val isOutputEmR = sel_1q.outputList.forall(expr =>
+            sel_1a.outputList.exists(_.semanticEquals(expr)))
+          val isOutputRmE = sel_1a.outputList.forall(expr =>
+            sel_1q.outputList.exists(_.semanticEquals(expr)))
+
+          if (r2eJoinsMatch) {
+            if (isPredicateEmR && isOutputEmR && isOutputRmE && rejoin.isEmpty) {
+              Seq(sel_1a) // no compensation needed
+            } else {
+              val tChildren = new collection.mutable.ArrayBuffer[ModularPlan]()
+              val tAliasMap = new collection.mutable.HashMap[Int, String]()
+
+              val updatedOutList: Seq[NamedExpression] = updateDuplicateColumns(sel_1a)
+              val usel_1a = sel_1a.copy(outputList = updatedOutList)
+              tChildren += usel_1a
+              tAliasMap += (tChildren.indexOf(usel_1a) -> rewrite.newSubsumerName())
+
+              sel_1q.children.zipWithIndex.foreach {
+                case (childe, idx) =>
+                  if (e2r.get(idx).isEmpty) {
+                    tChildren += childe
+                    sel_1q.aliasMap.get(idx).map(x => tAliasMap += (tChildren.indexOf(childe) -> x))
+                  }
+              }
+
+              val tJoinEdges = sel_1q.joinEdges.collect {
+                case JoinEdge(le, re, joinType) =>
+                  (e2r.get(le), e2r.get(re)) match {
+                    case (Some(lr), None) =>
+                      JoinEdge(
+                        0,
+                        tChildren.indexOf(sel_1q.children(re)),
+                        joinType)
+                    case (None, None) =>
+                      JoinEdge(
+                        tChildren.indexOf(sel_1q.children(le)),
+                        tChildren.indexOf(sel_1q.children(re)),
+                        joinType)
+                    case (None, Some(rr)) =>
+                      JoinEdge(
+                        tChildren.indexOf(sel_1q.children(le)),
+                        0,
+                        joinType)
+                    case _ =>
+                      null.asInstanceOf[JoinEdge]
+                  }
+              }
+              val tPredicateList = sel_1q.predicateList.filter { p =>
+                !sel_1a.predicateList.exists(_.semanticEquals(p)) }
+              val wip = sel_1q.copy(
+                predicateList = tPredicateList,
+                children = tChildren,
+                joinEdges = tJoinEdges.filter(_ != null),
+                aliasMap = tAliasMap.toMap)
+
+              val done = factorOutSubsumer(wip, usel_1a, wip.aliasMap)
+              Seq(done)
+            }
+          } else Nil
+        } else Nil
+
+      case (
+        sel_3a @ modular.Select(_, _, _, _, _, _, _, _, _, _),
+        sel_3q @ modular.Select(_, _, _, _, _, _, _, _, _, _), None)
+        if sel_3a.children.forall(_.isInstanceOf[GroupBy]) &&
+           sel_3q.children.forall(_.isInstanceOf[GroupBy]) =>
+        val isPredicateRmE = sel_3a.predicateList.isEmpty ||
+                             sel_3a.predicateList.forall(expr =>
+                               sel_3q.predicateList.exists(_.semanticEquals(expr)))
+        val isPredicateEmdR = sel_3q.predicateList.isEmpty ||
+                              sel_3q.predicateList.forall(expr =>
+                                sel_3a.predicateList.exists(_.semanticEquals(expr)) ||
+                                isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))
+        val isOutputEdR = sel_3q.outputList.forall(expr =>
+          isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))
+        val isSingleChild = sel_3a.children.length == 1 && sel_3q.children.length == 1
+
+        if (isPredicateRmE && isPredicateEmdR && isOutputEdR && isSingleChild) {
+          val isPredicateEmR = sel_3q.predicateList.isEmpty ||
+                               sel_3q.predicateList.forall(expr =>
+                                 sel_3a.predicateList.exists(_.semanticEquals(expr)))
+          val isOutputRmE = sel_3a.outputList.forall(expr =>
+            isDerivable(expr, sel_3q.outputList, sel_3a, sel_3q, None))
+          val isOutputEmR = sel_3q.outputList.forall(expr =>
+            isDerivable(expr, sel_3a.outputList, sel_3q, sel_3a, None))
+
+          if (isPredicateEmR && isOutputEmR && isOutputRmE) {
+            Seq(sel_3a)
+          } else if (isPredicateEmR && isOutputEmR) {
+            // no compensation needed
+            val sel_3q_exp = sel_3q.transformExpressions({
+              case a: Alias => sel_3a.outputList
+                .find { a1 =>
+                  a1.isInstanceOf[Alias] &&
+                  a1.asInstanceOf[Alias].child.semanticEquals(a.child)
+                }.map(_.toAttribute).get
+            })
+            val wip = sel_3q_exp.copy(
+              children = Seq(sel_3a),
+              aliasMap = Seq(0 -> rewrite.newSubsumerName()).toMap)
+            val done = factorOutSubsumer(wip, sel_3a, wip.aliasMap)
+            Seq(done)
+          } else {
+            Nil
+          }
+        } else Nil
+
+      case _ => Nil
+    }
+  }
+
+  private def updateDuplicateColumns(sel_1a: Select) = {
+    val duplicateNameCols = sel_1a.outputList.groupBy(_.name).filter(_._2.length > 1).flatMap(_._2)
+      .toList
+    val updatedOutList = sel_1a.outputList.map { col =>
+      if (duplicateNameCols.contains(col)) {
+        Alias(col, col.qualifiedName)(exprId = col.exprId)
+      } else {
+        col
+      }
+    }
+    updatedOutList
+  }
+}
+
+object GroupbyGroupbyNoChildDelta extends DefaultMatchPattern {
+  def apply(
+      subsumer: ModularPlan,
+      subsumee: ModularPlan,
+      compensation: Option[ModularPlan],
+      rewrite: QueryRewrite): Seq[ModularPlan] = {
+    (subsumer, subsumee, compensation) match {
+      case (
+        gb_2a @ modular.GroupBy(_, _, _, _, _, _, _, _),
+        gb_2q @ modular.GroupBy(_, _, _, _, _, _, _, _),
+        None) =>
+        val isGroupingEmR = gb_2q.predicateList.forall(expr =>
+          gb_2a.predicateList.exists(_.semanticEquals(expr)))
+        val isGroupingRmE = gb_2a.predicateList.forall(expr =>
+          gb_2q.predicateList.exists(_.semanticEquals(expr)))
+        if (isGroupingEmR && isGroupingRmE) {
+          val isOutputEmR = gb_2q.outputList.forall {
+            case a @ Alias(_, _) =>
+              gb_2a.outputList.exists{a1 =>
+                a1.isInstanceOf[Alias] && a1.asInstanceOf[Alias].child.semanticEquals(a.child)
+              }
+            case exp => gb_2a.outputList.exists(_.semanticEquals(exp))
+          }
+
+          if (isOutputEmR) {
+            // Map the outputs of the two plans to each other by semantic equality.
+            val mappings = gb_2a.outputList.zipWithIndex.map { case(exp, index) =>
+              (exp, gb_2q.outputList.find {
+                case a: Alias if exp.isInstanceOf[Alias] =>
+                  a.child.semanticEquals(exp.children.head)
+                case a: Alias => a.child.semanticEquals(exp)
+                case other => other.semanticEquals(exp)
+              }.getOrElse(gb_2a.outputList(index)))
+            }
+
+            val oList = mappings.map{case (out1, out2) =>
+              if (out1.name != out2.name) out1 match {
+                case alias: Alias => Alias(alias.child, out2.name)(exprId = alias.exprId)
+                case _ => Alias(out1, out2.name)(exprId = out2.exprId)
+              } else out1
+            }
+
+            Seq(gb_2a.copy(outputList = oList))
+          } else {
+            Nil
+          }
+        } else {
+          val aliasMap = AttributeMap(gb_2a.outputList.collect { case a: Alias =>
+            (a.toAttribute, a)})
+          if (isGroupingEmR) {
+            Utils.tryMatch(
+              gb_2a, gb_2q, aliasMap).flatMap {
+              case g: GroupBy =>
+                Some(g.copy(child = g.child.withNewChildren(
+                  g.child.children.map {
+                    case modular.Select(_, _, _, _, _, _, _, _, _, _) => gb_2a;
+                    case other => other
+                  })));
+              case _ => None}.map(Seq(_)).getOrElse(Nil)
+          } else {
+            Nil
+          }
+        }
+
+      case _ => Nil
+    }
+  }
+}
+
+object GroupbyGroupbySelectOnlyChildDelta extends DefaultMatchPattern with PredicateHelper {
+  private def isDerivable(
+      exprE: Expression,
+      exprListR: Seq[Expression],
+      subsumee: ModularPlan,
+      subsumer: ModularPlan,
+      compensation: Option[ModularPlan]) = {
+    if (subsumee.asInstanceOf[GroupBy].predicateList.contains(exprE)) {
+      if (exprListR.exists(_.semanticEquals(exprE)) || canEvaluate(exprE, exprListR)) true
+      else false
+    } else if (compensation.getOrElse(throw new RuntimeException("compensation cannot be None"))
+      .asInstanceOf[Select].predicateList.contains(exprE)) {
+      if (canEvaluate(exprE, exprListR) || exprListR.exists(_.semanticEquals(exprE))) true
+      else false
+    } else {
+      false
+    }
+  }
+
+  def apply(
+      subsumer: ModularPlan,
+      subsumee: ModularPlan,
+      compensation: Option[ModularPlan],
+      rewrite: QueryRewrite): Seq[ModularPlan] = {
+    val aggInputEinR = subsumee.expressions
+      .collect { case agg: aggregate.AggregateExpression => AttributeSet(Seq(agg))
+        .subsetOf(subsumer.outputSet)
+      }.forall(identity)
+    val compensationSelectOnly = !compensation.map { _.collect { case n => n.getClass } }
+      .exists(_.contains(classOf[modular.GroupBy]))
+
+    (subsumer, subsumee, compensation, aggInputEinR, compensationSelectOnly) match {
+      case (
+        gb_2a @ modular.GroupBy(_, _, _, _, _, _, _, _),
+        gb_2q @ modular.GroupBy(_, _, _, _, _, _, _, _),
+        Some(sel_1c1 @ modular.Select(_, _, _, _, _, _, _, _, _, _)),
+        true,
+        true)
+        if !gb_2q.flags.hasFlag(EXPAND) && !gb_2a.flags.hasFlag(EXPAND) =>
+
+        val rejoinOutputList = sel_1c1.children.tail.flatMap(_.output)
+        val isGroupingEdR = gb_2q.predicateList.forall(expr =>
+          isDerivable(expr, gb_2a.predicateList ++ rejoinOutputList, gb_2q, gb_2a, compensation))
+        val needRegrouping = !gb_2a.predicateList.forall(gb_2q.predicateList.contains)
+        val canPullup = sel_1c1.predicateList.forall(expr =>
+          isDerivable(expr, gb_2a.predicateList ++ rejoinOutputList, gb_2q, gb_2a, compensation))
+        val isAggEmR = gb_2q.outputList.collect {
+          case agg: aggregate.AggregateExpression =>
+            gb_2a.outputList.exists(_.semanticEquals(agg))
+        }.forall(identity)
+
+        if (isGroupingEdR && ((!needRegrouping && isAggEmR) || needRegrouping) && canPullup) {
+          // pull up
+          val pullupOutputList = gb_2a.outputList.map(_.toAttribute) ++ rejoinOutputList
+          val sel_2c1 = sel_1c1.copy(
+            outputList = pullupOutputList,
+            inputList = pullupOutputList,
+            children = sel_1c1.children.map {
+              case _: Select => gb_2a
+              case other => other })
+
+          if (rejoinOutputList.isEmpty) {
+            val aliasMap = AttributeMap(gb_2a.outputList.collect {
+              case a: Alias => (a.toAttribute, a) })
+            Utils.tryMatch(gb_2a, gb_2q, aliasMap).flatMap {
+              case g: GroupBy => Some(g.copy(child = sel_2c1));
+              case _ => None
+            }.map { wip =>
+              factorOutSubsumer(wip, gb_2a, sel_1c1.aliasMap)
+            }.map(Seq(_))
+             .getOrElse(Nil)
+          }
+          // TODO: implement regrouping with 1:N rejoin (rejoin tables being the "1" side)
+          // via catalog service
+          else if (!needRegrouping && isAggEmR) {
+            Seq(sel_2c1).map(wip => factorOutSubsumer(wip, gb_2a, sel_1c1.aliasMap))
+          } else Nil
+        } else Nil
+
+      case _ => Nil
+    }
+  }
+}
+
+object GroupbyGroupbyGroupbyChildDelta extends DefaultMatchPattern {
+  def apply(
+      subsumer: ModularPlan,
+      subsumee: ModularPlan,
+      compensation: Option[ModularPlan],
+      rewrite: QueryRewrite): Seq[ModularPlan] = {
+    val groupbys = compensation.map { _.collect { case g: GroupBy => g } }.getOrElse(Nil).toSet
+
+    (subsumer, subsumee, groupbys.nonEmpty) match {
+      case (
+        modular.Select(_, _, _, _, _, _, _, _, _, _),
+        modular.Select(_, _, _, _, _, _, _, _, _, _),
+        true) =>
+        // TODO: implement me
+        Nil
+
+      case _ => Nil
+    }
+  }
+}
+
+
+object SelectSelectSelectChildDelta extends DefaultMatchPattern {
+  def apply(
+      subsumer: ModularPlan,
+      subsumee: ModularPlan,
+      compensation: Option[ModularPlan],
+      rewrite: QueryRewrite): Seq[ModularPlan] = {
+    // True iff the compensation plan contains no GroupBy node.
+    val compensationSelectOnly =
+      !compensation.exists(_.collect { case g: modular.GroupBy => g }.nonEmpty)
+
+    (subsumer, subsumee, compensationSelectOnly) match {
+      case (
+        modular.Select(_, _, _, _, _, _, _, _, _, _),
+        modular.Select(_, _, _, _, _, _, _, _, _, _),
+        true) =>
+        // TODO: implement me
+        Nil
+      case _ => Nil
+    }
+  }
+}
+
+object SelectSelectGroupbyChildDelta extends DefaultMatchPattern with PredicateHelper {
+  private def isDerivable(
+      exprE: Expression,
+      exprListR: Seq[Expression],
+      subsumee: ModularPlan,
+      subsumer: ModularPlan,
+      compensation: Option[ModularPlan]) = {
+    Utils.isDerivable(exprE, exprListR, subsumee, subsumer, compensation)
+  }
+
+  def apply(
+      subsumer: ModularPlan,
+      subsumee: ModularPlan,
+      compensation: Option[ModularPlan],
+      rewrite: QueryRewrite): Seq[ModularPlan] = {
+    (subsumer, subsumee, compensation, subsumer.children, subsumee.children) match {
+      case (
+        sel_3a@modular.Select(
+        _, _, Nil, _, _,
+        Seq(gb_2a@modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _),
+        sel_3q@modular.Select(
+        _, _, _, _, _,
+        Seq(gb_2q@modular.GroupBy(_, _, _, _, _, _, _, _)), _, _, _, _),
+        Some(gb_2c@modular.GroupBy(_, _, _, _, _, _, _, _)),
+        rchild :: Nil,
+        echild :: Nil) =>
+        val tbls_sel_3a = sel_3a.collect { case tbl: modular.LeafNode => tbl }
+        val tbls_sel_3q = sel_3q.collect { case tbl: modular.LeafNode => tbl }
+
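+        // extrajoin: tables only the MV touches (any such table disqualifies
+        // this match); rejoin: tables only the query touches, which must be
+        // joined back on top of the MV output.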
+        val extrajoin = tbls_sel_3a.filterNot(tbls_sel_3q.contains)
+        val rejoin = tbls_sel_3q.filterNot(tbls_sel_3a.contains)
+        val rejoinOutputList = rejoin.flatMap(_.output)
+
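+        // Derivability checks: every MV predicate must reappear in the query
+        // or the compensation, every query predicate and output must be
+        // derivable from the MV output plus the re-joined tables, and the
+        // compensation's own predicates must be derivable so they can be
+        // pulled up above the MV.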
+        val isPredicateRmE = sel_3a.predicateList.forall(expr =>
+          sel_3q.predicateList.exists(_.semanticEquals(expr)) ||
+          gb_2c.predicateList.exists(_.semanticEquals(expr)))
+        val isPredicateEmdR = sel_3q.predicateList
+          .forall(expr =>
+            sel_3a.predicateList.exists(_.semanticEquals(expr)) ||
+            isDerivable(
+              expr,
+              sel_3a.outputList ++ rejoinOutputList,
+              sel_3q,
+              sel_3a,
+              compensation))
+        val isOutputEdR = sel_3q.outputList
+          .forall(expr =>
+            isDerivable(
+              expr,
+              sel_3a.outputList ++ rejoinOutputList,
+              sel_3q,
+              sel_3a,
+              compensation))
+
+        val canSELPullup = gb_2c.child.isInstanceOf[Select] &&
+                           gb_2c.child.asInstanceOf[Select].predicateList
+                             .forall(expr =>
+                               isDerivable(
+                                 expr,
+                                 sel_3a.outputList ++ rejoinOutputList,
+                                 sel_3q,
+                                 sel_3a,
+                                 compensation))
+        val canGBPullup = gb_2c.predicateList
+          .forall(expr =>
+            isDerivable(
+              expr,
+              sel_3a.outputList ++ rejoinOutputList,
+              sel_3q,
+              sel_3a,
+              compensation))
+
+        if (extrajoin.isEmpty && isPredicateRmE &&
+            isPredicateEmdR &&
+            isOutputEdR &&
+            canSELPullup &&
+            canGBPullup) {
+          gb_2c.child match {
+            case s: Select =>
+              val sel_3c1 = s.withNewChildren(
+                s.children.map {
+                  case _: GroupBy => sel_3a.setSkip()
+                  case other => other })
+              val gb_3c2 = gb_2c.copy(child = sel_3c1)
+
+              val aliasMap_exp = AttributeMap(
+                gb_2c.outputList.collect {
+                  case a: Alias => (a.toAttribute, a) })
+              val sel_3q_exp = sel_3q.transformExpressions({
+                case attr: Attribute if aliasMap_exp.contains(attr) => aliasMap_exp(attr)
+              })
+              // Pair each query output with the semantically equal output of
+              // the compensation GroupBy, falling back by position.
+              val mappings = sel_3q_exp.outputList.zipWithIndex.map { case (exp, index) =>
+                (exp, gb_2c.outputList.find {
+                  case a: Alias if exp.isInstanceOf[Alias] =>
+                    a.child.semanticEquals(exp.children.head)
+                  case a: Alias => a.child.semanticEquals(exp)
+                  case other => other.semanticEquals(exp)
+                }.getOrElse(gb_2c.outputList(index)))
+              }
+
+              val oList = for ((o1, o2) <- mappings) yield {
+                if (o1.name != o2.name) Alias(o2, o1.name)(exprId = o1.exprId) else o2
+              }
+
+              val wip = sel_3q_exp.copy(outputList = oList, children = Seq(gb_3c2))
+              val sel_3c3 = Some(factorOutSubsumer(wip, sel_3a, s.aliasMap))
+              sel_3c3.map(Seq(_)).getOrElse(Nil)
+
+            case _ => Nil
+          }
+        } else {
+          Nil
+        }
+
+      case _ => Nil
+    }
+  }
+}
+
+
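A side note on the alias-reconciliation step used by the patterns above: the
zip/semanticEquals/re-alias dance keeps the rewritten plan's column names and
exprIds identical to the query's while computing the MV's expressions. A
minimal sketch of the idea (not part of the patch; only Spark catalyst
expressions are assumed):

    import org.apache.spark.sql.catalyst.expressions.{Alias, NamedExpression}

    // For each query output, reuse the semantically equal MV output,
    // re-aliased to the query's name and exprId when the names differ.
    def reconcileOutputs(
        queryOut: Seq[NamedExpression],
        mvOut: Seq[NamedExpression]): Seq[NamedExpression] = {
      queryOut.map { e =>
        mvOut.find(_.semanticEquals(e)) match {
          case Some(r) if r.name != e.name => Alias(r, e.name)(exprId = e.exprId)
          case Some(r) => r
          case None => e
        }
      }
    }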

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala
new file mode 100644
index 0000000..2a4da27
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchConditions.scala
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+// TODO: implement this to modularize DefaultMatchingFunctions
+object MatchConditions {
+}
+
+class MatchConditions(flags: Long) {
+  def hasFlag(flag: Long): Boolean = {
+    throw new UnsupportedOperationException
+  }
+}
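
hasFlag is deliberately left unimplemented here. If the eventual implementation
follows the usual bitmask convention for plan flags, it could look like this
sketch (hypothetical, not part of the patch):

    // A condition holds iff all of its bits are set in the flag word.
    class BitMatchConditions(flags: Long) {
      def hasFlag(flag: Long): Boolean = (flags & flag) == flag
      def withFlag(flag: Long): BitMatchConditions = new BitMatchConditions(flags | flag)
    }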

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala
new file mode 100644
index 0000000..2c5d8f4
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/MatchMaker.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.trees.TreeNode
+
+abstract class MatchPattern[MatchingPlan <: TreeNode[MatchingPlan]] extends Logging {
+
+  def apply(
+      subsumer: MatchingPlan,
+      subsumee: MatchingPlan,
+      compensation: Option[MatchingPlan],
+      rewrite: QueryRewrite): Seq[MatchingPlan]
+
+}
+
+abstract class MatchMaker[MatchingPlan <: TreeNode[MatchingPlan]] {
+
+  /** The sequence of match patterns to apply, to be overridden by the implementation. */
+  protected val patterns: Seq[MatchPattern[MatchingPlan]]
+
+  def execute(
+      subsumer: MatchingPlan,
+      subsumee: MatchingPlan,
+      compensation: Option[MatchingPlan],
+      rewrite: QueryRewrite): Iterator[MatchingPlan] = {
+    patterns.view.flatMap(_ (subsumer, subsumee, compensation, rewrite)).toIterator
+  }
+}
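
Because execute builds its result from patterns.view, pattern application is
lazy: a caller that consumes only the first rewrite (as Navigator.subsume does
via iter.next) never evaluates the remaining patterns. A self-contained
illustration of that behavior, independent of the MV classes:

    // Only the first element is forced; "tried 2" and "tried 3" never print.
    val candidates = Seq(1, 2, 3).view
      .flatMap { n => println(s"tried $n"); Seq(n * 10) }
      .toIterator
    println(candidates.next())  // prints "tried 1", then "10"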

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
new file mode 100644
index 0000000..545920e
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/Navigator.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeSet}
+
+import org.apache.carbondata.mv.datamap.{MVHelper, MVState}
+import org.apache.carbondata.mv.expressions.modular._
+import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, Select}
+import org.apache.carbondata.mv.plans.modular
+
+private[mv] class Navigator(catalog: SummaryDatasetCatalog, session: MVState) {
+
+  def rewriteWithSummaryDatasets(plan: ModularPlan, rewrite: QueryRewrite): ModularPlan = {
+    val replaced = plan.transformAllExpressions {
+      case s: ModularSubquery =>
+        if (s.children.isEmpty) {
+          ScalarModularSubquery(
+            rewriteWithSummaryDatasetsCore(s.plan, rewrite), s.children, s.exprId)
+        } else {
+          throw new UnsupportedOperationException(s"Rewrite expression $s isn't supported")
+        }
+      case o => o
+    }
+    rewriteWithSummaryDatasetsCore(replaced, rewrite)
+  }
+
+  def rewriteWithSummaryDatasetsCore(plan: ModularPlan, rewrite: QueryRewrite): ModularPlan = {
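+    // Walk the plan top-down; for each unrewritten SPJGH fragment, lazily try
+    // every feasible summary dataset and keep the first successful match.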
+    val rewrittenPlan = plan transformDown {
+      case currentFragment =>
+        if (currentFragment.rewritten || !currentFragment.isSPJGH) currentFragment
+        else {
+          val compensation =
+            (for { dataset <- catalog.lookupFeasibleSummaryDatasets(currentFragment).toStream
+                   subsumer <- session.modularizer.modularize(
+                     session.optimizer.execute(dataset.plan)).map(_.harmonized)
+                   subsumee <- unifySubsumee(currentFragment)
+                   comp <- subsume(
+                     unifySubsumer2(
+                       unifySubsumer1(
+                         subsumer,
+                         subsumee,
+                         dataset.relation),
+                       subsumee),
+                     subsumee, rewrite)
+                 } yield comp).headOption
+          compensation.map(_.setRewritten).getOrElse(currentFragment)
+        }
+    }
+    // If the plan was rewritten but the datamap table has not yet been
+    // substituted into it, update the datamap table in the plan.
+    if (rewrittenPlan.find(_.rewritten).isDefined) {
+      val updatedDataMapTablePlan = rewrittenPlan transform {
+        case s: Select =>
+          MVHelper.updateDataMap(s, rewrite)
+        case g: GroupBy =>
+          MVHelper.updateDataMap(g, rewrite)
+      }
+      // TODO: find a better way to propagate the rewritten flag; this
+      // positional zip of the two plans' nodes may fail in some conditions.
+      val mapping =
+        rewrittenPlan.collect {case m: ModularPlan => m } zip
+        updatedDataMapTablePlan.collect {case m: ModularPlan => m}
+      mapping.foreach(f => if (f._1.rewritten) f._2.setRewritten())
+
+      updatedDataMapTablePlan
+
+    } else {
+      rewrittenPlan
+    }
+  }
+
+  def subsume(
+      subsumer: ModularPlan,
+      subsumee: ModularPlan,
+      rewrite: QueryRewrite): Option[ModularPlan] = {
+    if (subsumer.getClass == subsumee.getClass) {
+      (subsumer.children, subsumee.children) match {
+        case (Nil, Nil) => None
+        case (r, e) if r.forall(_.isInstanceOf[modular.LeafNode]) &&
+                       e.forall(_.isInstanceOf[modular.LeafNode]) =>
+          val iter = session.matcher.execute(subsumer, subsumee, None, rewrite)
+          if (iter.hasNext) Some(iter.next)
+          else None
+
+        case (rchild :: Nil, echild :: Nil) =>
+          val compensation = subsume(rchild, echild, rewrite)
+          val oiter = compensation.map {
+            case comp if comp.eq(rchild) =>
+              session.matcher.execute(subsumer, subsumee, None, rewrite)
+            case _ =>
+              session.matcher.execute(subsumer, subsumee, compensation, rewrite)
+          }
+          oiter.flatMap { case iter if iter.hasNext => Some(iter.next)
+                          case _ => None }
+
+        case _ => None
+      }
+    } else None
+  }
+
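+  // Substitutes the matched child into the subsumed plan when that child is
+  // backed by a datamap table relation.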
+  private def updateDatamap(rchild: ModularPlan, subsume: ModularPlan) = {
+    val update = rchild match {
+      case s: Select if s.dataMapTableRelation.isDefined =>
+        true
+      case g: GroupBy if g.dataMapTableRelation.isDefined =>
+        true
+      case _ => false
+    }
+
+    if (update) {
+      subsume match {
+        case s: Select =>
+          s.copy(children = Seq(rchild))
+
+        case g: GroupBy =>
+          g.copy(child = rchild)
+        case _ => subsume
+      }
+    } else {
+      subsume
+    }
+  }
+
+  // add Select operator as placeholder on top of subsumee to facilitate matching
+  def unifySubsumee(subsumee: ModularPlan): Option[ModularPlan] = {
+    subsumee match {
+      case gb @ modular.GroupBy(_, _, _, _,
+        modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _) =>
+        Some(
+          Select(gb.outputList, gb.outputList, Nil, Map.empty, Nil, gb :: Nil, gb.flags,
+            gb.flagSpec, Seq.empty))
+      case other => Some(other)
+    }
+  }
+
+  // add Select operator as placeholder on top of subsumer to facilitate matching
+  def unifySubsumer1(
+      subsumer: ModularPlan,
+      subsumee: ModularPlan,
+      dataMapRelation: ModularPlan): ModularPlan = {
+    // Attach the datamap table relation to the subsumer modular plan.
+    val updatedSubsumer = subsumer match {
+      case s: Select => s.copy(dataMapTableRelation = Some(dataMapRelation))
+      case g: GroupBy => g.copy(dataMapTableRelation = Some(dataMapRelation))
+      case other => other
+    }
+    (updatedSubsumer, subsumee) match {
+      case (r @
+        modular.GroupBy(_, _, _, _, modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _),
+        modular.Select(_, _, _, _, _,
+          Seq(modular.GroupBy(_, _, _, _, modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)),
+          _, _, _, _)
+        ) =>
+        modular.Select(
+          r.outputList, r.outputList, Nil, Map.empty, Nil, r :: Nil, r.flags,
+          r.flagSpec, Seq.empty).setSkip()
+      case _ => updatedSubsumer.setSkip()
+    }
+  }
+
+  def unifySubsumer2(subsumer: ModularPlan, subsumee: ModularPlan): ModularPlan = {
+    val rtables = subsumer.collect { case n: modular.LeafNode => n }
+    val etables = subsumee.collect { case n: modular.LeafNode => n }
+    val pairs = for {
+      rtable <- rtables
+      etable <- etables
+      if (rtable == etable)
+    } yield (rtable, etable)
+
+    pairs.foldLeft(subsumer) {
+      case (curSubsumer, pair) =>
+        val nxtSubsumer = curSubsumer.transform { case pair._1 => pair._2 }
+        val attributeSet = AttributeSet(pair._1.output)
+        val rewrites = AttributeMap(pair._1.output.zip(pair._2.output))
+        nxtSubsumer.transformUp {
+          case p => p.transformExpressions {
+            case a: Attribute if attributeSet contains a => rewrites(a).withQualifier(a.qualifier)
+          }
+        }
+    }
+  }
+}
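
unifySubsumer2 aligns attribute identities across the two plans: once a leaf
table of the subsumer is paired with the same table in the subsumee, every
occurrence of the subsumer's output attributes is rewritten to the subsumee's,
so later semanticEquals checks line up. A reduced sketch of that rewrite with
plain catalyst attributes (illustration only):

    import org.apache.spark.sql.catalyst.expressions.{AttributeMap, AttributeReference}
    import org.apache.spark.sql.types.IntegerType

    // Two scans of the same table yield attributes with distinct exprIds; the
    // rewrite maps the subsumer's attribute onto the subsumee's.
    val subsumerAttr = AttributeReference("c1", IntegerType)()
    val subsumeeAttr = AttributeReference("c1", IntegerType)()
    val rewrites = AttributeMap(Seq(subsumerAttr -> subsumeeAttr))
    assert(rewrites(subsumerAttr).exprId == subsumeeAttr.exprId)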

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala
new file mode 100644
index 0000000..5039d66
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/QueryRewrite.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.carbondata.mv.datamap.MVState
+import org.apache.carbondata.mv.plans.modular.ModularPlan
+
+/**
+ * The primary workflow for rewriting relational queries using registered
+ * materialized-view (summary) datasets.
+ */
+class QueryRewrite private (
+    state: MVState,
+    logical: LogicalPlan,
+    nextSubqueryId: AtomicLong) {
+  self =>
+
+  def this(state: MVState, logical: LogicalPlan) =
+    this(state, logical, new AtomicLong(0))
+
+  def newSubsumerName(): String = s"gen_subsumer_${nextSubqueryId.getAndIncrement()}"
+
+  lazy val optimizedPlan: LogicalPlan =
+    state.optimizer.execute(logical)
+
+  lazy val modularPlan: ModularPlan =
+    state.modularizer.modularize(optimizedPlan).next().harmonized
+
+  lazy val withSummaryData: ModularPlan =
+    state.navigator.rewriteWithSummaryDatasets(modularPlan, self)
+
+  lazy val toCompactSQL: String = withSummaryData.asCompactSQL
+
+  lazy val toOneLineSQL: String = withSummaryData.asOneLineSQL
+}
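
Each lazy val above caches its result, so the optimizedPlan -> modularPlan ->
withSummaryData pipeline runs at most once per QueryRewrite instance.
Hypothetical usage, assuming an MVState named state and an analyzed
LogicalPlan named analyzed are in scope:

    val rewrite = new QueryRewrite(state, analyzed)
    val rewritten = rewrite.withSummaryData  // ModularPlan matched against registered MVs
    val sql = rewrite.toCompactSQL           // the rewritten plan rendered back to SQL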

http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf73e9fe/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
new file mode 100644
index 0000000..c29c08f
--- /dev/null
+++ b/datamap/mv/core/src/main/scala/org/apache/carbondata/mv/rewrite/SummaryDatasetCatalog.scala
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.rewrite
+
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.FindDataSourceTable
+import org.apache.spark.sql.parser.CarbonSpark2SqlParser
+
+import org.apache.carbondata.core.datamap.DataMapCatalog
+import org.apache.carbondata.core.datamap.status.DataMapStatusManager
+import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
+import org.apache.carbondata.mv.datamap.{MVHelper, MVState}
+import org.apache.carbondata.mv.plans.modular.{Flags, ModularPlan, ModularRelation, Select}
+import org.apache.carbondata.mv.plans.util.Signature
+
+/**
+ * Holds a registered summary dataset: its plan signature, logical plan,
+ * datamap schema, and the modular relation of the datamap table.
+ */
+private[mv] case class SummaryDataset(signature: Option[Signature],
+    plan: LogicalPlan,
+    dataMapSchema: DataMapSchema,
+    relation: ModularPlan)
+
+private[mv] class SummaryDatasetCatalog(sparkSession: SparkSession)
+  extends DataMapCatalog[SummaryDataset] {
+
+  @transient
+  private val summaryDatasets = new scala.collection.mutable.ArrayBuffer[SummaryDataset]
+
+  val mVState = new MVState(this)
+
+  @transient
+  private val registerLock = new ReentrantReadWriteLock
+
+  /**
+   * Parser used to preprocess the datamap's CTAS query before analysis.
+   */
+  lazy val parser = new CarbonSpark2SqlParser
+
+  /** Acquires a read lock on the catalog for the duration of `f`. */
+  private def readLock[A](f: => A): A = {
+    val lock = registerLock.readLock()
+    lock.lock()
+    try f finally {
+      lock.unlock()
+    }
+  }
+
+  /** Acquires a write lock on the catalog for the duration of `f`. */
+  private def writeLock[A](f: => A): A = {
+    val lock = registerLock.writeLock()
+    lock.lock()
+    try f finally {
+      lock.unlock()
+    }
+  }
+
+  /** Clears all summary tables. */
+  private[mv] def refresh(): Unit = {
+    writeLock {
+      summaryDatasets.clear()
+    }
+  }
+
+  /** Checks if the catalog is empty. */
+  private[mv] def isEmpty: Boolean = {
+    readLock {
+      summaryDatasets.isEmpty
+    }
+  }
+
+  /**
+   * Registers the summary dataset defined by the given datamap schema: parses and
+   * modularizes its CTAS query, computes the plan signature, and records the
+   * datamap table relation used later for query rewriting.
+   */
+  private[mv] def registerSchema(dataMapSchema: DataMapSchema): Unit = {
+    writeLock {
+      // TODO: add an MV-specific function here instead of reusing the
+      // preaggregate function
+      val updatedQuery = parser.addPreAggFunction(dataMapSchema.getCtasQuery)
+      val query = sparkSession.sql(updatedQuery)
+      val planToRegister = MVHelper.dropDummFuc(query.queryExecution.analyzed)
+      val modularPlan = mVState.modularizer.modularize(mVState.optimizer.execute(planToRegister))
+        .next()
+        .harmonized
+      val signature = modularPlan.signature
+      val identifier = dataMapSchema.getRelationIdentifier
+      val output = new FindDataSourceTable(sparkSession).apply(sparkSession.sessionState.catalog
+        .lookupRelation(TableIdentifier(identifier.getTableName, Some(identifier.getDatabaseName))))
+        .output
+      val relation = ModularRelation(identifier.getDatabaseName,
+        identifier.getTableName,
+        output,
+        Flags.NoFlags,
+        Seq.empty)
+      val select = Select(relation.outputList,
+        relation.outputList,
+        Seq.empty,
+        Seq((0, identifier.getTableName)).toMap,
+        Seq.empty,
+        Seq(relation),
+        Flags.NoFlags,
+        Seq.empty,
+        Seq.empty,
+        None)
+
+      summaryDatasets += SummaryDataset(signature, planToRegister, dataMapSchema, select)
+    }
+  }
+
+  /** Removes the summary dataset registered under the given datamap name. */
+  private[mv] def unregisterSchema(dataMapName: String): Unit = {
+    writeLock {
+      val dataIndex = summaryDatasets
+        .indexWhere(sd => sd.dataMapSchema.getDataMapName.equals(dataMapName))
+      require(dataIndex >= 0, s"Datamap $dataMapName is not registered.")
+      summaryDatasets.remove(dataIndex)
+    }
+  }
+
+
+  override def listAllSchema(): Array[SummaryDataset] = summaryDatasets.toArray
+
+  /** Returns feasible registered summary data sets for processing the given ModularPlan. */
+  private[mv] def lookupFeasibleSummaryDatasets(plan: ModularPlan): Seq[SummaryDataset] = {
+    readLock {
+      val sig = plan.signature
+      val statusDetails = DataMapStatusManager.getEnabledDataMapStatusDetails
+      // Only select the enabled datamaps for the query.
+      val enabledDataSets = summaryDatasets.filter{p =>
+        statusDetails.exists(_.getDataMapName.equalsIgnoreCase(p.dataMapSchema.getDataMapName))
+      }
+      val feasible = enabledDataSets.filter { x =>
+        (x.signature, sig) match {
+          case (Some(sig1), Some(sig2)) =>
+            // Candidate iff both plans agree on group-by-ness and the summary
+            // dataset touches only tables the query also touches.
+            sig1.groupby == sig2.groupby && sig1.datasets.subsetOf(sig2.datasets)
+          case _ => false
+        }
+      }
+      // heuristics: more tables involved in a summary data set => higher query reduction factor
+      feasible.sortWith(_.signature.get.datasets.size > _.signature.get.datasets.size)
+    }
+  }
+}
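
The feasibility test in lookupFeasibleSummaryDatasets reduces to two conditions
on the plan signatures. Restated as a standalone predicate (a sketch, assuming
only the Signature fields used above):

    import org.apache.carbondata.mv.plans.util.Signature

    // A summary dataset is a rewrite candidate iff it agrees with the query on
    // group-by-ness and touches only tables the query also touches.
    def isFeasible(mv: Signature, query: Signature): Boolean =
      mv.groupby == query.groupby && mv.datasets.subsetOf(query.datasets)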