You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by qi...@apache.org on 2020/02/25 11:48:56 UTC

[carbondata] branch master updated: [CARBONDATA-3705] Support create and load MV for spark datasource table

This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 4e8f819  [CARBONDATA-3705] Support create and load MV for spark datasource table
4e8f819 is described below

commit 4e8f819ff5e836f99315bbb94603526af66ccdd4
Author: Jacky Li <ja...@qq.com>
AuthorDate: Tue Feb 18 21:01:29 2020 +0800

    [CARBONDATA-3705] Support create and load MV for spark datasource table
    
    Why is this PR needed?
    Materialized View is a feature built on top of Spark, so it should support not only the carbondata format but also other formats.
    
    What changes were proposed in this PR?
    Added an API in CarbonMetaStore to look up any relation, not just carbon relations
    When creating an MV, use the newly added relation lookup to get all parent table relations. Use CatalogTable instead of CarbonTable whenever possible
    Skip segment handling for non-carbondata parent tables when loading an MV
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3625
---
 .../carbondata/core/datamap/DataMapProvider.java   |   7 +-
 .../metadata/schema/table/RelationIdentifier.java  |  29 ++-
 .../scala/org/apache/spark/sql/CarbonEnv.scala     |  14 ++
 .../spark/sql/hive/CarbonFileMetastore.scala       |  30 ++-
 .../apache/spark/sql/hive/CarbonMetaStore.scala    |   5 +-
 .../sql/parser/CarbonSparkSqlParserUtil.scala      |   1 +
 .../apache/carbondata/mv/extension/MVHelper.scala  | 211 +++++++++++----------
 .../apache/carbondata/mv/extension/MVUtil.scala    | 122 +++++-------
 .../carbondata/mv/rewrite/MVCreateTestCase.scala   | 127 +++++++++++++
 .../mv/rewrite/TestAllOperationsOnMV.scala         |   7 +-
 10 files changed, 365 insertions(+), 188 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
index c38411a..6df7e56 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException;
@@ -189,7 +190,11 @@ public abstract class DataMapProvider {
             return false;
           }
         } else {
-          List<RelationIdentifier> relationIdentifiers = dataMapSchema.getParentTables();
+          // set segment mapping only for carbondata table
+          List<RelationIdentifier> relationIdentifiers = dataMapSchema.getParentTables()
+              .stream()
+              .filter(RelationIdentifier::isCarbonDataTable)
+              .collect(Collectors.toList());
           for (RelationIdentifier relationIdentifier : relationIdentifiers) {
             List<String> mainTableSegmentList =
                 DataMapUtil.getMainTableValidSegmentList(relationIdentifier);
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/RelationIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/RelationIdentifier.java
index ff4b7a0..9f4a7e9 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/RelationIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/RelationIdentifier.java
@@ -21,6 +21,7 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Objects;
 
 /**
  * class to maintain the relation between parent and child
@@ -35,6 +36,8 @@ public class RelationIdentifier implements Serializable, Writable {
 
   private String tablePath = "";
 
+  private String provider = "carbondata";
+
   public RelationIdentifier(String databaseName, String tableName, String tableId) {
     this.databaseName = databaseName;
     this.tableName = tableName;
@@ -61,12 +64,25 @@ public class RelationIdentifier implements Serializable, Writable {
     this.tablePath = tablePath;
   }
 
+  public String getProvider() {
+    return provider;
+  }
+
+  public void setProvider(String provider) {
+    this.provider = provider;
+  }
+
+  public boolean isCarbonDataTable() {
+    return provider.equalsIgnoreCase("carbondata");
+  }
+
   @Override
   public void write(DataOutput out) throws IOException {
     out.writeUTF(databaseName);
     out.writeUTF(tableName);
     out.writeUTF(tableId);
     out.writeUTF(tablePath);
+    out.writeUTF(provider);
   }
 
   @Override
@@ -75,6 +91,7 @@ public class RelationIdentifier implements Serializable, Writable {
     this.tableName = in.readUTF();
     this.tableId = in.readUTF();
     this.tablePath = in.readUTF();
+    this.provider = in.readUTF();
   }
 
   @Override
@@ -84,15 +101,16 @@ public class RelationIdentifier implements Serializable, Writable {
 
     RelationIdentifier that = (RelationIdentifier) o;
 
-    if (databaseName != null ?
-        !databaseName.equals(that.databaseName) :
-        that.databaseName != null) {
+    if (!Objects.equals(databaseName, that.databaseName)) {
+      return false;
+    }
+    if (!Objects.equals(tableName, that.tableName)) {
       return false;
     }
-    if (tableName != null ? !tableName.equals(that.tableName) : that.tableName != null) {
+    if (!Objects.equals(provider, that.provider)) {
       return false;
     }
-    return tableId != null ? tableId.equals(that.tableId) : that.tableId == null;
+    return Objects.equals(tableId, that.tableId);
   }
 
   @Override
@@ -100,6 +118,7 @@ public class RelationIdentifier implements Serializable, Writable {
     int result = databaseName != null ? databaseName.hashCode() : 0;
     result = 31 * result + (tableName != null ? tableName.hashCode() : 0);
     result = 31 * result + (tableId != null ? tableId.hashCode() : 0);
+    result = 31 * result + (provider != null ? provider.hashCode() : 0);
     return result;
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index 0f52fb7..92782e9 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -288,6 +288,20 @@ object CarbonEnv {
   }
 
   /**
+   * Return any kind of table, including non-carbon tables
+   */
+  def getAnyTable(
+      databaseNameOp: Option[String],
+      tableName: String)
+    (sparkSession: SparkSession): CarbonTable = {
+    val catalog = getInstance(sparkSession).carbonMetaStore
+    catalog
+      .lookupAnyRelation(databaseNameOp, tableName)(sparkSession)
+      .asInstanceOf[CarbonRelation]
+      .carbonTable
+  }
+
+  /**
    *
    * @return true is the relation was changes and was removed from cache. false is there is no
    *         change in the relation.
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 55bf43b..df05499 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -19,10 +19,8 @@ package org.apache.spark.sql.hive
 
 import java.io.IOException
 import java.util.concurrent.ConcurrentHashMap
-import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, CarbonSource, EnvHelper, SparkSession}
 import org.apache.spark.sql.CarbonExpressions.{CarbonSubqueryAlias => SubqueryAlias}
@@ -31,6 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
 import org.apache.spark.sql.sources.BaseRelation
 import org.apache.spark.util.CarbonReflectionUtils
 
@@ -173,12 +172,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
   }
 
   def lookupRelation(dbName: Option[String], tableName: String)
-    (sparkSession: SparkSession): LogicalPlan = {
+    (sparkSession: SparkSession): CarbonRelation = {
     lookupRelation(TableIdentifier(tableName, dbName))(sparkSession)
   }
 
   override def lookupRelation(tableIdentifier: TableIdentifier)
-    (sparkSession: SparkSession): LogicalPlan = {
+    (sparkSession: SparkSession): CarbonRelation = {
     val database = tableIdentifier.database.getOrElse(
       sparkSession.catalog.currentDatabase)
     val relation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
@@ -218,6 +217,29 @@ class CarbonFileMetastore extends CarbonMetaStore {
     relation
   }
 
+  override def lookupAnyRelation(
+      dbName: Option[String], tableName: String)
+    (sparkSession: SparkSession): LogicalPlan = {
+    val tableIdentifier = new TableIdentifier(tableName, dbName)
+    val rawRelation = sparkSession.sessionState.catalog.lookupRelation(tableIdentifier)
+    rawRelation match {
+      case SubqueryAlias(_, c)
+        if (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+            c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
+            c.getClass.getName.equals(
+              "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) =>
+        val catalogTable =
+          CarbonReflectionUtils.getFieldOfCatalogTable("tableMeta", c).asInstanceOf[CatalogTable]
+        val tableInfo = CarbonSparkSqlParserUtil.buildTableInfoFromCatalogTable(
+          catalogTable, false, sparkSession)
+        val carbonTable = CarbonTable.buildFromTableInfo(tableInfo)
+        CarbonRelation(carbonTable.getDatabaseName, carbonTable.getTableName, carbonTable)
+      case _ =>
+        throw new NoSuchTableException(
+          sparkSession.sessionState.catalog.getCurrentDatabase, tableIdentifier.table)
+    }
+  }
+
   def tableExists(
       table: String,
       databaseOp: Option[String] = None)(sparkSession: SparkSession): Boolean = {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
index 6003f81..db71e53 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetaStore.scala
@@ -35,9 +35,12 @@ import org.apache.carbondata.format.SchemaEvolutionEntry
 trait CarbonMetaStore {
 
   def lookupRelation(dbName: Option[String], tableName: String)
-    (sparkSession: SparkSession): LogicalPlan
+    (sparkSession: SparkSession): CarbonRelation
 
   def lookupRelation(tableIdentifier: TableIdentifier)
+    (sparkSession: SparkSession): CarbonRelation
+
+  def lookupAnyRelation(dbName: Option[String], tableName: String)
     (sparkSession: SparkSession): LogicalPlan
 
   /**
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
index 02c1a5b..2d128a4 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParserUtil.scala
@@ -460,6 +460,7 @@ object CarbonSparkSqlParserUtil {
       )
       TableNewProcessor(tableModel)
     }
+    tableInfo.setTablePath(identifier.getTablePath)
     tableInfo.setTransactionalTable(isTransactionalTable)
     tableInfo
   }
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala
index 6f0dcbb..b23f350 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVHelper.scala
@@ -22,11 +22,12 @@ import java.util
 import scala.collection.JavaConverters._
 import scala.collection.mutable
 
-import org.apache.spark.sql.{CarbonEnv, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, CarbonSource, SparkSession}
 import org.apache.spark.sql.catalyst.{CarbonParserUtil, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, HiveTableRelation}
 import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Cast, Coalesce, Expression, Literal, NamedExpression, ScalaUDF}
 import org.apache.spark.sql.catalyst.expressions.aggregate.{AggregateExpression, Average}
+import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, Limit, LogicalPlan, Project}
 import org.apache.spark.sql.execution.command.{Field, PartitionerField, TableModel, TableNewProcessor}
 import org.apache.spark.sql.execution.command.table.{CarbonCreateTableCommand, CarbonDropTableCommand}
@@ -45,7 +46,7 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.datamap.DataMapManager
 import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan}
 import org.apache.carbondata.mv.plans.util.SQLBuilder
-import org.apache.carbondata.mv.rewrite.{MVUdf, SummaryDatasetCatalog, Utils}
+import org.apache.carbondata.mv.rewrite.{MVUdf, SummaryDatasetCatalog}
 import org.apache.carbondata.mv.timeseries.{TimeSeriesFunction, TimeSeriesUtil}
 import org.apache.carbondata.spark.util.CommonUtil
 
@@ -54,16 +55,12 @@ case class MVField(
     columnTableRelationList: Seq[ColumnTableRelation]) {
 }
 
-case class ColumnTableRelation(
-    parentColumnName: String,
-    parentColumnId: String,
-    parentTableName: String,
-    parentDatabaseName: String,
-    parentTableId: String) {
-}
+case class ColumnTableRelation(columnName: String, tableName: String, databaseName: String)
+
+case class Relation(output: Seq[AttributeReference], catalogTable: Option[CatalogTable])
 
 /**
- * Utility for MV datamap operations.
+ * Utility for MV operations.
  */
 object MVHelper {
 
@@ -72,42 +69,41 @@ object MVHelper {
       dataMapSchema: DataMapSchema,
       queryString: String,
       ifNotExistsSet: Boolean = false): Unit = {
-    val dmProperties = dataMapSchema.getProperties.asScala
-    if (dmProperties.contains("streaming") && dmProperties("streaming").equalsIgnoreCase("true")) {
+    val properties = dataMapSchema.getProperties.asScala
+    if (properties.contains("streaming") && properties("streaming").equalsIgnoreCase("true")) {
       throw new MalformedCarbonCommandException(
         s"Materialized view does not support streaming"
       )
     }
     val mvUtil = new MVUtil
-    mvUtil.validateDMProperty(dmProperties)
-    val logicalPlan = dropDummyFunc(
-      MVParser.getMVPlan(queryString, sparkSession))
+    mvUtil.validateDMProperty(properties)
+    val queryPlan = dropDummyFunc(MVParser.getMVPlan(queryString, sparkSession))
     // if there is limit in MV ctas query string, throw exception, as its not a valid usecase
-    logicalPlan match {
+    queryPlan match {
       case Limit(_, _) =>
         throw new MalformedCarbonCommandException("Materialized view does not support the query " +
                                                   "with limit")
       case _ =>
     }
-    val selectTables = getTables(logicalPlan)
-    if (selectTables.isEmpty) {
+    val mainTables = getCatalogTables(queryPlan)
+    if (mainTables.isEmpty) {
       throw new MalformedCarbonCommandException(
         s"Non-Carbon table does not support creating MV datamap")
     }
-    val modularPlan = validateMVQuery(sparkSession, logicalPlan)
+    val modularPlan = validateMVQuery(sparkSession, queryPlan)
     val updatedQueryWithDb = modularPlan.asCompactSQL
-    val (timeSeriesColumn, granularity): (String, String) = validateMVTimeSeriesQuery(
-      logicalPlan,
-      dataMapSchema)
-    val fullRebuild = isFullReload(logicalPlan)
+    val (timeSeriesColumn, granularity) = validateMVTimeSeriesQuery(queryPlan, dataMapSchema)
+    val isQueryNeedFullRebuild = checkIsQueryNeedFullRebuild(queryPlan)
+    val hasNonCarbonProvider = checkMainTableHasNonCarbonSource(mainTables)
+    val isNeedFullRebuild = isQueryNeedFullRebuild || hasNonCarbonProvider
     var counter = 0
     // the ctas query can have duplicate columns, so we should take distinct and create fields,
     // so that it won't fail during create mv table
-    val fields = logicalPlan.output.map { attr =>
+    val fields = queryPlan.output.map { attr =>
       if (attr.dataType.isInstanceOf[ArrayType] || attr.dataType.isInstanceOf[StructType] ||
           attr.dataType.isInstanceOf[MapType]) {
         throw new UnsupportedOperationException(
-          s"MV datamap is not supported for complex datatype columns and complex datatype return " +
+          s"MV is not supported for complex datatype columns and complex datatype return " +
           s"types of function :" + attr.name)
       }
       val name = updateColumnName(attr, counter)
@@ -133,30 +129,23 @@ object MVHelper {
 
     val tableProperties = mutable.Map[String, String]()
     val parentTables = new util.ArrayList[String]()
-    val parentTablesList = new util.ArrayList[CarbonTable](selectTables.size)
-    selectTables.foreach { selectTable =>
-      val mainCarbonTable = try {
-        Some(CarbonEnv.getCarbonTable(selectTable.identifier.database,
-          selectTable.identifier.table)(sparkSession))
-      } catch {
-        // Exception handling if it's not a CarbonTable
-        case ex: Exception =>
-          throw new MalformedCarbonCommandException(
-            s"Non-Carbon table does not support creating MV")
-      }
-      if (!mainCarbonTable.get.getTableInfo.isTransactionalTable) {
+    val parentTablesList = new util.ArrayList[CarbonTable](mainTables.size)
+    mainTables.foreach { mainTable =>
+      val table = CarbonEnv.getAnyTable(
+        mainTable.identifier.database, mainTable.identifier.table)(sparkSession)
+      if (!table.getTableInfo.isTransactionalTable) {
         throw new MalformedCarbonCommandException("Unsupported operation on NonTransactional table")
       }
-      if (mainCarbonTable.get.isChildTableForMV) {
+      if (table.isChildTableForMV) {
         throw new MalformedCarbonCommandException(
-          "Cannot create MV on child table " + mainCarbonTable.get.getTableUniqueName)
+          "Cannot create MV on child table " + table.getTableUniqueName)
       }
-      parentTables.add(mainCarbonTable.get.getTableName)
-      if (!mainCarbonTable.isEmpty && mainCarbonTable.get.isStreamingSink) {
+      if (table.isStreamingSink) {
         throw new MalformedCarbonCommandException(
-          s"Streaming table does not support creating materialized view")
+        s"Streaming table does not support creating materialized view")
       }
-      parentTablesList.add(mainCarbonTable.get)
+      parentTables.add(table.getTableName)
+      parentTablesList.add(table)
     }
 
     // Check if load is in progress in any of the parent table mapped to the datamap
@@ -173,12 +162,12 @@ object MVHelper {
     tableProperties.put(CarbonCommonConstants.PARENT_TABLES, parentTables.asScala.mkString(","))
 
     val finalModularPlan = new SQLBuilder(modularPlan).SQLizer.execute(modularPlan)
-    val fieldRelationMap = mvUtil.getFieldsAndDataMapFieldsFromPlan(finalModularPlan,
-      getLogicalRelation(logicalPlan))
+    val fieldRelationMap = mvUtil.getFieldsAndDataMapFieldsFromPlan(
+      finalModularPlan, getRelation(queryPlan))
     // If dataMap is mapped to single main table, then inherit table properties from main table,
     // else, will use default table properties. If DMProperties contains table properties, then
     // table properties of datamap table will be updated
-    if (parentTablesList.size() == 1) {
+    if (parentTablesList.size() == 1 && CarbonSource.isCarbonDataSource(mainTables.head)) {
       inheritTablePropertiesFromMainTable(
         parentTablesList.get(0),
         fields,
@@ -207,11 +196,11 @@ object MVHelper {
         }
       }
     }
-    dmProperties.foreach(t => tableProperties.put(t._1, t._2))
-    val usePartitioning = dmProperties.getOrElse("partitioning", "true").toBoolean
-    var partitionerFields: Seq[PartitionerField] = Seq.empty
+    properties.foreach(t => tableProperties.put(t._1, t._2))
+    val usePartitioning = properties.getOrElse("partitioning", "true").toBoolean
+
     // Inherit partition from parent table if datamap is mapped to single parent table
-    if (parentTablesList.size() == 1) {
+    val partitionerFields = if (parentTablesList.size() == 1) {
       val partitionInfo = parentTablesList.get(0).getPartitionInfo
       val parentPartitionColumns = if (!usePartitioning) {
         Seq.empty
@@ -220,26 +209,26 @@ object MVHelper {
       } else {
         Seq()
       }
-      partitionerFields = getPartitionerFields(parentPartitionColumns, fieldRelationMap)
+      getPartitionerFields(parentPartitionColumns, fieldRelationMap)
+    } else {
+      Seq.empty
     }
 
-    var order = 0
     val columnOrderMap = new java.util.HashMap[Integer, String]()
     if (partitionerFields.nonEmpty) {
-      fields.foreach { field =>
-        columnOrderMap.put(order, field.column)
-        order += 1
+      fields.zipWithIndex.foreach { case (field, index) =>
+        columnOrderMap.put(index, field.column)
       }
     }
 
-    // TODO Use a proper DB
-    val tableIdentifier = TableIdentifier(
-      dataMapSchema.getDataMapName + "_table", selectTables.head.identifier.database)
+    val mvTableIdentifier = TableIdentifier(
+      dataMapSchema.getDataMapName + "_table", mainTables.head.identifier.database)
+
     // prepare table model of the collected tokens
-    val tableModel: TableModel = CarbonParserUtil.prepareTableModel(
+    val mvTableModel: TableModel = CarbonParserUtil.prepareTableModel(
       ifNotExistPresent = ifNotExistsSet,
-      CarbonParserUtil.convertDbNameToLowerCase(tableIdentifier.database),
-      tableIdentifier.table.toLowerCase,
+      CarbonParserUtil.convertDbNameToLowerCase(mvTableIdentifier.database),
+      mvTableIdentifier.table.toLowerCase,
       fields,
       partitionerFields,
       tableProperties,
@@ -247,28 +236,28 @@ object MVHelper {
       isAlterFlow = false,
       None)
 
-    val tablePath = if (dmProperties.contains("path")) {
-      dmProperties("path")
+    val mvTablePath = if (properties.contains("path")) {
+      properties("path")
     } else {
-      CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession)
+      CarbonEnv.getTablePath(mvTableModel.databaseNameOp, mvTableModel.tableName)(sparkSession)
     }
-    CarbonCreateTableCommand(TableNewProcessor(tableModel),
-      tableModel.ifNotExistsSet, Some(tablePath), isVisible = false).run(sparkSession)
+    CarbonCreateTableCommand(TableNewProcessor(mvTableModel),
+      mvTableModel.ifNotExistsSet, Some(mvTablePath), isVisible = false).run(sparkSession)
 
-    // Map list of main table columns mapped to datamap table and add to dataMapSchema
+    // Map list of main table columns mapped to MV table and add to dataMapSchema
     val mainTableToColumnsMap = new java.util.HashMap[String, util.Set[String]]()
     val mainTableFieldIterator = fieldRelationMap.values.asJava.iterator()
     while (mainTableFieldIterator.hasNext) {
       val value = mainTableFieldIterator.next()
       value.columnTableRelationList.foreach {
         columnTableRelation =>
-          if (null == mainTableToColumnsMap.get(columnTableRelation.parentTableName)) {
+          if (null == mainTableToColumnsMap.get(columnTableRelation.tableName)) {
             val columns = new util.HashSet[String]()
-            columns.add(columnTableRelation.parentColumnName.toLowerCase())
-            mainTableToColumnsMap.put(columnTableRelation.parentTableName, columns)
+            columns.add(columnTableRelation.columnName.toLowerCase())
+            mainTableToColumnsMap.put(columnTableRelation.tableName, columns)
           } else {
-            mainTableToColumnsMap.get(columnTableRelation.parentTableName)
-              .add(columnTableRelation.parentColumnName.toLowerCase())
+            mainTableToColumnsMap.get(columnTableRelation.tableName)
+              .add(columnTableRelation.columnName.toLowerCase())
           }
       }
     }
@@ -280,27 +269,30 @@ object MVHelper {
     } else {
       dataMapSchema.setCtasQuery(updatedQueryWithDb)
     }
-    dataMapSchema
-      .setRelationIdentifier(new RelationIdentifier(tableIdentifier.database.get,
-        tableIdentifier.table,
-        CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession).getTableId))
+    dataMapSchema.setRelationIdentifier(
+      new RelationIdentifier(
+        mvTableIdentifier.database.get,
+        mvTableIdentifier.table,
+        CarbonEnv.getCarbonTable(mvTableIdentifier)(sparkSession).getTableId))
 
-    val parentIdents = selectTables.map { table =>
+    val parentIdents = mainTables.map { table =>
       val relationIdentifier = new RelationIdentifier(table.database, table.identifier.table, "")
       relationIdentifier.setTablePath(FileFactory.getUpdatedFilePath(table.location.toString))
+      relationIdentifier.setProvider(table.provider.get)
       relationIdentifier
     }
-    dataMapSchema.getRelationIdentifier.setTablePath(tablePath)
+    dataMapSchema.getRelationIdentifier.setTablePath(mvTablePath)
     dataMapSchema.setParentTables(new util.ArrayList[RelationIdentifier](parentIdents.asJava))
-    dataMapSchema.getProperties.put(DataMapProperty.FULL_REFRESH, fullRebuild.toString)
+    dataMapSchema.getProperties.put(DataMapProperty.FULL_REFRESH, isNeedFullRebuild.toString)
     try {
       DataMapStoreManager.getInstance().saveDataMapSchema(dataMapSchema)
     } catch {
       case ex: Exception =>
-        val dropTableCommand = CarbonDropTableCommand(true,
+        val dropTableCommand = CarbonDropTableCommand(
+          ifExistsSet = true,
           new Some[String](dataMapSchema.getRelationIdentifier.getDatabaseName),
           dataMapSchema.getRelationIdentifier.getTableName,
-          true)
+          dropChildTable = true)
         dropTableCommand.run(sparkSession)
         throw ex
     }
@@ -378,15 +370,18 @@ object MVHelper {
     if (value.nonEmpty) value.head else name
   }
 
-  private def getTables(logicalPlan: LogicalPlan): Seq[CatalogTable] = {
+  // Return all relations involved in the plan
+  private def getCatalogTables(logicalPlan: LogicalPlan): Seq[CatalogTable] = {
     logicalPlan.collect {
       case l: LogicalRelation => l.catalogTable.get
+      case h: HiveTableRelation => h.tableMeta
     }
   }
 
-  private def getLogicalRelation(logicalPlan: LogicalPlan): Seq[LogicalRelation] = {
+  private def getRelation(logicalPlan: LogicalPlan): Seq[Relation] = {
     logicalPlan.collect {
-      case l: LogicalRelation => l
+      case l: LogicalRelation => Relation(l.output, l.catalogTable)
+      case h: HiveTableRelation => Relation(h.output, Some(h.tableMeta))
     }
   }
 
@@ -413,10 +408,20 @@ object MVHelper {
   }
 
   /**
-   * Check if we can do incremental load on the mv table. Some cases like aggregation functions
-   * which are present inside other expressions like sum(a)+sum(b) cannot be incremental loaded.
+   * Return true if any main table is not a carbondata table, so the mv needs full rebuild
+   * @param mainTables
+   * @return
+   */
+  private def checkMainTableHasNonCarbonSource(mainTables: Seq[CatalogTable]): Boolean = {
+    mainTables.exists(table => !CarbonSource.isCarbonDataSource(table))
+  }
+
+  /**
+   * Return true if the mv table needs full rebuild based on the query plan.
+   * Some cases like aggregation functions which are present inside other expressions
+   * like sum(a)+sum(b) cannot be incremental loaded.
    */
-  private def isFullReload(logicalPlan: LogicalPlan): Boolean = {
+  private def checkIsQueryNeedFullRebuild(logicalPlan: LogicalPlan): Boolean = {
     var isFullReload = false
     logicalPlan.transformAllExpressions {
       case a: Alias => a
@@ -534,7 +539,7 @@ object MVHelper {
           val validRelation = fieldRelations.zipWithIndex.collectFirst {
             case ((field, dataMapField), index) if
             dataMapField.columnTableRelationList.nonEmpty &&
-            head.equals(dataMapField.columnTableRelationList.head.parentColumnName) &&
+            head.equals(dataMapField.columnTableRelationList.head.columnName) &&
             dataMapField.aggregateFunction.isEmpty =>
               (PartitionerField(field.name.get,
                 field.dataType,
@@ -568,20 +573,21 @@ object MVHelper {
     generatePartitionerField(allPartitionColumn.toList, Seq.empty)
   }
 
-  private def inheritTablePropertiesFromMainTable(parentTable: CarbonTable,
+  private def inheritTablePropertiesFromMainTable(
+      parentTable: CarbonTable,
       fields: Seq[Field],
       fieldRelationMap: scala.collection.mutable.LinkedHashMap[Field, MVField],
       tableProperties: mutable.Map[String, String]): Unit = {
-    var neworder = Seq[String]()
+    var newOrder = Seq[String]()
     val parentOrder = parentTable.getSortColumns.asScala
-    parentOrder.foreach(parentcol =>
+    parentOrder.foreach(parentCol =>
       fields.filter(col => fieldRelationMap(col).aggregateFunction.isEmpty &&
                            fieldRelationMap(col).columnTableRelationList.size == 1 &&
-                           parentcol.equalsIgnoreCase(
-                             fieldRelationMap(col).columnTableRelationList(0).parentColumnName))
-        .map(cols => neworder :+= cols.column))
-    if (neworder.nonEmpty) {
-      tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, neworder.mkString(","))
+                           parentCol.equalsIgnoreCase(
+                             fieldRelationMap(col).columnTableRelationList.head.columnName))
+        .map(cols => newOrder :+= cols.column))
+    if (newOrder.nonEmpty) {
+      tableProperties.put(CarbonCommonConstants.SORT_COLUMNS, newOrder.mkString(","))
     }
     val sort_scope = parentTable.getTableInfo.getFactTable.getTableProperties.asScala
       .get("sort_scope")
@@ -610,10 +616,9 @@ object MVHelper {
         val relationList = fields._2.columnTableRelationList
         // check if columns present in datamap are long_string_col in parent table. If they are
         // long_string_columns in parent, make them long_string_columns in datamap
-        if (aggFunc.isEmpty &&
-            relationList.size == 1 &&
-            longStringColumnInParents.contains(relationList.head.parentColumnName)) {
-          varcharDatamapFields += relationList.head.parentColumnName
+        if (aggFunc.isEmpty && relationList.size == 1 && longStringColumnInParents
+          .contains(relationList.head.columnName)) {
+          varcharDatamapFields += relationList.head.columnName
         }
       })
       if (varcharDatamapFields.nonEmpty) {
@@ -621,7 +626,7 @@ object MVHelper {
       }
     }
 
-    if (longStringColumn != None) {
+    if (longStringColumn.isDefined) {
       val fieldNames = fields.map(_.column)
       val newLongStringColumn = longStringColumn.get.split(",").map(_.trim).map { colName =>
         val newColName = parentTable.getTableName.toLowerCase() + "_" + colName
@@ -699,7 +704,7 @@ object MVHelper {
         case col if fieldRelationMap(col).aggregateFunction.isEmpty &&
                     fieldRelationMap(col).columnTableRelationList.size == 1 &&
                     parentcol.equalsIgnoreCase(
-                      fieldRelationMap(col).columnTableRelationList.head.parentColumnName) =>
+                      fieldRelationMap(col).columnTableRelationList.head.columnName) =>
           col.column
       })
     dataMapColumns
diff --git a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala
index e707cac..c127262 100644
--- a/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala
+++ b/mv/core/src/main/scala/org/apache/carbondata/mv/extension/MVUtil.scala
@@ -20,15 +20,13 @@ package org.apache.carbondata.mv.extension
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
-import org.apache.spark.sql.CarbonDatasourceHadoopRelation
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.execution.command.Field
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.types.DataType
 
 import org.apache.carbondata.common.exceptions.sql.MalformedMaterializedViewException
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.mv.plans.modular.{GroupBy, ModularPlan, ModularRelation, Select}
 import org.apache.carbondata.spark.util.CommonUtil
 
@@ -44,26 +42,25 @@ class MVUtil {
    */
   def getFieldsAndDataMapFieldsFromPlan(
       plan: ModularPlan,
-      logicalRelation: Seq[LogicalRelation]): scala.collection.mutable.LinkedHashMap[Field,
-    MVField] = {
+      relations: Seq[Relation]): mutable.LinkedHashMap[Field, MVField] = {
     plan match {
       case select: Select =>
         select.children.map {
           case groupBy: GroupBy =>
             getFieldsFromProject(groupBy.outputList, groupBy.predicateList,
-              logicalRelation, groupBy.flagSpec)
+              relations, groupBy.flagSpec)
           case _: ModularRelation =>
             getFieldsFromProject(select.outputList, select.predicateList,
-              logicalRelation, select.flagSpec)
+              relations, select.flagSpec)
         }.head
       case groupBy: GroupBy =>
         groupBy.child match {
           case select: Select =>
             getFieldsFromProject(groupBy.outputList, select.predicateList,
-              logicalRelation, select.flagSpec)
+              relations, select.flagSpec)
           case _: ModularRelation =>
             getFieldsFromProject(groupBy.outputList, groupBy.predicateList,
-              logicalRelation, groupBy.flagSpec)
+              relations, groupBy.flagSpec)
         }
     }
   }
@@ -73,17 +70,17 @@ class MVUtil {
    * user query
    * @param outputList of the modular plan
    * @param predicateList of the modular plan
-   * @param logicalRelation list of main table from query
+   * @param relations list of main tables from the query
    * @param flagSpec to get SortOrder attribute if exists
    * @return fieldRelationMap
    */
   private def getFieldsFromProject(
       outputList: Seq[NamedExpression],
       predicateList: Seq[Expression],
-      logicalRelation: Seq[LogicalRelation],
+      relations: Seq[Relation],
       flagSpec: Seq[Seq[Any]]): mutable.LinkedHashMap[Field, MVField] = {
     var fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, MVField]
-    fieldToDataMapFieldMap ++== getFieldsFromProject(outputList, logicalRelation)
+    fieldToDataMapFieldMap ++== getFieldsFromProject(outputList, relations)
     var finalPredicateList: Seq[NamedExpression] = Seq.empty
     predicateList.map { p =>
       p.collect {
@@ -106,31 +103,30 @@ class MVUtil {
         }
       }
     }
-    fieldToDataMapFieldMap ++== getFieldsFromProject(finalPredicateList.distinct, logicalRelation)
+    fieldToDataMapFieldMap ++== getFieldsFromProject(finalPredicateList.distinct, relations)
     fieldToDataMapFieldMap
   }
 
   private def getFieldsFromProject(
       projectList: Seq[NamedExpression],
-      logicalRelation: Seq[LogicalRelation]): mutable.LinkedHashMap[Field, MVField] = {
+      relations: Seq[Relation]): mutable.LinkedHashMap[Field, MVField] = {
     var fieldToDataMapFieldMap = scala.collection.mutable.LinkedHashMap.empty[Field, MVField]
     projectList.map {
       case attr: AttributeReference =>
-        val carbonTable = getCarbonTable(logicalRelation, attr)
-        if (null != carbonTable) {
+        val catalogTable = getCatalogTable(relations, attr)
+        if (null != catalogTable) {
           val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
-          val relation = getColumnRelation(attr.name,
-            carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
-            carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
-            carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
-            carbonTable)
+          val relation = getColumnRelation(
+            attr.name,
+            catalogTable.identifier.table,
+            catalogTable.database)
           if (null != relation) {
             arrayBuffer += relation
           }
           var qualifier: Option[String] = None
           if (attr.qualifier.nonEmpty) {
             qualifier = if (attr.qualifier.headOption.get.startsWith("gen_sub")) {
-              Some(carbonTable.getTableName)
+              Some(catalogTable.identifier.table)
             } else {
               attr.qualifier.headOption
             }
@@ -142,17 +138,16 @@ class MVUtil {
             qualifier.headOption,
             "",
             arrayBuffer,
-            carbonTable.getTableName)
+            catalogTable.identifier.table)
         }
       case Alias(attr: AttributeReference, name) =>
-        val carbonTable = getCarbonTable(logicalRelation, attr)
-        if (null != carbonTable) {
+        val catalogTable = getCatalogTable(relations, attr)
+        if (null != catalogTable) {
           val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
-          val relation = getColumnRelation(attr.name,
-            carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
-            carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
-            carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
-            carbonTable)
+          val relation = getColumnRelation(
+            attr.name,
+            catalogTable.identifier.table,
+            catalogTable.database)
           if (null != relation) {
             arrayBuffer += relation
           }
@@ -165,13 +160,12 @@ class MVUtil {
         val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
         a.collect {
           case attr: AttributeReference =>
-            val carbonTable = getCarbonTable(logicalRelation, attr)
-            if (null != carbonTable) {
-              val relation = getColumnRelation(attr.name,
-                carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
-                carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
-                carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
-                carbonTable)
+            val catalogTable = getCatalogTable(relations, attr)
+            if (null != catalogTable) {
+              val relation = getColumnRelation(
+                attr.name,
+                catalogTable.identifier.table,
+                catalogTable.database)
               if (null != relation) {
                 arrayBuffer += relation
               }
@@ -190,13 +184,12 @@ class MVUtil {
         val arrayBuffer: ArrayBuffer[ColumnTableRelation] = new ArrayBuffer[ColumnTableRelation]()
         a.collect {
           case attr: AttributeReference =>
-            val carbonTable = getCarbonTable(logicalRelation, attr)
-            if (null != carbonTable) {
-              val relation = getColumnRelation(attr.name,
-                carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
-                carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
-                carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
-                carbonTable)
+            val catalogTable = getCatalogTable(relations, attr)
+            if (null != catalogTable) {
+              val relation = getColumnRelation(
+                attr.name,
+                catalogTable.identifier.table,
+                catalogTable.database)
               if (null != relation) {
                 arrayBuffer += relation
               }
@@ -213,37 +206,26 @@ class MVUtil {
    */
   private def getColumnRelation(
       parentColumnName: String,
-      parentTableId: String,
       parentTableName: String,
-      parentDatabaseName: String,
-      carbonTable: CarbonTable): ColumnTableRelation = {
-    val parentColumn = carbonTable.getColumnByName(parentColumnName)
-    var columnTableRelation: ColumnTableRelation = null
-    if (null != parentColumn) {
-      val parentColumnId = parentColumn.getColumnId
-      columnTableRelation = ColumnTableRelation(parentColumnName = parentColumnName,
-        parentColumnId = parentColumnId,
-        parentTableName = parentTableName,
-        parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
-      columnTableRelation
-    } else {
-      columnTableRelation
-    }
+      parentDatabaseName: String) = {
+    ColumnTableRelation(
+      columnName = parentColumnName,
+      tableName = parentTableName,
+      databaseName = parentDatabaseName)
   }
 
   /**
-   * This method is used to get carbon table for corresponding attribute reference
-   * from logical relation
+   * Return the catalog table of the first relation whose output contains attr, or null if none
    */
-  private def getCarbonTable(logicalRelation: Seq[LogicalRelation],
-      attr: AttributeReference) = {
-    val relations = logicalRelation
-      .filter(lr => lr.output
-        .exists(attrRef => attrRef.name.equalsIgnoreCase(attr.name) &&
-                           attrRef.exprId.equals(attr.exprId)))
-    if (relations.nonEmpty) {
-      relations
-        .head.relation.asInstanceOf[CarbonDatasourceHadoopRelation].carbonRelation.carbonTable
+  private def getCatalogTable(
+      relations: Seq[Relation],
+      attr: AttributeReference): CatalogTable = {
+    val filteredRelation = relations.filter { relation =>
+        relation.output.exists(attrRef => attrRef.name.equalsIgnoreCase(attr.name) &&
+                                          attrRef.exprId.equals(attr.exprId))
+    }
+    if (filteredRelation.nonEmpty) {
+      filteredRelation.head.catalogTable.get
     } else {
       null
     }
diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
index 633db24..55ab5e8 100644
--- a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
+++ b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVCreateTestCase.scala
@@ -104,6 +104,133 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"""LOAD DATA local inpath '$resourcesPath/data_big.csv' INTO TABLE fact_table6 OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
   }
 
+  test("test create mv on parquet spark table") {
+    sql("drop materialized view if exists mv1")
+    sql("drop table if exists source")
+    sql("create table source using parquet as select * from fact_table1")
+    sql("create materialized view mv1 as select empname, deptname, avg(salary) from source group by empname, deptname")
+    var df = sql("select empname, avg(salary) from source group by empname")
+    assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+    checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by empname"))
+
+    // load to parquet table and check again
+    sql("insert into source select * from fact_table1")
+    df = sql("select empname, avg(salary) from source group by empname")
+    assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+    checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by empname"))
+
+    sql(s"drop materialized view mv1")
+    sql("drop table source")
+  }
+
+  test("test create mv on partitioned parquet spark table") {
+    sql("drop materialized view if exists mv1")
+    sql("drop table if exists source")
+    sql("""
+        | create table source (empname String, designation String, doj Timestamp,
+        | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, salary int)
+        | using parquet partitioned by (empname)
+        """.stripMargin)
+    sql("insert into source select designation, doj, workgroupcategory, workgroupcategoryname, " +
+        "deptno, deptname, salary, empname from fact_table1")
+    sql("select * from source limit 2").show(false)
+    sql("create materialized view mv1 as select empname, deptname, avg(salary) from source group by empname, deptname")
+    var df = sql("select empname, avg(salary) from source group by empname")
+    assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+    checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by empname"))
+
+    // load to parquet table and check again
+    sql("insert into source select designation, doj, workgroupcategory, workgroupcategoryname, " +
+        "deptno, deptname, salary, empname from fact_table1")
+    df = sql("select empname, avg(salary) from source group by empname")
+    assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+    checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by empname"))
+
+    sql(s"drop materialized view mv1")
+    sql("drop table source")
+  }
+
+  test("test create mv on orc spark table") {
+    sql("drop materialized view if exists mv1")
+    sql("drop table if exists source")
+    sql("create table source using orc as select * from fact_table1")
+    sql("create materialized view mv1 as select empname, deptname, avg(salary) from source group by empname, deptname")
+    var df = sql("select empname, avg(salary) from source group by empname")
+    assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+    checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by empname"))
+
+    // load to orc table and check again
+    sql("insert into source select * from fact_table1")
+    df = sql("select empname, avg(salary) from source group by empname")
+    assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+    checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by empname"))
+
+    sql(s"drop materialized view mv1")
+    sql("drop table source")
+  }
+
+  test("test create mv on partitioned orc spark table") {
+    sql("drop materialized view if exists mv1")
+    sql("drop table if exists source")
+    sql("""
+          | create table source (empname String, designation String, doj Timestamp,
+          | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, salary int)
+          | using orc partitioned by (empname)
+        """.stripMargin)
+    sql("insert into source select designation, doj, workgroupcategory, workgroupcategoryname, " +
+        "deptno, deptname, salary, empname from fact_table1")
+    sql("select * from source limit 2").show(false)
+    sql("create materialized view mv1 as select empname, deptname, avg(salary) from source group by empname, deptname")
+    var df = sql("select empname, avg(salary) from source group by empname")
+    assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+    checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by empname"))
+
+    // load to orc table and check again
+    sql("insert into source select designation, doj, workgroupcategory, workgroupcategoryname, " +
+        "deptno, deptname, salary, empname from fact_table1")
+    df = sql("select empname, avg(salary) from source group by empname")
+    assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+    checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by empname"))
+
+    sql(s"drop materialized view mv1")
+    sql("drop table source")
+  }
+
+  test("test create mv on parquet hive table") {
+    sql("drop materialized view if exists mv1")
+    sql("drop table if exists source")
+    sql("create table source stored as parquet as select * from fact_table1")
+    sql("create materialized view mv1 as select empname, deptname, avg(salary) from source group by empname, deptname")
+    var df = sql("select empname, avg(salary) from source group by empname")
+    assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+    checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by empname"))
+
+    // load to parquet table and check again
+    sql("insert into source select * from fact_table1")
+    df = sql("select empname, avg(salary) from source group by empname")
+    assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv1"))
+    checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by empname"))
+
+    sql(s"drop materialized view mv1")
+    sql("drop table source")
+  }
+
+  // TODO: orc hive table is not supported since MV rewrite does not handle HiveTableRelation
+  ignore("test create mv on orc hive table") {
+    sql("drop materialized view if exists mv2")
+    sql("drop table if exists source")
+    sql("create table source stored as orc as select * from fact_table1")
+    sql("explain extended select empname, avg(salary) from source group by empname").show(false)
+    sql("create materialized view mv2 as select empname, deptname, avg(salary) from source group by empname, deptname")
+    sql("select * from mv2_table").show
+    val df = sql("select empname, avg(salary) from source group by empname")
+    sql("explain extended select empname, avg(salary) from source group by empname").show(false)
+    assert(TestUtil.verifyMVDataMap(df.queryExecution.optimizedPlan, "mv2"))
+    checkAnswer(df, sql("select empname, avg(salary) from fact_table2 group by empname"))
+    sql(s"drop materialized view mv2")
+    sql("drop table source")
+  }
+
   test("test create mv with simple and same projection") {
     sql("drop materialized view if exists mv1")
     sql("create materialized view mv1 as select empname, designation from fact_table1")
diff --git a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
index 84c8037..a8c9af5 100644
--- a/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
+++ b/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -235,12 +235,11 @@ class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach {
 
   test("test mv with non-carbon table") {
     sql("drop table if exists noncarbon")
-    sql("create table noncarbon (product string,amount int)")
+    sql("create table noncarbon (product string,amount int) stored as parquet")
     sql("insert into noncarbon values('Mobile',2000)")
     sql("drop materialized view if exists p")
-    intercept[MalformedCarbonCommandException] {
-      sql("Create materialized view p  as Select product from noncarbon")
-    }.getMessage.contains("Non-Carbon table does not support creating MV materialized view")
+    sql("Create materialized view p as Select product from noncarbon")
+    sql("drop materialized view p")
     sql("drop table if exists noncarbon")
   }