You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/05/17 14:30:42 UTC

[29/50] [abbrv] carbondata git commit: [CARBONDATA-2474] Support Modular Plan for Materialized View DataMap

[CARBONDATA-2474] Support Modular Plan for Materialized View DataMap

Currently carbon supports preaggregate datamap, which only supports preaggregate on single table. To improve it, we can add join capability by implementing Materialized View.

In carbon Materialized View, Modular Plan is the basic abstraction for materialized view query plan. In this PR, modular plan module is added, it defines plan tree structure and conversion from Spark Logical Plan.

This closes #2301


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ffddba70
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ffddba70
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ffddba70

Branch: refs/heads/spark-2.3
Commit: ffddba704bdc110b7fae9054b186c931cfb23744
Parents: 443b717
Author: Jacky Li <ja...@qq.com>
Authored: Sat May 12 17:23:20 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Sat May 12 20:31:43 2018 +0530

----------------------------------------------------------------------
 .../core/datamap/DataMapStoreManager.java       |   36 +-
 .../schema/datamap/DataMapClassProvider.java    |    3 +-
 .../metadata/schema/table/DataMapSchema.java    |    4 +-
 datamap/mv/plan/pom.xml                         |  157 +
 .../org/apache/carbondata/mv/dsl/package.scala  |  101 +
 .../mv/expressions/modular/subquery.scala       |  169 +
 .../mv/plans/modular/AggregatePushDown.scala    |  170 +
 .../carbondata/mv/plans/modular/Flags.scala     |   71 +
 .../mv/plans/modular/Harmonizer.scala           |  236 +
 .../mv/plans/modular/ModularPatterns.scala      |  237 +
 .../mv/plans/modular/ModularPlan.scala          |  206 +
 .../modular/ModularPlanSignatureGenerator.scala |   73 +
 .../mv/plans/modular/ModularRelation.scala      |  143 +
 .../mv/plans/modular/Modularizer.scala          |  117 +
 .../mv/plans/modular/basicOperators.scala       |   86 +
 .../mv/plans/modular/queryGraph.scala           |   24 +
 .../apache/carbondata/mv/plans/package.scala    |   55 +
 .../mv/plans/util/BirdcageOptimizer.scala       |  199 +
 .../plans/util/Logical2ModularExtractions.scala |  355 ++
 .../util/LogicalPlanSignatureGenerator.scala    |  101 +
 .../carbondata/mv/plans/util/Printers.scala     |  347 ++
 .../carbondata/mv/plans/util/SQLBuild.scala     |   31 +
 .../carbondata/mv/plans/util/SQLBuildDSL.scala  |  428 ++
 .../carbondata/mv/plans/util/SQLBuilder.scala   |  262 ++
 .../carbondata/mv/plans/util/Signature.scala    |   49 +
 .../carbondata/mv/plans/util/TableCluster.scala |   55 +
 .../mv/testutil/ModularPlanTest.scala           |  180 +
 .../mv/testutil/Tpcds_1_4_QueryBatch.scala      | 4293 ++++++++++++++++++
 .../mv/testutil/Tpcds_1_4_Tables.scala          |  819 ++++
 .../org/apache/carbondata/mv/TestSQLBatch.scala |  584 +++
 .../mv/plans/ExtractJoinConditionsSuite.scala   |   67 +
 .../carbondata/mv/plans/IsSPJGHSuite.scala      |   59 +
 .../mv/plans/LogicalToModularPlanSuite.scala    |  196 +
 .../carbondata/mv/plans/ModularToSQLSuite.scala |  164 +
 .../carbondata/mv/plans/SignatureSuite.scala    |  104 +
 .../carbondata/spark/util/CarbonScalaUtil.scala |   12 +-
 .../apache/spark/sql/util/SparkSQLUtil.scala    |    7 +-
 .../carbondata/datamap/DataMapManager.java      |    9 +
 .../datamap/CarbonDropDataMapCommand.scala      |    9 +-
 .../command/table/CarbonDropTableCommand.scala  |    1 +
 pom.xml                                         |    6 +
 41 files changed, 10214 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index 9f7af2b..a3be26a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -64,7 +64,7 @@ public final class DataMapStoreManager {
   /**
    * Contains the datamap catalog for each datamap provider.
    */
-  private Map<String, DataMapCatalog> dataMapCatalogs = new ConcurrentHashMap<>();
+  private Map<String, DataMapCatalog> dataMapCatalogs = null;
 
   private Map<String, TableSegmentRefresher> segmentRefreshMap = new ConcurrentHashMap<>();
 
@@ -166,7 +166,8 @@ public final class DataMapStoreManager {
    * @param dataMapSchema
    */
   public synchronized void registerDataMapCatalog(DataMapProvider dataMapProvider,
-      DataMapSchema dataMapSchema) {
+      DataMapSchema dataMapSchema) throws IOException {
+    intializeDataMapCatalogs(dataMapProvider);
     String name = dataMapSchema.getProviderName();
     DataMapCatalog dataMapCatalog = dataMapCatalogs.get(name);
     if (dataMapCatalog == null) {
@@ -185,6 +186,9 @@ public final class DataMapStoreManager {
    * @param dataMapSchema
    */
   public synchronized void unRegisterDataMapCatalog(DataMapSchema dataMapSchema) {
+    if (dataMapCatalogs == null) {
+      return;
+    }
     String name = dataMapSchema.getProviderName();
     DataMapCatalog dataMapCatalog = dataMapCatalogs.get(name);
     if (dataMapCatalog != null) {
@@ -197,11 +201,37 @@ public final class DataMapStoreManager {
    * @param providerName
    * @return
    */
-  public DataMapCatalog getDataMapCatalog(String providerName) {
+  public DataMapCatalog getDataMapCatalog(DataMapProvider dataMapProvider, String providerName)
+      throws IOException {
+    intializeDataMapCatalogs(dataMapProvider);
     return dataMapCatalogs.get(providerName);
   }
 
   /**
+   * Initialize by reading all datamaps from store and re register it
+   * @param dataMapProvider
+   */
+  private void intializeDataMapCatalogs(DataMapProvider dataMapProvider) throws IOException {
+    if (dataMapCatalogs == null) {
+      dataMapCatalogs = new ConcurrentHashMap<>();
+      List<DataMapSchema> dataMapSchemas = getAllDataMapSchemas();
+      for (DataMapSchema schema : dataMapSchemas) {
+        DataMapCatalog dataMapCatalog = dataMapCatalogs.get(schema.getProviderName());
+        if (dataMapCatalog == null) {
+          dataMapCatalog = dataMapProvider.createDataMapCatalog();
+          dataMapCatalogs.put(schema.getProviderName(), dataMapCatalog);
+        }
+        try {
+          dataMapCatalog.registerSchema(schema);
+        } catch (Exception e) {
+          // Ignore the schema
+          LOGGER.error(e, "Error while registering schema");
+        }
+      }
+    }
+  }
+
+  /**
    * It gives the default datamap of the table. Default datamap of any table is BlockletDataMap
    *
    * @param table

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
index d5a99e8..772bb40 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/datamap/DataMapClassProvider.java
@@ -30,7 +30,8 @@ public enum DataMapClassProvider {
   PREAGGREGATE("org.apache.carbondata.core.datamap.AggregateDataMap", "preaggregate"),
   TIMESERIES("org.apache.carbondata.core.datamap.TimeSeriesDataMap", "timeseries"),
   LUCENE("org.apache.carbondata.datamap.lucene.LuceneFineGrainDataMapFactory","lucene"),
-  BLOOMFILTER("org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMapFactory", "bloomfilter");
+  BLOOMFILTER("org.apache.carbondata.datamap.bloom.BloomCoarseGrainDataMapFactory", "bloomfilter"),
+  MV("org.apache.carbondata.core.datamap.MVDataMap", "mv");
 
   /**
    * Fully qualified class name of datamap

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
index 611f298..7f6e86f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/DataMapSchema.java
@@ -152,7 +152,9 @@ public class DataMapSchema implements Serializable, Writable {
    */
   public boolean isIndexDataMap() {
     if (providerName.equalsIgnoreCase(DataMapClassProvider.PREAGGREGATE.getShortName()) ||
-        providerName.equalsIgnoreCase(DataMapClassProvider.TIMESERIES.getShortName())) {
+        providerName.equalsIgnoreCase(DataMapClassProvider.TIMESERIES.getShortName()) ||
+        providerName.equalsIgnoreCase(DataMapClassProvider.MV.getShortName()) ||
+        ctasQuery != null) {
       return false;
     } else {
       return true;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/pom.xml
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/pom.xml b/datamap/mv/plan/pom.xml
new file mode 100644
index 0000000..6a36fc5
--- /dev/null
+++ b/datamap/mv/plan/pom.xml
@@ -0,0 +1,157 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.carbondata</groupId>
+    <artifactId>carbondata-parent</artifactId>
+    <version>1.4.0-SNAPSHOT</version>
+    <relativePath>../../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>carbondata-mv-plan</artifactId>
+  <name>Apache CarbonData :: Materialized View Plan</name>
+
+  <properties>
+    <dev.path>${basedir}/../../../dev</dev.path>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-spark2</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>provided</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <testSourceDirectory>src/test/scala</testSourceDirectory>
+    <plugins>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <version>2.18</version>
+        <!-- Note config is repeated in scalatest config -->
+        <configuration>
+          <skip>false</skip>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
+          <systemProperties>
+            <java.awt.headless>true</java.awt.headless>
+          </systemProperties>
+          <testFailureIgnore>false</testFailureIgnore>
+          <failIfNoTests>false</failIfNoTests>
+        </configuration>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <version>2.17</version>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.15.2</version>
+        <executions>
+          <execution>
+            <id>compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <phase>compile</phase>
+          </execution>
+          <execution>
+            <id>testCompile</id>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+            <phase>test</phase>
+          </execution>
+          <execution>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <version>1.4.1</version>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>com.ning.maven.plugins</groupId>
+        <artifactId>maven-duplicate-finder-plugin</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+      <plugin>
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <version>1.0</version>
+        <!-- Note config is repeated in surefire config -->
+        <configuration>
+          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <junitxml>.</junitxml>
+          <testFailureIgnore>false</testFailureIgnore>
+          <filereports>CarbonTestSuite.txt</filereports>
+          <argLine>-ea -Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m
+          </argLine>
+          <stderr />
+          <environmentVariables>
+          </environmentVariables>
+          <systemProperties>
+            <java.awt.headless>true</java.awt.headless>
+          </systemProperties>
+        </configuration>
+        <executions>
+          <execution>
+            <id>test</id>
+            <goals>
+              <goal>test</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+    </plugins>
+  </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/dsl/package.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/dsl/package.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/dsl/package.scala
new file mode 100644
index 0000000..20b5e8a
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/dsl/package.scala
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv
+
+import scala.language.implicitConversions
+
+import org.apache.spark.sql.catalyst._
+import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.carbondata.mv.plans._
+import org.apache.carbondata.mv.plans.modular.{JoinEdge, ModularPlan}
+import org.apache.carbondata.mv.plans.modular.Flags._
+import org.apache.carbondata.mv.plans.util._
+
+/**
+ * A collection of implicit conversions that create a DSL for constructing data structures
+ * for modular plans.
+ *
+ */
+package object dsl {
+
+  // object plans {
+
+    implicit class DslModularPlan(val modularPlan: ModularPlan) {
+      def select(outputExprs: NamedExpression*)
+        (inputExprs: Expression*)
+        (predicateExprs: Expression*)
+        (aliasMap: Map[Int, String])
+        (joinEdges: JoinEdge*): ModularPlan = {
+        modular
+          .Select(
+            outputExprs,
+            inputExprs,
+            predicateExprs,
+            aliasMap,
+            joinEdges,
+            Seq(modularPlan),
+            NoFlags,
+            Seq.empty,
+            Seq.empty)
+      }
+
+      def groupBy(outputExprs: NamedExpression*)
+        (inputExprs: Expression*)
+        (predicateExprs: Expression*): ModularPlan = {
+        modular
+          .GroupBy(outputExprs, inputExprs, predicateExprs, None, modularPlan, NoFlags, Seq.empty)
+      }
+
+      def harmonize: ModularPlan = modularPlan.harmonized
+    }
+
+    implicit class DslModularPlans(val modularPlans: Seq[ModularPlan]) {
+      def select(outputExprs: NamedExpression*)
+        (inputExprs: Expression*)
+        (predicateList: Expression*)
+        (aliasMap: Map[Int, String])
+        (joinEdges: JoinEdge*): ModularPlan = {
+        modular
+          .Select(
+            outputExprs,
+            inputExprs,
+            predicateList,
+            aliasMap,
+            joinEdges,
+            modularPlans,
+            NoFlags,
+            Seq.empty,
+            Seq.empty)
+      }
+
+      def union(): ModularPlan = modular.Union(modularPlans, NoFlags, Seq.empty)
+    }
+
+    implicit class DslLogical2Modular(val logicalPlan: LogicalPlan) {
+      def resolveonly: LogicalPlan = analysis.SimpleAnalyzer.execute(logicalPlan)
+
+      def modularize: ModularPlan = modular.SimpleModularizer.modularize(logicalPlan).next
+
+      def optimize: LogicalPlan = BirdcageOptimizer.execute(logicalPlan)
+    }
+
+  // }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala
new file mode 100644
index 0000000..cfe341a
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/expressions/modular/subquery.scala
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.expressions.modular
+
+import org.apache.spark.sql.catalyst.expressions.{AttributeSeq, AttributeSet, Expression, ExprId, LeafExpression, NamedExpression, OuterReference, PlanExpression, Predicate, Unevaluable}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.mv.plans.modular.ModularPlan
+
+/**
+ * A base interface for expressions that contain a [[ModularPlan]].
+ */
+abstract class ModularSubquery(
+    plan: ModularPlan,
+    children: Seq[Expression],
+    exprId: ExprId) extends PlanExpression[ModularPlan] {
+  override lazy val resolved: Boolean = childrenResolved && plan.resolved
+  override lazy val references: AttributeSet =
+    if (plan.resolved) {
+      super.references -- plan.outputSet
+    } else {
+      super.references
+    }
+
+  override def withNewPlan(plan: ModularPlan): ModularSubquery
+
+  override def semanticEquals(o: Expression): Boolean = {
+    o match {
+      case p: ModularSubquery =>
+        this.getClass.getName.equals(p.getClass.getName) && plan.sameResult(p.plan) &&
+        children.length == p.children.length &&
+        children.zip(p.children).forall(p => p._1.semanticEquals(p._2))
+      case _ => false
+    }
+  }
+
+  def canonicalize(attrs: AttributeSeq): ModularSubquery = {
+    // Normalize the outer references in the subquery plan.
+    val normalizedPlan = plan.transformAllExpressions {
+      case OuterReference(r) => OuterReference(QueryPlan.normalizeExprId(r, attrs))
+    }
+    withNewPlan(normalizedPlan).canonicalized.asInstanceOf[ModularSubquery]
+  }
+}
+
+/**
+ * A subquery that will return only one row and one column. This will be converted into a physical
+ * scalar subquery during planning.
+ *
+ * Note: `exprId` is used to have a unique name in explain string output.
+ */
+case class ScalarModularSubquery(
+    plan: ModularPlan,
+    children: Seq[Expression] = Seq.empty,
+    exprId: ExprId = NamedExpression.newExprId)
+  extends ModularSubquery(plan, children, exprId) with Unevaluable {
+  override def dataType: DataType = plan.schema.fields.head.dataType
+
+  override def nullable: Boolean = true
+
+  override def withNewPlan(plan: ModularPlan): ScalarModularSubquery = copy(plan = plan)
+
+  override def toString: String = s"scalar-modular-subquery#${ exprId.id } $conditionString"
+
+  override lazy val canonicalized: Expression = {
+    ScalarModularSubquery(
+      plan.canonicalized,
+      children.map(_.canonicalized),
+      ExprId(0))
+  }
+}
+
+object ScalarModularSubquery {
+  def hasCorrelatedScalarSubquery(e: Expression): Boolean = {
+    e.find {
+      case s: ScalarModularSubquery => s.children.nonEmpty
+      case _ => false
+    }.isDefined
+  }
+}
+
+/**
+ * A [[ListQuery]] expression defines the query which we want to search in an IN subquery
+ * expression. It should and can only be used in conjunction with an IN expression.
+ *
+ * For example (SQL):
+ * {{{
+ *   SELECT  *
+ *   FROM    a
+ *   WHERE   a.id IN (SELECT  id
+ *                    FROM    b)
+ * }}}
+ */
+case class ModularListQuery(
+    plan: ModularPlan,
+    children: Seq[Expression] = Seq.empty,
+    exprId: ExprId = NamedExpression.newExprId)
+  extends ModularSubquery(plan, children, exprId) with Unevaluable {
+  override def dataType: DataType = plan.schema.fields.head.dataType
+
+  override def nullable: Boolean = false
+
+  override def withNewPlan(plan: ModularPlan): ModularListQuery = copy(plan = plan)
+
+  override def toString: String = s"modular-list#${ exprId.id } $conditionString"
+
+  override lazy val canonicalized: Expression = {
+    ModularListQuery(
+      plan.canonicalized,
+      children.map(_.canonicalized),
+      ExprId(0))
+  }
+}
+
+/**
+ * The [[Exists]] expression checks if a row exists in a subquery given some correlated condition.
+ *
+ * For example (SQL):
+ * {{{
+ *   SELECT  *
+ *   FROM    a
+ *   WHERE   EXISTS (SELECT  *
+ *                   FROM    b
+ *                   WHERE   b.id = a.id)
+ * }}}
+ */
+case class ModularExists(
+    plan: ModularPlan,
+    children: Seq[Expression] = Seq.empty,
+    exprId: ExprId = NamedExpression.newExprId)
+  extends ModularSubquery(plan, children, exprId) with Predicate with Unevaluable {
+  override def nullable: Boolean = false
+
+  override def withNewPlan(plan: ModularPlan): ModularExists = copy(plan = plan)
+
+  override def toString: String = s"modular-exists#${ exprId.id } $conditionString"
+
+  override lazy val canonicalized: Expression = {
+    ModularExists(
+      plan.canonicalized,
+      children.map(_.canonicalized),
+      ExprId(0))
+  }
+}
+
+/**
+ * A place holder for generated SQL for subquery expression.
+ */
+case class SubqueryHolder(override val sql: String) extends LeafExpression with Unevaluable {
+  override def dataType: DataType = NullType
+
+  override def nullable: Boolean = true
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala
new file mode 100644
index 0000000..77efaf7
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import scala.collection._
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Cast, Divide, ExprId, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+
+trait AggregatePushDown { // self: ModularPlan =>
+
+  def findPushThroughAggregates(outputList: Seq[NamedExpression],
+      selAliasMap: AttributeMap[Attribute],
+      fact: ModularRelation): Map[Int, (NamedExpression, Seq[NamedExpression])] = {
+    var pushable = true
+    val map = scala.collection.mutable.Map[Int, (NamedExpression, Seq[NamedExpression])]()
+    outputList.zipWithIndex.foreach {
+      // TODO: find out if the first two case as follows really needed.  Comment out for now.
+      case (attr: Attribute, i) if !fact.outputSet.contains(attr) => pushable = false
+      case (alias: Alias, i)
+        if alias.child.isInstanceOf[Attribute] &&
+           !fact.outputSet.contains(alias.child.asInstanceOf[Attribute]) => pushable = false
+      case (alias: Alias, i) if alias.child.isInstanceOf[AggregateExpression] =>
+        val res = transformAggregate(
+          alias.child
+            .asInstanceOf[AggregateExpression],
+          selAliasMap,
+          i,
+          fact,
+          map,
+          Some((alias.name, alias.exprId)))
+        if (res.isEmpty) {
+          pushable = false
+        }
+      case (agg: AggregateExpression, i) =>
+        val res = transformAggregate(
+          agg,
+          selAliasMap,
+          i,
+          fact,
+          map,
+          None)
+        if (res.isEmpty) {
+          pushable = false
+        }
+      case _ =>
+    }
+    if (!pushable) {
+      Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
+    } else {
+      map
+    }
+  }
+
+  private def transformAggregate(aggregate: AggregateExpression,
+      selAliasMap: AttributeMap[Attribute],
+      ith: Int,
+      fact: ModularRelation,
+      map: scala.collection.mutable.Map[Int, (NamedExpression, Seq[NamedExpression])],
+      aliasInfo: Option[(String, ExprId)]) = {
+    aggregate match {
+      case cnt@AggregateExpression(Count(exprs), _, false, _) if (exprs.length == 1 && exprs(0)
+        .isInstanceOf[Attribute]) =>
+        val tAttr = selAliasMap.get(exprs(0).asInstanceOf[Attribute]).getOrElse(exprs(0))
+          .asInstanceOf[Attribute]
+        if (fact.outputSet.contains(tAttr)) {
+          val cnt1 = AggregateExpression(Count(tAttr), cnt.mode, false)
+          val alias = Alias(cnt1, cnt1.toString)()
+          val tSum = AggregateExpression(Sum(alias.toAttribute), cnt.mode, false, cnt.resultId)
+          val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))
+          map += (ith -> (Alias(tSum, name)(exprId = id), Seq(alias)))
+        } else {
+          Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
+        }
+      case cnt@AggregateExpression(Count(exprs), _, false, _) if (exprs.length == 1 && exprs(0)
+        .isInstanceOf[Literal]) =>
+        val cnt1 = AggregateExpression(Count(exprs(0)), cnt.mode, false)
+        val alias = Alias(cnt1, cnt1.toString)()
+        val tSum = AggregateExpression(Sum(alias.toAttribute), cnt.mode, false, cnt.resultId)
+        val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))
+        map += (ith -> (Alias(tSum, name)(exprId = id), Seq(alias)))
+      case sum@AggregateExpression(Sum(expr), _, false, _) if (expr.isInstanceOf[Attribute]) =>
+        val tAttr = selAliasMap.get(expr.asInstanceOf[Attribute]).getOrElse(expr)
+          .asInstanceOf[Attribute]
+        if (fact.outputSet.contains(tAttr)) {
+          val sum1 = AggregateExpression(Sum(tAttr), sum.mode, false)
+          val alias = Alias(sum1, sum1.toString)()
+          val tSum = AggregateExpression(Sum(alias.toAttribute), sum.mode, false, sum.resultId)
+          val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))
+          map += (ith -> (Alias(tSum, name)(exprId = id), Seq(alias)))
+        } else {
+          Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
+        }
+      case sum@AggregateExpression(Sum(Cast(expr, dataType, timeZoneId)), _, false, _)
+        if expr.isInstanceOf[Attribute] =>
+        val tAttr = selAliasMap.get(expr.asInstanceOf[Attribute]).getOrElse(expr)
+          .asInstanceOf[Attribute]
+        if (fact.outputSet.contains(tAttr)) {
+          val sum1 = AggregateExpression(Sum(Cast(tAttr, dataType, timeZoneId)), sum.mode, false)
+          val alias = Alias(sum1, sum1.toString)()
+          val tSum = AggregateExpression(Sum(alias.toAttribute), sum.mode, false, sum.resultId)
+          val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))
+          map += (ith -> (Alias(tSum, name)(exprId = id), Seq(alias)))
+        } else {
+          Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
+        }
+      case sum@AggregateExpression(Sum(expr), _, false, _) if (expr.isInstanceOf[Literal]) =>
+        val sum1 = AggregateExpression(Sum(expr), sum.mode, false)
+        val alias = Alias(sum1, sum1.toString)()
+        val tSum = AggregateExpression(Sum(alias.toAttribute), sum.mode, false, sum.resultId)
+        val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))
+        map += (ith -> (Alias(tSum, name)(exprId = id), Seq(alias)))
+      case max@AggregateExpression(Max(expr), _, false, _) if (expr.isInstanceOf[Attribute]) =>
+        val tAttr = selAliasMap.get(expr.asInstanceOf[Attribute]).getOrElse(expr)
+          .asInstanceOf[Attribute]
+        if (fact.outputSet.contains(tAttr)) {
+          val max1 = AggregateExpression(Sum(tAttr), max.mode, false)
+          val alias = Alias(max1, max1.toString)()
+          val tMax = AggregateExpression(Max(alias.toAttribute), max.mode, false, max.resultId)
+          val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))
+          map += (ith -> (Alias(tMax, name)(exprId = id), Seq(alias)))
+        } else {
+          Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
+        }
+      case min@AggregateExpression(Min(expr), _, false, _) if (expr.isInstanceOf[Attribute]) =>
+        val tAttr = selAliasMap.get(expr.asInstanceOf[Attribute]).getOrElse(expr)
+          .asInstanceOf[Attribute]
+        if (fact.outputSet.contains(tAttr)) {
+          val min1 = AggregateExpression(Min(tAttr), min.mode, false)
+          val alias = Alias(min1, min1.toString)()
+          val tMin = AggregateExpression(Max(alias.toAttribute), min.mode, false, min.resultId)
+          val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))
+          map += (ith -> (Alias(tMin, name)(exprId = id), Seq(alias)))
+        } else {
+          Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
+        }
+      case avg@AggregateExpression(Average(expr), _, false, _) if (expr
+        .isInstanceOf[Attribute]) =>
+        val tAttr = selAliasMap.get(expr.asInstanceOf[Attribute]).getOrElse(expr)
+          .asInstanceOf[Attribute]
+        if (fact.outputSet.contains(tAttr)) {
+          val savg = AggregateExpression(Sum(tAttr), avg.mode, false)
+          val cavg = AggregateExpression(Count(tAttr), avg.mode, false)
+          val sAvg = Alias(savg, savg.toString)()
+          val cAvg = Alias(cavg, cavg.toString)()
+          val tAvg = Divide(sAvg.toAttribute, cAvg.toAttribute)
+          val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))
+          map += (ith -> (Alias(tAvg, name)(exprId = id), Seq(sAvg, cAvg)))
+        } else {
+          Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
+        }
+      case _ => Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Flags.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Flags.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Flags.scala
new file mode 100644
index 0000000..2b9d8cf
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Flags.scala
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+trait Flags {
+  // each query allows only one of each of the following keywords
+  final val DISTINCT = 1L << 0
+  final val LIMIT = 1L << 1
+  final val SORT = 1L << 2
+  final val GLOBAL = 1L << 3
+  final val LOCAL = 1L << 4
+  final val EXPAND = 1L << 5
+
+  // to determine each Seq[Expression] in varagrgs belong to which keyword
+  //  final val SortLimit = SORT | LIMIT
+
+  def flagToString(flag: Long): String = {
+    flag match {
+      case DISTINCT => "DISTINCT"
+      case LIMIT => "LIMIT"
+    }
+  }
+
+  // List of the raw flags that have expressions as arguments
+  // TODO: add EXPAND
+  private def pickledWithExpressions = {
+    Array[Long](SORT, LIMIT, EXPAND)
+  }
+
+  final val MaxBitPosition = 6
+
+  final val pickledListOrder: List[Long] = {
+    val all = 0 to MaxBitPosition map (1L << _)
+    all.toList filter (pickledWithExpressions contains _)
+  }
+  final val rawFlagPickledOrder: Array[Long] = pickledListOrder.toArray
+
+  type FlagSet = Long
+
+  val NoFlags: FlagSet = 0L
+
+  implicit class FlagSetUtils(var flags: FlagSet) {
+    def hasFlag(mask: Long): Boolean = (flags & mask) != 0L
+
+    def hasFlag(mask: Int): Boolean = hasFlag(mask.toLong)
+
+    def setFlag(mask: Long): FlagSet = { flags |= mask; flags }
+
+    def resetFlag(mask: Long): FlagSet = { flags &= ~mask; flags }
+
+    def initFlags(mask: Long): FlagSet = { flags = mask; flags }
+  }
+
+}
+
+object Flags extends Flags

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
new file mode 100644
index 0000000..91201a1
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Harmonizer.scala
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.plans._
+import org.apache.spark.sql.catalyst.rules._
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.carbondata.mv.plans
+import org.apache.carbondata.mv.plans._
+import org.apache.carbondata.mv.plans.modular.Flags._
+
+abstract class Harmonizer(conf: SQLConf)
+  extends RuleExecutor[ModularPlan] {
+
+  //  protected val fixedPoint = FixedPoint(conf.getConfString("spark.mv.harmonizer
+  // .maxIterations").toInt)
+  protected val fixedPoint = FixedPoint(conf.optimizerMaxIterations)
+
+  def batches: Seq[Batch] = {
+    Batch(
+      "Data Harmonizations", fixedPoint,
+      HarmonizeDimensionTable,
+      HarmonizeFactTable) :: Nil
+  }
+}
+
+/**
+ * An default Harmonizer
+ */
+object DefaultHarmonizer extends DefaultHarmonizer
+
+class DefaultHarmonizer extends Harmonizer(new SQLConf())
+
+object HarmonizeDimensionTable extends Rule[ModularPlan] with PredicateHelper {
+
+  def apply(plan: ModularPlan): ModularPlan = {
+    plan transform {
+      case s@Select(_, _, _, _, jedges, fact :: dims, _, _, _, _) if
+      jedges.forall(e => e.joinType == LeftOuter || e.joinType == Inner) &&
+      fact.isInstanceOf[ModularRelation] &&
+      dims.filterNot(_.isInstanceOf[modular.LeafNode]).nonEmpty &&
+      dims.forall(d => (d.isInstanceOf[ModularRelation] || HarmonizedRelation.canHarmonize(d))) => {
+        var tPullUpPredicates = Seq.empty[Expression]
+        val tChildren = fact :: dims.map {
+          case m: ModularRelation => m
+          case h@GroupBy(
+          _,
+          _,
+          _,
+          _,
+          s1@Select(_, _, _, _, _, dim :: Nil, NoFlags, Nil, Nil, _),
+          NoFlags,
+          Nil, _) if (dim.isInstanceOf[ModularRelation]) => {
+            val rAliasMap = AttributeMap(h.outputList
+              .collect { case a: Alias => (a.child.asInstanceOf[Attribute], a.toAttribute) })
+            val pullUpPredicates = s1.predicateList
+              .map(replaceAlias(_, rAliasMap.asInstanceOf[AttributeMap[Expression]]))
+            if (pullUpPredicates.forall(cond => canEvaluate(cond, h))) {
+              tPullUpPredicates = tPullUpPredicates ++ pullUpPredicates
+              plans.modular.HarmonizedRelation(h.copy(child = s1.copy(predicateList = Nil)))
+            } else {
+              h
+            }
+          }
+          // case _ =>
+        }
+        if (tChildren.forall(_.isInstanceOf[modular.LeafNode])) {
+          s.copy(predicateList = s.predicateList ++ tPullUpPredicates, children = tChildren)
+        } else {
+          s
+        }
+      }
+      //        s.withNewChildren(fact :: dims.map { case m: modular.ModularRelation => m; case h
+      // => HarmonizedRelation(h) })}
+      //        s.copy(predicateList = predicateList ++ moveUpPredicates, children = tChildren)}
+      // fact :: dims.map { case m: modular.ModularRelation => m; case h => HarmonizedRelation(h)
+      // })}
+    }
+  }
+
+}
+
+object HarmonizeFactTable extends Rule[ModularPlan] with PredicateHelper with AggregatePushDown {
+
+  def apply(plan: ModularPlan): ModularPlan = {
+    plan transform {
+      case g@GroupBy(_, _, _, _,
+        s@Select(_, _, _, aliasm, jedges, fact :: dims, _, _, _, _), _, _, _)
+        if s.adjacencyList.keySet.size <= 1 &&
+           jedges.forall(e => e.joinType == Inner) && // !s.flags.hasFlag(DISTINCT) &&
+           fact.isInstanceOf[ModularRelation] &&
+           (fact :: dims).forall(_.isInstanceOf[modular.LeafNode]) &&
+           dims.nonEmpty => {
+        val selAliasMap = AttributeMap(s.outputList.collect {
+          case a: Alias if (a.child.isInstanceOf[Attribute]) => (a.toAttribute, a.child
+            .asInstanceOf[Attribute])
+        })
+        val aggTransMap = findPushThroughAggregates(
+          g.outputList,
+          selAliasMap,
+          fact.asInstanceOf[ModularRelation])
+
+        val constraintsAttributeSet = dims.flatMap(s.extractEvaluableConditions(_))
+          .map(_.references)
+          .foldLeft(AttributeSet.empty)(_ ++ _)
+        val groupingAttributeSet = g.predicateList.map(_.references)
+          .foldLeft(AttributeSet.empty)(_ ++ _)
+        if (aggTransMap.isEmpty ||
+            // TODO: the following condition is too pessimistic, more work needed using methods
+            // similar to those in trait
+            //      QueryPlanConstraints
+            !constraintsAttributeSet.subsetOf(groupingAttributeSet)) {
+          g
+        } else {
+          val starJExprs = dims.flatMap(dim => s.extractJoinConditions(fact, dim)).toSeq
+          val gJAttributes = starJExprs.map(expr => expr.references)
+            .foldLeft(AttributeSet.empty)(_ ++ _).filter(fact.outputSet.contains(_))
+          val fExprs = s.extractEvaluableConditions(fact)
+          val gFAttributes = fExprs.map(expr => expr.references)
+            .foldLeft(AttributeSet.empty)(_ ++ _)
+            .filter(fact.outputSet.contains(_))
+          val gGAttributes = g.predicateList.map(expr => expr.references)
+            .foldLeft(AttributeSet.empty)(_ ++ _).filter(fact.outputSet.contains(_))
+          val gAttributes = (gJAttributes ++ gFAttributes ++ gGAttributes).toSeq
+
+          val oAggregates = aggTransMap.map(_._2).flatMap(_._2).toSeq
+
+          val tAliasMap = (aliasm.get(0) match {
+            case Some(name) => Seq((0, name));
+            case _ => Seq.empty
+          }).toMap
+          val sOutput = (oAggregates.map(_.references).foldLeft(AttributeSet.empty)(_ ++ _) ++
+                         AttributeSet(gAttributes)).toSeq
+          val hFactSel = plans.modular
+            .Select(
+              sOutput,
+              fact.output,
+              Seq.empty,
+              tAliasMap,
+              Seq.empty,
+              fact :: Nil,
+              NoFlags,
+              Seq.empty,
+              Seq.empty)
+          val hFact = plans.modular
+            .GroupBy(
+              gAttributes ++ oAggregates,
+              sOutput,
+              gAttributes,
+              None,
+              hFactSel,
+              NoFlags,
+              Seq.empty)
+          val hFactName = s"gen_harmonized_${
+            fact.asInstanceOf[ModularRelation]
+              .databaseName
+          }_${ fact.asInstanceOf[ModularRelation].tableName }"
+          val hAliasMap = (aliasm - 0) + (0 -> hFactName)
+          val hInputList = gAttributes ++ oAggregates.map(_.toAttribute) ++
+                           dims.flatMap(_.asInstanceOf[modular.LeafNode].output).toSeq
+          // val hPredicateList = s.predicateList
+          val attrOutputList = s.outputList.filter(expr => (expr.isInstanceOf[Attribute]) ||
+                                                           (expr.isInstanceOf[Alias] &&
+                                                            expr.asInstanceOf[Alias].child
+                                                              .isInstanceOf[Attribute]))
+          val aggOutputList = aggTransMap.values.flatMap(t => t._2)
+            .map { ref =>
+              AttributeReference(ref.name, ref.dataType)(
+                exprId = ref.exprId,
+                qualifier = Some(hFactName))
+            }
+          val hFactOutputSet = hFact.outputSet
+          // Update the outputlist qualifier
+          val hOutputList = (attrOutputList ++ aggOutputList).map {attr =>
+            attr.transform {
+              case ref: Attribute if hFactOutputSet.contains(ref) =>
+                AttributeReference(ref.name, ref.dataType)(
+                  exprId = ref.exprId,
+                  qualifier = Some(hFactName))
+            }
+          }.asInstanceOf[Seq[NamedExpression]]
+
+          // Update the predicate qualifier
+          val hPredList = s.predicateList.map{ pred =>
+            pred.transform {
+              case ref: Attribute if hFactOutputSet.contains(ref) =>
+                AttributeReference(ref.name, ref.dataType)(
+                  exprId = ref.exprId,
+                  qualifier = Some(hFactName))
+            }
+          }
+          val hSel = s.copy(
+              outputList = hOutputList,
+              inputList = hInputList,
+              aliasMap = hAliasMap,
+              predicateList = hPredList,
+              children = hFact :: dims)
+          val gOutputList = g.outputList.zipWithIndex
+            .map { case (expr, index) =>
+              if (aggTransMap.keySet.contains(index)) {
+                aggTransMap(index)
+                  ._1
+              } else {
+                expr
+              }
+            }
+
+          val wip = g.copy(outputList = gOutputList, inputList = hInputList, child = hSel)
+          wip.transformExpressions {
+            case ref: Attribute if hFactOutputSet.contains(ref) =>
+              AttributeReference(ref.name, ref.dataType)(
+                exprId = ref.exprId,
+                qualifier = Some(hFactName))
+          }
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
new file mode 100644
index 0000000..c546c6e
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPatterns.scala
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import org.apache.spark.sql.catalyst.expressions.{Expression, NamedExpression, PredicateHelper, _}
+import org.apache.spark.sql.catalyst.plans.logical.{Distinct, Limit, LogicalPlan, Window}
+
+import org.apache.carbondata.mv.plans.{Pattern, _}
+import org.apache.carbondata.mv.plans.modular.Flags._
+import org.apache.carbondata.mv.plans.util.{ExtractGroupByModule, ExtractSelectModule, ExtractSelectModuleForWindow, ExtractTableModule, ExtractUnionModule}
+
+object SimpleModularizer extends ModularPatterns {
+  def patterns: Seq[Pattern] = {
+    SelectModule ::
+    GroupByModule ::
+    UnionModule ::
+    DataSourceModule :: Nil
+  }
+
+  override protected def collectPlaceholders(plan: ModularPlan): Seq[(ModularPlan, LogicalPlan)] = {
+    plan.collect {
+      case placeholder@ModularizeLater(logicalPlan) => placeholder -> logicalPlan
+    }
+  }
+
+  override protected def prunePlans(plans: Iterator[ModularPlan]): Iterator[ModularPlan] = {
+    plans
+    //    plans.filter(_.collect { case n if n.subqueries.nonEmpty => n }.isEmpty)
+    // TODO: find out why the following stmt not working
+    //    plans.filter(_.find { case n if n.subqueries.nonEmpty => true }.isEmpty)
+  }
+
+  override protected def makeupAliasMappings(
+      plans: Iterator[ModularPlan]): Iterator[ModularPlan] = {
+    def makeup(plan: ModularPlan): ModularPlan = {
+      plan transform {
+        case g@GroupBy(_, _, _, _, s@Select(_, _, _, aliasmap, _, children, _, _, _, _), _, _, _) =>
+          val aq = AttributeSet(g.outputList).filter(_.qualifier.nonEmpty)
+          val makeupmap = children.zipWithIndex.flatMap {
+            case (child, i) =>
+              aq.find(child.outputSet.contains(_)).map(_.qualifier).flatten.map((i, _))
+          }.toMap
+          g.copy(child = s.copy(aliasMap = makeupmap ++ aliasmap))
+      }
+    }
+
+    plans.map(makeup)
+  }
+}
+
+abstract class ModularPattern extends GenericPattern[ModularPlan] {
+
+  override protected def modularizeLater(plan: LogicalPlan): ModularPlan = ModularizeLater(plan)
+}
+
+case class ModularizeLater(plan: LogicalPlan) extends LeafNode {
+  override def output: Seq[Attribute] = plan.output
+}
+
+abstract class ModularPatterns extends Modularizer[ModularPlan] {
+
+  //  self: MQOContext#SparkyModeler =>
+
+  object SelectModule extends Pattern with PredicateHelper {
+
+    private[this] def makeSelectModule(
+        output: Seq[NamedExpression],
+        input: Seq[Expression],
+        predicate: Seq[Expression],
+        aliasmap: Map[Int, String],
+        joinedge: Seq[JoinEdge],
+        flags: FlagSet,
+        children: Seq[ModularPlan],
+        flagSpec: Seq[Seq[Any]],
+        windowSpec: Seq[Seq[Any]]) = {
+      Seq(Select(
+        output,
+        input,
+        predicate,
+        aliasmap,
+        joinedge,
+        children,
+        flags,
+        flagSpec,
+        windowSpec))
+    }
+
+    def apply(plan: LogicalPlan): Seq[ModularPlan] = {
+      plan match {
+        case Distinct(
+          ExtractSelectModule(output, input, predicate, aliasmap, joinedge, children, flags1,
+          fspec1, wspec)) =>
+          val flags = flags1.setFlag(DISTINCT)
+          makeSelectModule(output, input, predicate, aliasmap, joinedge, flags,
+            children.map(modularizeLater), fspec1, wspec)
+
+        case Limit(
+          limitExpr,
+          Distinct(
+            ExtractSelectModule(output, input, predicate, aliasmap, joinedge, children,
+              flags1, fspec1, wspec))) =>
+          val flags = flags1.setFlag(DISTINCT).setFlag(LIMIT)
+          makeSelectModule(output, input, predicate, aliasmap, joinedge, flags,
+            children.map(modularizeLater), Seq(Seq(limitExpr)) ++ fspec1, wspec)
+
+        case Limit(
+          limitExpr,
+          ExtractSelectModule(output, input, predicate, aliasmap, joinedge, children, flags1,
+            fspec1, wspec)) =>
+          val flags = flags1.setFlag(LIMIT)
+          makeSelectModule(output, input, predicate, aliasmap, joinedge, flags,
+            children.map(modularizeLater), Seq(Seq(limitExpr)) ++ fspec1, wspec)
+
+        case ExtractSelectModule(output, input, predicate, aliasmap, joinedge, children, flags1,
+        fspec1, wspec) =>
+          makeSelectModule(output, input, predicate, aliasmap, joinedge, flags1,
+            children.map(modularizeLater), fspec1, wspec)
+
+        case Window(exprs, _, _,
+          ExtractSelectModuleForWindow(output, input, predicate, aliasmap, joinedge, children,
+            flags1, fspec1, wspec)) =>
+          val sel1 = makeSelectModule(output, input, predicate, aliasmap, joinedge, flags1,
+            children.map(modularizeLater), fspec1, wspec)
+          makeSelectModule(
+            output.map(_.toAttribute),
+            output.map(_.toAttribute),
+            Seq.empty,
+            Map.empty,
+            Seq.empty,
+            NoFlags,
+            sel1,
+            Seq.empty,
+            Seq(Seq(exprs)) ++ wspec)
+
+        case _ => Nil
+      }
+    }
+  }
+
+  object GroupByModule extends Pattern with PredicateHelper {
+
+    private[this] def makeGroupByModule(
+        output: Seq[NamedExpression],
+        input: Seq[Expression],
+        predicate: Seq[Expression],
+        flags: FlagSet,
+        alias: Option[String],
+        child: ModularPlan,
+        fspec: Seq[Seq[Any]]) = {
+      val groupby = Some(GroupBy(output, input, predicate, alias, child, flags, fspec))
+      groupby.map(Seq(_)).getOrElse(Nil)
+    }
+
+    def apply(plan: LogicalPlan): Seq[ModularPlan] = {
+      plan match {
+        case Limit(
+        limitExpr,
+        ExtractGroupByModule(output, input, predicate, alias, child, flags1, fspec1)) =>
+          val flags = flags1.setFlag(LIMIT)
+          makeGroupByModule(
+            output,
+            input,
+            predicate,
+            flags,
+            alias,
+            modularizeLater(child),
+            Seq(Seq(limitExpr)) ++ fspec1)
+        case ExtractGroupByModule(output, input, predicate, alias, child, flags1, fspec1) =>
+          makeGroupByModule(output, input, predicate, flags1, alias, modularizeLater(child), fspec1)
+        case _ => Nil
+      }
+    }
+  }
+
+  object UnionModule extends Pattern with PredicateHelper {
+
+    private[this] def makeUnionModule(
+        flags: FlagSet,
+        children: Seq[ModularPlan],
+        fspec: Seq[Seq[Any]]) = {
+      Seq(modular.Union(children, flags, fspec))
+    }
+
+    def apply(plan: LogicalPlan): Seq[ModularPlan] = {
+      plan match {
+        case Distinct(ExtractUnionModule(children, flags1, fspec1)) =>
+          val flags = flags1.setFlag(DISTINCT)
+          makeUnionModule(flags, children.map(modularizeLater), fspec1)
+        case Limit(limitExpr, Distinct(ExtractUnionModule(children, flags1, fspec1))) =>
+          val flags = flags1.setFlag(DISTINCT).setFlag(LIMIT)
+          makeUnionModule(flags, children.map(modularizeLater), Seq(Seq(limitExpr)) ++ fspec1)
+        case Limit(limitExpr, ExtractUnionModule(children, flags1, fspec1)) =>
+          val flags = flags1.setFlag(LIMIT)
+          makeUnionModule(flags, children.map(modularizeLater), Seq(Seq(limitExpr)) ++ fspec1)
+        case ExtractUnionModule(children, flags1, fspec1) =>
+          makeUnionModule(flags1, children.map(modularizeLater), fspec1)
+        case _ => Nil
+      }
+    }
+  }
+
+  object DataSourceModule extends Pattern with Flags with PredicateHelper {
+
+    private[this] def makeDataSourceModule(
+        databaseName: String,
+        tableName: String,
+        output: Seq[NamedExpression],
+        flags: FlagSet,
+        fspec: Seq[Seq[Any]]) = {
+      Seq(ModularRelation(databaseName, tableName, output, flags, fspec))
+    }
+
+    def apply(plan: LogicalPlan): Seq[ModularPlan] = {
+      plan match {
+        case ExtractTableModule(databaseName, tableName, output, Nil, flags1, fspec1) =>
+          makeDataSourceModule(databaseName, tableName, output, flags1, fspec1)
+        case _ => Nil
+      }
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
new file mode 100644
index 0000000..c66df19
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlan.scala
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import scala.collection._
+import scala.collection.mutable.{HashMap, MultiMap}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans.{JoinType, QueryPlan}
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.trees.TreeNode
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.carbondata.mv.plans._
+import org.apache.carbondata.mv.plans.util.{Printers, Signature, SQLBuilder}
+
+abstract class ModularPlan
+  extends QueryPlan[ModularPlan]
+    with AggregatePushDown
+    with Logging
+    with Serializable
+    with PredicateHelper with Printers {
+
+  /**
+   * the first two are to support sub-query expressions
+   */
+
+  lazy val resolved: Boolean = expressions.forall(_.resolved) && childrenResolved
+
+  def childrenResolved: Boolean = children.forall(_.resolved)
+
+  private var statsCache: Option[Statistics] = None
+
+  final def stats(spark: SparkSession, conf: SQLConf): Statistics = {
+    statsCache.getOrElse {
+      statsCache = Some(computeStats(spark, conf))
+      statsCache.get
+    }
+  }
+
+  final def invalidateStatsCache(): Unit = {
+    statsCache = None
+    children.foreach(_.invalidateStatsCache())
+  }
+
+  protected def computeStats(spark: SparkSession, conf: SQLConf): Statistics = {
+    //    spark.conf.set("spark.sql.cbo.enabled", true)
+    val sqlStmt = asOneLineSQL
+    val plan = spark.sql(sqlStmt).queryExecution.optimizedPlan
+    plan.stats(conf)
+  }
+
+  override def fastEquals(other: TreeNode[_]): Boolean = {
+    this.eq(other)
+  }
+
+  private var _rewritten: Boolean = false
+
+  /**
+   * Marks this plan as already rewritten.
+   */
+  private[mv] def setRewritten(): ModularPlan = {
+    _rewritten = true
+    children.foreach(_.setRewritten())
+    this
+  }
+
+  /**
+   * Returns true if this node and its children have already been gone through query rewrite.
+   * Note this this is only an optimization used to avoid rewriting trees that have already
+   * been rewritten, and can be reset by transformations.
+   */
+  def rewritten: Boolean = {
+    _rewritten
+  }
+
+  private var _skip: Boolean = false
+
+  private[mv] def setSkip(): ModularPlan = {
+    _skip = true
+    children.foreach(_.setSkip())
+    this
+  }
+
+  private[mv] def resetSkip(): ModularPlan = {
+    _skip = false
+    children.foreach(_.resetSkip())
+    this
+  }
+
+  def skip: Boolean = _skip
+
+  def isSPJGH: Boolean = {
+    this match {
+      case modular.Select(_, _, _, _, _,
+        Seq(modular.GroupBy(_, _, _, _,
+        sel_c2@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)), _, _, _, _)
+        if sel_c2.children.forall(_.isInstanceOf[modular.LeafNode]) => true
+
+      case modular.GroupBy(_, _, _, _, sel_c2@modular.Select(_, _, _, _, _, _, _, _, _, _), _, _, _)
+        if sel_c2.children.forall(_.isInstanceOf[modular.LeafNode]) => true
+
+      case modular.Select(_, _, _, _, _, children, _, _, _, _)
+        if children.forall(_.isInstanceOf[modular.LeafNode]) => true
+
+      case _ => false
+    }
+  }
+
+  def signature: Option[Signature] = ModularPlanSignatureGenerator.generate(this)
+
+  def createMutableAdjacencyList(
+      edges: Seq[JoinEdge]
+  ): mutable.HashMap[Int, mutable.Set[(Int, JoinType)]] with mutable.MultiMap[Int, (Int, JoinType)]
+  = {
+    val mm = new HashMap[Int, mutable.Set[(Int, JoinType)]] with MultiMap[Int, (Int, JoinType)]
+    for (edge <- edges) { mm.addBinding(edge.left, (edge.right, edge.joinType)) }
+    mm
+  }
+
+  def createImmutableAdjacencyList(edges: Seq[JoinEdge]): Predef.Map[Int, Seq[(Int, JoinType)]] = {
+    edges.groupBy { _.left }.map { case (k, v) => (k, v.map(e => (e.right, e.joinType))) }
+  }
+
+  def adjacencyList: Map[Int, Seq[(Int, JoinType)]] = Map.empty
+
+  def extractJoinConditions(left: ModularPlan, right: ModularPlan): Seq[Expression] = Seq.empty
+
+  def extractRightEvaluableConditions(left: ModularPlan, right: ModularPlan): Seq[Expression] =
+    Seq.empty
+
+  def extractEvaluableConditions(plan: ModularPlan): Seq[Expression] = Seq.empty
+
+  def asCompactSQL: String = asCompactString(new SQLBuilder(this).fragmentExtract)
+
+  def asOneLineSQL: String = asOneLineString(new SQLBuilder(this).fragmentExtract)
+
+  // for plan without sub-query expression only
+  def asOneLineSQL(subqueryPrefix: String): String = {
+    asOneLineString(new SQLBuilder(
+      this,
+      subqueryPrefix).fragmentExtract)
+  }
+
+  /**
+   * Returns a plan where a best effort attempt has been made to transform `this` in a way
+   * that preserves the result but replaces harmonized dimension table with HarmonizedRelation
+   * and fact table with sub-plan that pre-aggregates the table before join with dimension table
+   *
+   * Some nodes should overwrite this to provide proper harmonization logic.
+   */
+  lazy val harmonized: ModularPlan = DefaultHarmonizer.execute(preHarmonized)
+
+  /**
+   * Do some simple transformation on this plan before harmonizing. Implementations can override
+   * this method to provide customized harmonize logic without rewriting the whole logic.
+   *
+   * We assume queries need to be harmonized are of the form:
+   *
+   * FACT (left) join (harmonized) DIM1 (left) join (harmonized) DIM2 ...
+   *
+   * For queries of not this form, customize this method for them to conform this form.
+   */
+  protected def preHarmonized: ModularPlan = {
+    this
+  }
+}
+
+object ModularPlan extends PredicateHelper {
+
+}
+
+abstract class LeafNode extends ModularPlan {
+  override def children: Seq[ModularPlan] = Nil
+}
+
+abstract class UnaryNode extends ModularPlan {
+  def child: ModularPlan
+
+  override def children: Seq[ModularPlan] = child :: Nil
+}
+
+abstract class BinaryNode extends ModularPlan {
+  def left: ModularPlan
+
+  def right: ModularPlan
+
+  override def children: Seq[ModularPlan] = Seq(left, right)
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlanSignatureGenerator.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlanSignatureGenerator.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlanSignatureGenerator.scala
new file mode 100644
index 0000000..86cae36
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularPlanSignatureGenerator.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import org.apache.carbondata.mv.plans._
+import org.apache.carbondata.mv.plans.util.{Signature, SignatureGenerator, SignatureRule}
+
+object ModularPlanSignatureGenerator extends SignatureGenerator[ModularPlan] {
+  lazy val rule: SignatureRule[ModularPlan] = ModularPlanRule
+
+  override def generate(plan: ModularPlan): Option[Signature] = {
+    if (plan.isSPJGH) {
+      super.generate(plan)
+    } else {
+      None
+    }
+  }
+}
+
+object ModularPlanRule extends SignatureRule[ModularPlan] {
+
+  def apply(plan: ModularPlan, childSignatures: Seq[Option[Signature]]): Option[Signature] = {
+
+    plan match {
+      case modular.Select(_, _, _, _, _, _, _, _, _, _) =>
+        if (childSignatures.map { _.getOrElse(Signature()).groupby }.forall(x => !x)) {
+          Some(Signature(
+            groupby = false,
+            childSignatures.flatMap { _.getOrElse(Signature()).datasets.toSeq }.toSet))
+        } else if (childSignatures.length == 1 &&
+                   childSignatures(0).getOrElse(Signature()).groupby) {
+          childSignatures(0)
+        } else {
+          None
+        }
+      case modular.GroupBy(_, _, _, _, _, _, _, _) =>
+        if (childSignatures.length == 1 && !childSignatures(0).getOrElse(Signature()).groupby) {
+          Some(Signature(groupby = true, childSignatures(0).getOrElse(Signature()).datasets))
+        } else {
+          None
+        }
+      case HarmonizedRelation(source) =>
+        source.signature match {
+          case Some(s) =>
+            Some(Signature(groupby = false, s.datasets))
+          case _ =>
+            None
+        }
+      case modular.ModularRelation(dbname, tblname, _, _, _) =>
+        if (dbname != null && tblname != null) {
+          Some(Signature(groupby = false, Set(Seq(dbname, tblname).mkString("."))))
+        } else {
+          Some(Signature(groupby = false, Set(plan.toString())))
+        }
+      case _ => None
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
new file mode 100644
index 0000000..dec0c48
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/ModularRelation.scala
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.JoinType
+import org.apache.spark.sql.catalyst.plans.logical
+import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.internal.SQLConf
+
+import org.apache.carbondata.mv.plans.modular.Flags._
+
+object ModularRelation {
+  def apply(outputList: NamedExpression*): ModularRelation = {
+    new ModularRelation(
+      "test",
+      "test",
+      outputList,
+      NoFlags,
+      Seq.empty)
+  }
+}
+
+case class ModularRelation(databaseName: String,
+    tableName: String,
+    outputList: Seq[NamedExpression],
+    flags: FlagSet,
+    rest: Seq[Seq[Any]]) extends LeafNode {
+  override def computeStats(spark: SparkSession, conf: SQLConf): Statistics = {
+    val plan = spark.table(s"${ databaseName }.${ tableName }").queryExecution.optimizedPlan
+    val stats = plan.stats(conf)
+    val output = outputList.map(_.toAttribute)
+    val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+      table => AttributeMap(table.output.zip(output))
+    }
+    val rewrites = mapSeq(0)
+    val attributeStats = AttributeMap(stats.attributeStats.iterator
+      .map { pair => (rewrites(pair._1), pair._2) }.toSeq)
+    Statistics(stats.sizeInBytes, stats.rowCount, attributeStats, stats.hints)
+  }
+
+  override def output: Seq[Attribute] = outputList.map(_.toAttribute)
+
+  override def adjacencyList: Map[Int, Seq[(Int, JoinType)]] = Map.empty
+
+  override def equals(that: Any): Boolean = {
+    that match {
+      case that: ModularRelation =>
+        if ((databaseName != null && tableName != null && databaseName == that.databaseName &&
+             tableName == that.tableName) ||
+            (databaseName == null && tableName == null && that.databaseName == null &&
+             that.tableName == null && output.toString == (that.output).toString)) {
+          true
+        } else {
+          false
+        }
+      case _ => false
+    }
+  }
+}
+
+object HarmonizedRelation {
+  def canHarmonize(source: ModularPlan): Boolean = {
+    source match {
+      case g@GroupBy(
+      _,
+      _,
+      _,
+      _,
+      Select(_, _, _, _, _, dim :: Nil, NoFlags, Nil, Nil, _),
+      NoFlags,
+      Nil, _) if (dim.isInstanceOf[ModularRelation]) =>
+        if (g.outputList
+          .forall(col => col.isInstanceOf[AttributeReference] ||
+                         (col.isInstanceOf[Alias] &&
+                          col.asInstanceOf[Alias].child.isInstanceOf[AttributeReference]))) {
+          true
+        } else {
+          false
+        }
+      case _ => false
+    }
+  }
+}
+
+// support harmonization for dimension table
+case class HarmonizedRelation(source: ModularPlan) extends LeafNode {
+  require(HarmonizedRelation.canHarmonize(source), "invalid plan for harmonized relation")
+  lazy val tableName = source.asInstanceOf[GroupBy].child.children(0).asInstanceOf[ModularRelation]
+    .tableName
+  lazy val databaseName = source.asInstanceOf[GroupBy].child.children(0)
+    .asInstanceOf[ModularRelation].databaseName
+
+  //  override def computeStats(spark: SparkSession, conf: SQLConf): Statistics = source.stats
+  // (spark, conf)
+  override def computeStats(spark: SparkSession, conf: SQLConf): Statistics = {
+    val plan = spark.table(s"${ databaseName }.${ tableName }").queryExecution.optimizedPlan
+    val stats = plan.stats(conf)
+    val output = source.asInstanceOf[GroupBy].child.children(0).asInstanceOf[ModularRelation]
+      .outputList.map(_.toAttribute)
+    val mapSeq = plan.collect { case n: logical.LeafNode => n }.map {
+      table => AttributeMap(table.output.zip(output))
+    }
+    val rewrites = mapSeq(0)
+    val aliasMap = AttributeMap(
+      source.asInstanceOf[GroupBy].outputList.collect {
+        case a: Alias if (a.child.isInstanceOf[Attribute]) => (a.child.asInstanceOf[Attribute], a
+          .toAttribute)
+      })
+    val aStatsIterator = stats.attributeStats.iterator.map { pair => (rewrites(pair._1), pair._2) }
+    val attributeStats = AttributeMap(aStatsIterator
+      .map(pair => ((aliasMap.get(pair._1)).getOrElse(pair._1), pair._2)).toSeq)
+
+    Statistics(stats.sizeInBytes, None, attributeStats, stats.hints)
+  }
+
+  override def output: Seq[Attribute] = source.output
+
+  // two harmonized modular relations are equal only if orders of output columns of
+  // their source plans are exactly the same
+  override def equals(that: Any): Boolean = {
+    that match {
+      case that: HarmonizedRelation => source.sameResult(that.source)
+      case _ => false
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala
new file mode 100644
index 0000000..d255359
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/Modularizer.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.expressions.{Exists, ListQuery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.trees.TreeNode
+
+import org.apache.carbondata.mv.expressions.modular._
+import org.apache.carbondata.mv.plans._
+
+abstract class GenericPattern[TreeType <: TreeNode[TreeType]] extends Logging {
+  protected def modularizeLater(plan: LogicalPlan): TreeType
+
+  def apply(plan: LogicalPlan): Seq[TreeType]
+}
+
+abstract class Modularizer[TreeType <: TreeNode[TreeType]] {
+  def patterns: Seq[GenericPattern[TreeType]]
+
+  // protected def modularizeLater(plan: LogicalPlan) = this.modularize(plan).next()
+
+  def modularize(plan: LogicalPlan): Iterator[TreeType] = {
+    val replaced = plan.transformAllExpressions {
+      case s: ScalarSubquery =>
+        if (s.children.isEmpty) {
+          ScalarModularSubquery(
+            modularize(s.plan).next.asInstanceOf[ModularPlan],
+            s.children,
+            s.exprId)
+        } else {
+          throw new UnsupportedOperationException(s"Expression $s doesn't canonicalized")
+        }
+      case l: ListQuery =>
+        if (l.children.isEmpty) {
+          ModularListQuery(modularize(l.plan).next.asInstanceOf[ModularPlan], l.children, l.exprId)
+        } else {
+          throw new UnsupportedOperationException(s"Expression $l doesn't canonicalized")
+        }
+      case e: Exists =>
+        if (e.children.isEmpty) {
+          ModularExists(modularize(e.plan).next.asInstanceOf[ModularPlan], e.children, e.exprId)
+        } else {
+          throw new UnsupportedOperationException(s"Expression $e doesn't canonicalized")
+        }
+      case o => o
+    }
+    //    val replaced = plan
+    val mplans = modularizeCore(replaced)
+    makeupAliasMappings(mplans)
+  }
+
+  private def modularizeCore(plan: LogicalPlan): Iterator[TreeType] = {
+    // Collect modular plan candidates.
+    val candidates = patterns.iterator.flatMap(_ (plan))
+
+    // The candidates may contain placeholders marked as [[modularizeLater]],
+    // so try to replace them by their child plans.
+    val plans = candidates.flatMap { candidate =>
+      val placeholders = collectPlaceholders(candidate)
+
+      if (placeholders.isEmpty) {
+        // Take the candidate as is because it does not contain placeholders.
+        Iterator(candidate)
+      } else {
+        // Plan the logical plan marked as [[modularizeLater]] and replace the placeholders.
+        placeholders.iterator.foldLeft(Iterator(candidate)) {
+          case (candidatesWithPlaceholders, (placeholder, logicalPlan)) =>
+            // Modularize the logical plan for the placeholder
+            val childPlans = this.modularizeCore(logicalPlan)
+
+            candidatesWithPlaceholders.flatMap { candidateWithPlaceholder =>
+              childPlans.map { childPlan =>
+                // Replace the placeholder by the child plan
+                candidateWithPlaceholder.transformUp {
+                  case p if p == placeholder => childPlan
+                }
+              }
+            }
+        }
+      }
+    }
+
+    val pruned = prunePlans(plans)
+    // val iter = patterns.view.flatMap(_(plan)).toIterator
+    supports(
+      pruned.hasNext,
+      s"Modular plan not supported (e.g. has subquery expression) for \n$plan")
+    //    makeupAliasMappings(pruned)
+    pruned
+  }
+
+  /** Collects placeholders marked as [[modularizeLater]] by pattern and its [[LogicalPlan]]s */
+  protected def collectPlaceholders(plan: TreeType): Seq[(TreeType, LogicalPlan)]
+
+  /** Prunes bad plans to prevent combinatorial explosion. */
+  protected def prunePlans(plans: Iterator[TreeType]): Iterator[TreeType]
+
+  protected def makeupAliasMappings(plans: Iterator[TreeType]): Iterator[TreeType]
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/basicOperators.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/basicOperators.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/basicOperators.scala
new file mode 100644
index 0000000..0a76da1
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/basicOperators.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import org.apache.spark.sql.catalyst.expressions.{Attribute, _}
+import org.apache.spark.sql.catalyst.plans.JoinType
+
+import org.apache.carbondata.mv.plans._
+import org.apache.carbondata.mv.plans.modular.Flags._
+
+private[mv] trait Matchable extends ModularPlan {
+  def outputList: Seq[NamedExpression]
+
+  def predicateList: Seq[Expression]
+}
+
+case class GroupBy(
+    outputList: Seq[NamedExpression],
+    inputList: Seq[Expression],
+    predicateList: Seq[Expression],
+    alias: Option[String],
+    child: ModularPlan,
+    flags: FlagSet,
+    flagSpec: Seq[Seq[Any]],
+    dataMapTableRelation: Option[ModularPlan] = None) extends UnaryNode with Matchable {
+  override def output: Seq[Attribute] = outputList.map(_.toAttribute)
+}
+
+case class Select(
+    outputList: Seq[NamedExpression],
+    inputList: Seq[Expression],
+    predicateList: Seq[Expression],
+    aliasMap: Map[Int, String],
+    joinEdges: Seq[JoinEdge],
+    children: Seq[ModularPlan],
+    flags: FlagSet,
+    flagSpec: Seq[Seq[Any]],
+    windowSpec: Seq[Seq[Any]],
+    dataMapTableRelation: Option[ModularPlan] = None) extends ModularPlan with Matchable {
+  override def output: Seq[Attribute] = outputList.map(_.toAttribute)
+
+  override def adjacencyList: scala.collection.immutable.Map[Int, Seq[(Int, JoinType)]] = {
+    joinEdges.groupBy { _.left }.map { case (k, v) => (k, v.map(e => (e.right, e.joinType))) }
+  }
+
+  override def extractJoinConditions(
+      left: ModularPlan, right: ModularPlan): Seq[Expression] = {
+    predicateList.filter(p => p.references.intersect(left.outputSet).nonEmpty &&
+                              p.references.intersect(right.outputSet).nonEmpty &&
+                              p.references.subsetOf(left.outputSet ++ right.outputSet))
+  }
+
+  override def extractRightEvaluableConditions(
+      left: ModularPlan, right: ModularPlan): Seq[Expression] = {
+    predicateList.filter(p => p.references.subsetOf(left.outputSet ++ right.outputSet) &&
+                              p.references.intersect(right.outputSet).nonEmpty)
+  }
+
+  override def extractEvaluableConditions(plan: ModularPlan): Seq[Expression] = {
+    predicateList.filter(p => canEvaluate(p, plan))
+  }
+}
+
+case class Union(children: Seq[ModularPlan], flags: FlagSet, flagSpec: Seq[Seq[Any]])
+  extends ModularPlan {
+  override def output: Seq[Attribute] = children.head.output
+}
+
+case object OneRowTable extends LeafNode {
+  override def output: Seq[Attribute] = Nil
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/queryGraph.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/queryGraph.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/queryGraph.scala
new file mode 100644
index 0000000..da6bc15
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/queryGraph.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv.plans.modular
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.plans.JoinType
+
+@DeveloperApi
+case class JoinEdge(left: Int, right: Int, joinType: JoinType)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ffddba70/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/package.scala
----------------------------------------------------------------------
diff --git a/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/package.scala b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/package.scala
new file mode 100644
index 0000000..8c799fe
--- /dev/null
+++ b/datamap/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/package.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.mv
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.catalyst.expressions.{AttributeSet, Expression, PredicateHelper}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+
+import org.apache.carbondata.mv.plans.modular.ModularPlan
+import org.apache.carbondata.mv.plans.util.{CheckSPJG, LogicalPlanSignatureGenerator, Signature}
+
+/**
+ * A a collection of common abstractions for query plans as well as
+ * a base semantic plan representation.
+ */
+package object plans {
+  @DeveloperApi
+  type Pattern = org.apache.carbondata.mv.plans.modular.ModularPattern
+
+  implicit class LogicalPlanUtils(val plan: LogicalPlan) {
+    lazy val isSPJG: Boolean = CheckSPJG.isSPJG(plan)
+    lazy val signature: Option[Signature] = LogicalPlanSignatureGenerator.generate(plan)
+  }
+
+  implicit class MorePredicateHelper(p: PredicateHelper) {
+    def canEvaluate(expr: Expression, plan: ModularPlan): Boolean = {
+      expr.references.subsetOf(plan.outputSet)
+    }
+
+    def canEvaluate(expr: Expression, exprList: Seq[Expression]): Boolean = {
+      expr.references.subsetOf(AttributeSet(exprList))
+    }
+  }
+
+  def supports(supported: Boolean, message: Any) {
+    if (!supported) {
+      throw new UnsupportedOperationException(s"unsupported operation: $message")
+    }
+  }
+}