You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by in...@apache.org on 2022/04/28 14:29:44 UTC

[carbondata] branch master updated: [CARBONDATA-4330] Incremental Dataload of Average aggregate in MV

This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 45acd67ed7 [CARBONDATA-4330] Incremental Dataload of Average aggregate in MV
45acd67ed7 is described below

commit 45acd67ed742d89d539ec0351f77f08c7762e7de
Author: ShreelekhyaG <sh...@yahoo.com>
AuthorDate: Mon Jan 24 15:17:48 2022 +0530

    [CARBONDATA-4330] Incremental Dataload of Average aggregate in MV
    
    Why is this PR needed?
    Currently, whenever an MV is created with an average aggregate, a full
    refresh is done, meaning the whole MV is reloaded whenever new
    segments are added. This slows down loading. With incremental
    data load, only the newly added segments need to be loaded into the MV.
    
    What changes were proposed in this PR?
    If avg is present in the MV query, rewrite the query to store the sum and
    count of the same columns in the MV, and derive avg from them at query time.
    Refer: https://docs.google.com/document/d/1kPEMCX50FLZcmyzm6kcIQtUH9KXWDIqh-Hco7NkTp80/edit
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4257
---
 .../org/apache/carbondata/core/view/MVSchema.java  |  14 ++
 .../TestCreateIndexForCleanAndDeleteSegment.scala  |   2 +-
 .../apache/carbondata/view/MVCatalogInSpark.scala  |  23 +++-
 .../org/apache/carbondata/view/MVRefresher.scala   |   8 +-
 .../apache/carbondata/view/MVSchemaWrapper.scala   |   1 +
 .../command/view/CarbonCreateMVCommand.scala       |  53 +++++---
 .../org/apache/spark/sql/optimizer/MVRewrite.scala | 121 +++++++++++++++--
 .../apache/spark/sql/parser/MVQueryParser.scala    |  34 +++++
 .../scala/org/apache/carbondata/view/MVTest.scala  | 145 +++++++++++++++++++++
 .../carbondata/view/rewrite/MVCreateTestCase.scala |  40 +++++-
 .../mv/plans/modular/AggregatePushDown.scala       |  34 +++--
 11 files changed, 430 insertions(+), 45 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/view/MVSchema.java b/core/src/main/java/org/apache/carbondata/core/view/MVSchema.java
index 3887c97fe0..b7564ac5e0 100644
--- a/core/src/main/java/org/apache/carbondata/core/view/MVSchema.java
+++ b/core/src/main/java/org/apache/carbondata/core/view/MVSchema.java
@@ -56,6 +56,12 @@ public class MVSchema implements Serializable, Writable {
    */
   private String query;
 
+  /**
+   * SQL modified query string.
+   * In case of MV with avg incremental mode, the original query is modified.
+   */
+  private String modifiedQuery;
+
   /**
    * Properties provided by user
    */
@@ -99,10 +105,18 @@ public class MVSchema implements Serializable, Writable {
     return query;
   }
 
+  public String getModifiedQuery() {
+    return modifiedQuery;
+  }
+
   public void setQuery(String query) {
     this.query = query;
   }
 
+  public void setModifiedQuery(String modifiedQuery) {
+    this.modifiedQuery = modifiedQuery;
+  }
+
   public Map<String, String> getProperties() {
     return properties;
   }
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala
index ded9c87eeb..5623f90e1a 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestCreateIndexForCleanAndDeleteSegment.scala
@@ -73,7 +73,7 @@ class TestCreateIndexForCleanAndDeleteSegment extends QueryTest with BeforeAndAf
     assert(preDeleteSegmentsByDate == postDeleteSegmentsByDate)
     val result = sql("show materialized views on table delete_segment_by_id").collectAsList()
     assert(result.get(0).get(2).toString.equalsIgnoreCase("ENABLED"))
-    assert(result.get(0).get(3).toString.equalsIgnoreCase("full"))
+    assert(result.get(0).get(3).toString.equalsIgnoreCase("incremental"))
     assert(result.get(0).get(4).toString.equalsIgnoreCase("on_commit"))
     dryRun = sql("clean files for table delete_segment_by_id" +
       " OPTIONS('dryrun'='true', 'force'='true')").collect()
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
index 30f251ffc9..408a66041f 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala
@@ -111,6 +111,26 @@ case class MVCatalogInSpark(session: SparkSession)
         // So setting back to current database.
         session.catalog.setCurrentDatabase(currentDatabase)
       }
+      // The MV query is modified by replacing avg with sum and count columns.
+      // Here, create modifiedLogicalPlan from the modified query, so that even though the MV was
+      // not created with explicit sum or count columns, we can still derive those columns.
+      // For example, consider the MV creation statement:
+      // create materialized view mv1 as select empname, avg(salary) from source group by empname;
+      // If the user then queries:
+      // select empname, sum(salary) from source group by empname;
+      // we can use modifiedLogicalPlan for the query matching and rewrite steps.
+      val modifiedLogicalPlan = if (mvSchema.getModifiedQuery != null &&
+                                    !mvSchema.getModifiedQuery
+                                      .equalsIgnoreCase(mvSchema.getQuery)) {
+        try {
+          session.catalog.setCurrentDatabase(mvSchema.getIdentifier.getDatabaseName)
+          MVHelper.dropDummyFunction(MVQueryParser.getQueryPlan(mvSchema.getModifiedQuery, session))
+        } finally {
+          session.catalog.setCurrentDatabase(currentDatabase)
+        }
+      } else {
+        logicalPlan
+      }
       val mvSignature = SimpleModularizer.modularize(
         BirdcageOptimizer.execute(logicalPlan)).next().semiHarmonized.signature
       val mvIdentifier = mvSchema.getIdentifier
@@ -143,6 +163,7 @@ case class MVCatalogInSpark(session: SparkSession)
         mvSignature,
         mvSchema,
         logicalPlan,
+        modifiedLogicalPlan,
         MVPlanWrapper(modularPlan, mvSchema))
     }
   }
@@ -172,7 +193,7 @@ case class MVCatalogInSpark(session: SparkSession)
         val modularPlan = SimpleModularizer.modularize(
             BirdcageOptimizer.execute(logicalPlan)).next().semiHarmonized
         val signature = modularPlan.signature
-        viewSchemas += MVSchemaWrapper(signature, null, logicalPlan, null)
+        viewSchemas += MVSchemaWrapper(signature, null, logicalPlan, logicalPlan, null)
       }
     }
   }
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
index 545ccfccf9..8f1237dbb9 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
@@ -166,11 +166,15 @@ object MVRefresher {
                                newLoadName: String,
                                segmentMap: java.util.Map[String, java.util.List[String]],
                                session: SparkSession): Boolean = {
-    val query = viewSchema.getQuery
+    val isFullRefresh = !viewSchema.isRefreshIncremental
+    var query = viewSchema.getQuery
+    if (!isFullRefresh) {
+      // query is modified internally if average aggregate is used with incremental load.
+      query = viewSchema.getModifiedQuery
+    }
     if (query != null) {
       val viewIdentifier = viewSchema.getIdentifier
       val updatedQuery = MVQueryParser.getQuery(query, session)
-      val isFullRefresh = !viewSchema.isRefreshIncremental
       // Set specified segments for incremental load
       val segmentMapIterator = segmentMap.entrySet().iterator()
       while (segmentMapIterator.hasNext) {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVSchemaWrapper.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVSchemaWrapper.scala
index dea31f5bcc..0c8f981ed6 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVSchemaWrapper.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVSchemaWrapper.scala
@@ -27,4 +27,5 @@ case class MVSchemaWrapper(
     viewSignature: Option[Signature],
     viewSchema: MVSchema,
     logicalPlan: LogicalPlan,
+    modifiedLogicalPlan: LogicalPlan,
     modularPlan: ModularPlan)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
index 8234c27c2b..e2d367db5a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala
@@ -139,13 +139,42 @@ case class CarbonCreateMVCommand(
 
   override protected def opName: String = "CREATE MATERIALIZED VIEW"
 
+  def checkIfAvgAggregatePresent(logicalPlan: LogicalPlan): Boolean = {
+    var isAvgPresent = false
+    logicalPlan.transformAllExpressions {
+      case aggregate: AggregateExpression =>
+        val avgExist = aggregate.aggregateFunction match {
+          case _: Average => true
+          case _ => false
+        }
+        isAvgPresent = avgExist || isAvgPresent
+        aggregate
+    }
+    isAvgPresent
+  }
+
   private def doCreate(session: SparkSession,
       tableIdentifier: TableIdentifier,
       viewManager: MVManagerInSpark,
       viewCatalog: MVCatalogInSpark): MVSchema = {
-    val logicalPlan = MVHelper.dropDummyFunction(
+    var logicalPlan = MVHelper.dropDummyFunction(
       MVQueryParser.getQueryPlan(queryString, session))
-      // check if mv with same query already exists
+    val relatedTables = getRelatedTables(logicalPlan)
+    val viewRefreshMode = if (checkIsQueryNeedFullRefresh(logicalPlan) ||
+                              checkIsHasNonCarbonTable(relatedTables)) {
+      MVProperty.REFRESH_MODE_FULL
+    } else {
+      MVProperty.REFRESH_MODE_INCREMENTAL
+    }
+    var modifiedQueryString = queryString
+    if (viewRefreshMode.equalsIgnoreCase(MVProperty.REFRESH_MODE_INCREMENTAL) &&
+      checkIfAvgAggregatePresent(logicalPlan)) {
+      // Check if average aggregate is used and derive logical plan from modified query string.
+      modifiedQueryString = MVQueryParser.checkForAvgAndModifySql(queryString)
+      logicalPlan = MVHelper.dropDummyFunction(
+        MVQueryParser.getQueryPlan(modifiedQueryString, session))
+    }
+    // check if mv with same query already exists
     val mvSchemaWrapper = viewCatalog.getMVWithSameQueryPresent(logicalPlan)
     if (mvSchemaWrapper.nonEmpty) {
       val mvWithSameQuery = mvSchemaWrapper.get.viewSchema.getIdentifier.getTableName
@@ -154,7 +183,6 @@ case class CarbonCreateMVCommand(
     }
     val modularPlan = checkQuery(logicalPlan)
     val viewSchema = getOutputSchema(logicalPlan)
-    val relatedTables = getRelatedTables(logicalPlan)
     val relatedTableList = toCarbonTables(session, relatedTables)
     val inputCols = logicalPlan.output.map(x =>
       x.name
@@ -193,13 +221,6 @@ case class CarbonCreateMVCommand(
         }
         relatedTableNames.add(table.getTableName)
     }
-
-    val viewRefreshMode = if (checkIsQueryNeedFullRefresh(logicalPlan) ||
-      checkIsHasNonCarbonTable(relatedTables)) {
-      MVProperty.REFRESH_MODE_FULL
-    } else {
-      MVProperty.REFRESH_MODE_INCREMENTAL
-    }
     val viewRefreshTriggerMode = if (deferredRefresh) {
       MVProperty.REFRESH_TRIGGER_MODE_ON_MANUAL
     } else {
@@ -325,6 +346,9 @@ case class CarbonCreateMVCommand(
       schema.setTimeSeries(true)
     }
     schema.setQuery(queryString)
+    if (!viewRefreshMode.equals(MVProperty.REFRESH_MODE_FULL)) {
+      schema.setModifiedQuery(modifiedQueryString)
+    }
     try {
       viewManager.createSchema(schema.getIdentifier.getDatabaseName, schema)
     } catch {
@@ -564,14 +588,7 @@ case class CarbonCreateMVCommand(
     var needFullRefresh = false
     logicalPlan.transformAllExpressions {
       case alias: Alias => alias
-      case aggregate: AggregateExpression =>
-        // If average function present then go for full refresh
-        val reload = aggregate.aggregateFunction match {
-          case _: Average => true
-          case _ => false
-        }
-        needFullRefresh = reload || needFullRefresh
-        aggregate
+      case aggregate: AggregateExpression => aggregate
       case cast: Cast =>
         needFullRefresh = cast.child.find {
           case _: AggregateExpression => false
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala
index 2ea1be5b3c..48fb93f45c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala
@@ -28,20 +28,21 @@ import scala.util.control.Breaks.{break, breakable}
 import org.apache.log4j.Logger
 import org.apache.spark.sql.{CarbonToSparkAdapter, SparkSession}
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, AttributeReference, Cast, Divide, Expression, Literal, NamedExpression, ScalaUDF, SortOrder}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan}
 import org.apache.spark.sql.catalyst.trees.{CurrentOrigin, TreeNode}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.types.{DataType, DataTypes}
+import org.apache.spark.sql.types.{DataTypes, DoubleType}
 import org.apache.spark.unsafe.types.UTF8String
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.preagg.TimeSeriesFunctionEnum
 import org.apache.carbondata.mv.expressions.modular.{ModularSubquery, ScalarModularSubquery}
 import org.apache.carbondata.mv.plans.modular.{ExpressionHelper, GroupBy, HarmonizedRelation, LeafNode, Matchable, ModularPlan, ModularRelation, Select, SimpleModularizer}
 import org.apache.carbondata.mv.plans.util.BirdcageOptimizer
-import org.apache.carbondata.view.{MVCatalogInSpark, MVPlanWrapper, MVTimeGranularity, TimeSeriesFunction}
+import org.apache.carbondata.view.{MVCatalogInSpark, MVHelper, MVPlanWrapper, MVSchemaWrapper, MVTimeGranularity, TimeSeriesFunction}
 
 /**
  * The primary workflow for rewriting relational queries using Spark libraries.
@@ -408,8 +409,8 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
           LOGGER.info("Query matching has been initiated with available mv schema's")
           val rewrittenPlans =
             for {schemaWrapper <- catalog.lookupFeasibleSchemas(plan).toStream
-                 subsumer <- SimpleModularizer.modularize(
-                   BirdcageOptimizer.execute(schemaWrapper.logicalPlan)).map(_.semiHarmonized)
+                 subsumer <- SimpleModularizer.modularize(BirdcageOptimizer.execute(
+                   getLogicalPlan(schemaWrapper, plan))).map(_.semiHarmonized)
                  subsumee <- unifySubsumee(plan)
                  rewrittenPlan <- rewriteWithSchemaWrapper0(
                    unifySubsumer2(
@@ -434,6 +435,27 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
     }
   }
 
+  // Selects which MV logical plan to match against, based on the user query.
+  // 1. modifiedLogicalPlan: created from the modified MV query.
+  // Used when the user query has sum or count but no avg, so results can be derived from it.
+  // If the user query has both sum and avg of a column, modifiedLogicalPlan cannot be used,
+  // as it doesn't have the avg attribute.
+  // 2. logicalPlan: created from the original MV query.
+  // Use this if an avg aggregate is present in the user query, and in all other cases.
+  private def getLogicalPlan(schemaWrapper: MVSchemaWrapper,
+      plan: ModularPlan): LogicalPlan = {
+    val outputCols = plan.output
+    val sumOrCountCol = outputCols.find(x => x.sql.contains(CarbonCommonConstants.SUM + "(") ||
+                                              x.sql.contains(CarbonCommonConstants.COUNT + "("))
+    if (!schemaWrapper.logicalPlan.equals(schemaWrapper.modifiedLogicalPlan) &&
+        !outputCols.exists(x => x.sql.contains(CarbonCommonConstants.AVERAGE + "("))
+        && sumOrCountCol.isDefined) {
+      schemaWrapper.modifiedLogicalPlan
+    } else {
+      schemaWrapper.logicalPlan
+    }
+  }
+
   private def rewriteWithSchemaWrapper0(
       subsumer: ModularPlan,
       subsumee: ModularPlan): Option[ModularPlan] = {
@@ -734,10 +756,10 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
         case Seq(groupBy: GroupBy) if groupBy.modularPlan.isDefined =>
           val planWrapper = groupBy.modularPlan.get.asInstanceOf[MVPlanWrapper]
           val plan = planWrapper.modularPlan.asInstanceOf[Select]
-          val aliasMap = getAliasMap(plan.outputList, groupBy.outputList)
-          // Update the flagSpec as per the mv table attributes.
-          val updatedFlagSpec = updateFlagSpec(select, plan, aliasMap, keepAlias = false)
           if (!planWrapper.viewSchema.isRefreshIncremental) {
+            val aliasMap = getAliasMap(plan.outputList, groupBy.outputList)
+            // Update the flagSpec as per the mv table attributes.
+            val updatedFlagSpec = updateFlagSpec(select, plan, aliasMap, keepAlias = false)
             val updatedPlanOutputList = getUpdatedOutputList(plan.outputList, groupBy.modularPlan)
             val outputList =
               for ((output1, output2) <- groupBy.outputList zip updatedPlanOutputList) yield {
@@ -786,6 +808,9 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
                       other
                   }
               }
+            val aliasMap = getAliasMap(child.outputList, groupBy.outputList)
+            // Update the flagSpec as per the mv table attributes.
+            val updatedFlagSpec = updateFlagSpec(select, plan, aliasMap, keepAlias = false)
             // TODO Remove the unnecessary columns from selection.
             // Only keep columns which are required by parent.
             select.copy(
@@ -800,7 +825,67 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
         val planWrapper = groupBy.modularPlan.get.asInstanceOf[MVPlanWrapper]
         val plan = planWrapper.modularPlan.asInstanceOf[Select]
         val updatedPlanOutputList = getUpdatedOutputList(plan.outputList, groupBy.modularPlan)
-        val outputListMapping = groupBy.outputList zip updatedPlanOutputList
+        // columnIndex is used to iterate over updatedPlanOutputList. For each avg attribute,
+        // updatedPlanOutputList has 2 attributes (sum and count) and
+        // by maintaining index we can increment and access when needed.
+        var columnIndex = -1
+
+        def getColumnName(expression: Expression): String = {
+          expression match {
+            case attributeReference: AttributeReference => attributeReference.name
+            case literal: Literal => literal.value.toString
+            case _ => ""
+          }
+        }
+        // get column from list having the given aggregate and column name.
+        def getColumnFromOutputList(updatedPlanOutputList: Seq[NamedExpression], aggregate: String,
+            colName: String): NamedExpression = {
+          val nextIndex = columnIndex + 1
+          if ((nextIndex) < updatedPlanOutputList.size &&
+              updatedPlanOutputList(nextIndex).name.contains(aggregate) &&
+              updatedPlanOutputList(nextIndex).name.contains(colName)) {
+            columnIndex += 1
+            updatedPlanOutputList(columnIndex)
+          } else {
+            updatedPlanOutputList.find(x => x.name.contains(aggregate) &&
+                                            x.name.contains(colName)).get
+          }
+        }
+
+        val outputListMapping = if (groupBy.outputList
+          .exists(_.sql.contains(CarbonCommonConstants.AVERAGE + "("))) {
+          // for each avg attribute, updatedPlanOutputList has 2 attributes (sum and count),
+          // so direct mapping of groupBy.outputList and updatedPlanOutputList is not possible.
+          // If query has avg, then get the sum, count attributes in the list and map accordingly.
+          for (exp <- groupBy.outputList) yield {
+            exp match {
+              case Alias(aggregateExpression: AggregateExpression, _)
+                if aggregateExpression.aggregateFunction.isInstanceOf[Average] =>
+                val colName = getColumnName(aggregateExpression.collectLeaves().head)
+                val sumAttr = getColumnFromOutputList(updatedPlanOutputList,
+                  CarbonCommonConstants.SUM, colName)
+                val countAttr = getColumnFromOutputList(updatedPlanOutputList,
+                  CarbonCommonConstants.COUNT, colName)
+                (exp, sumAttr, Some(countAttr))
+              case Alias(aggregateExpression: AggregateExpression, _)
+                if aggregateExpression.aggregateFunction.isInstanceOf[Sum] ||
+                   aggregateExpression.aggregateFunction.isInstanceOf[Count] =>
+                // If query contains avg aggregate and also sum or count of column,
+                // duplicate column creation is avoided. The column might have already mapped
+                // with avg, so search from output list to find the column and map.
+                val colName = getColumnName(aggregateExpression.collectLeaves().head)
+                val colAttr = getColumnFromOutputList(updatedPlanOutputList,
+                  aggregateExpression.aggregateFunction.prettyName, colName)
+                (exp, colAttr, None)
+              case _ =>
+                columnIndex += 1
+                (exp, updatedPlanOutputList(columnIndex), None)
+            }
+          }
+        } else {
+          (groupBy.outputList, updatedPlanOutputList, List.fill(updatedPlanOutputList.size)(None))
+            .zipped.toList
+        }
         val (outputList: Seq[NamedExpression], updatedPredicates: Seq[Expression]) =
           getUpdatedOutputAndPredicateList(
           groupBy,
@@ -816,7 +901,8 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
           val planWrapper = select.modularPlan.get.asInstanceOf[MVPlanWrapper]
           val plan = planWrapper.modularPlan.asInstanceOf[Select]
           val updatedPlanOutputList = getUpdatedOutputList(plan.outputList, select.modularPlan)
-          val outputListMapping = groupBy.outputList zip updatedPlanOutputList
+          val outputListMapping = (groupBy.outputList, updatedPlanOutputList, List.fill(
+            updatedPlanOutputList.size)(None)).zipped.toList
           val (outputList: Seq[NamedExpression], updatedPredicates: Seq[Expression]) =
             getUpdatedOutputAndPredicateList(
               groupBy,
@@ -834,9 +920,9 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
   }
 
   private def getUpdatedOutputAndPredicateList(groupBy: GroupBy,
-      outputListMapping: Seq[(NamedExpression, NamedExpression)]):
+      outputListMapping: Seq[(NamedExpression, NamedExpression, Option[NamedExpression])]):
   (Seq[NamedExpression], Seq[Expression]) = {
-    val outputList = for ((output1, output2) <- outputListMapping) yield {
+    val outputList = for ((output1, output2, output3) <- outputListMapping) yield {
       output1 match {
         case Alias(aggregateExpression: AggregateExpression, _)
           if aggregateExpression.aggregateFunction.isInstanceOf[Sum] =>
@@ -844,6 +930,13 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
           val uFun = aggregate.copy(child = output2)
           Alias(aggregateExpression.copy(aggregateFunction = uFun),
             output1.name)(exprId = output1.exprId)
+        case Alias(aggregateExpression: AggregateExpression, _)
+          if aggregateExpression.aggregateFunction.isInstanceOf[Average] =>
+          val uFunSum = Sum(output2)
+          val uFunCount = Sum(output3.get)
+          val uFunDivide = Divide(Cast(uFunSum, DoubleType), Cast(uFunCount, DoubleType))
+          Alias(Cast(uFunDivide, DoubleType), output1.name)(exprId = output1.exprId)
+
         case Alias(aggregateExpression: AggregateExpression, _)
           if aggregateExpression.aggregateFunction.isInstanceOf[Max] =>
           val max = aggregateExpression.aggregateFunction.asInstanceOf[Max]
@@ -881,7 +974,7 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
     val updatedPredicates = groupBy.predicateList.map {
       predicate =>
         outputListMapping.find {
-          case (output1, _) =>
+          case (output1, _, _) =>
             output1 match {
               case alias: Alias if predicate.isInstanceOf[Alias] =>
                 alias.child.semanticEquals(predicate.children.head)
@@ -891,7 +984,7 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
                 other.semanticEquals(predicate)
             }
         } match {
-          case Some((_, output2)) => output2
+          case Some((_, output2, _)) => output2
           case _ => predicate
         }
     }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/MVQueryParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/MVQueryParser.scala
index 43f319033e..ecf65b0aba 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/MVQueryParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/MVQueryParser.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.optimizer.MVRewriteRule
 import org.apache.spark.sql.util.SparkSQLUtil
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.view.MVFunctions
 
 class MVQueryParser extends StandardTokenParsers with PackratParsers {
@@ -103,6 +104,39 @@ object MVQueryParser {
       .drop(MVFunctions.DUMMY_FUNCTION)
   }
 
+  // In the mv query string, check if avg of column is present and
+  // replace it with sum and count of same column.
+  // To avoid duplicates, add a sum or count column only if it is not already present.
+  def checkForAvgAndModifySql(sql: String): String = {
+    val (avg, sum, count) = (CarbonCommonConstants.AVERAGE,
+      CarbonCommonConstants.SUM, CarbonCommonConstants.COUNT)
+    var modifiedSql: String = sql.toLowerCase
+    val pattern = """select(.*)from""".r
+    for (column <- pattern.findFirstMatchIn(modifiedSql).get.group(1).split(",")) {
+      if (column.contains(avg + "(")) {
+        val sumStr = column.replace(avg, sum)
+        val countStr = column.replace(avg, count)
+        val queryContainsSum = modifiedSql.contains(sumStr.trim)
+        val queryContainsCount = modifiedSql.contains(countStr.trim)
+        if (queryContainsSum && !queryContainsCount) {
+          modifiedSql = modifiedSql.replace(column, s"$countStr")
+        } else if (!queryContainsSum && queryContainsCount) {
+          modifiedSql = modifiedSql.replace(column, s"$sumStr")
+        } else if (!queryContainsSum && !queryContainsCount) {
+          modifiedSql = modifiedSql.replace(column, s"$sumStr,$countStr")
+        } else {
+          modifiedSql = ("\\s*" + Regex.quote(column.trim) + "\\s*,").r
+            .replaceAllIn(modifiedSql, "")
+          modifiedSql = (",\\s*" + Regex.quote(column.trim) + "\\s*,").r
+            .replaceAllIn(modifiedSql, ",")
+          modifiedSql = (",\\s*" + Regex.quote(column.trim) + "\\s*").r
+            .replaceAllIn(modifiedSql, "")
+        }
+      }
+    }
+    modifiedSql.trim
+  }
+
   def getQueryPlan(query: String, session: SparkSession): LogicalPlan = {
     val updatedQuery = new MVQueryParser().parseAndAppendDummyFunction(query)
     val analyzedPlan = session.sql(updatedQuery).queryExecution.analyzed
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/view/MVTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/view/MVTest.scala
index 38c7b3fff5..c2ef20bf06 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/view/MVTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/view/MVTest.scala
@@ -18,6 +18,7 @@ package org.apache.carbondata.view
 
 import java.io.File
 
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog.HiveTableRelation
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -147,6 +148,150 @@ class MVTest extends QueryTest with BeforeAndAfterAll {
     sql("drop table source")
   }
 
+  test("test create mv on carbon table with avg aggregate") {  // avg MV must be incremental
+    sql("drop materialized view if exists mv1")
+    sql("drop table if exists source")
+    sql("create table source(empname string, salary long) stored as carbondata")
+    sql("insert into source select 'sd',20")
+    sql("insert into source select 'sd',200")
+    sql("create materialized view mv1 as select empname, avg(salary) from source group by empname")
+    sql("insert into source select 'sd',30")  // third row is loaded after mv1 was created
+    val result = sql("show materialized views on table source").collectAsList()
+    assert(result.get(0).get(3).toString.equalsIgnoreCase("incremental"))
+    val df = sql("select empname, avg(salary) from source group by empname")
+    assert(isTableAppearedInPlan(df.queryExecution.optimizedPlan, "mv1"))  // mv1 must be in plan
+    checkAnswer(df, Seq(Row("sd", 83.33334)))  // (20+200+30)/3 = 83.33...; rounding: TODO confirm
+    sql(s"drop materialized view mv1")
+    sql("drop table source")
+  }
+
+  test("test create mv with avg on carbon partition table") {  // salary is the partition column
+    sql("drop materialized view if exists mv1")
+    sql("drop table if exists source")
+    sql("create table source(a string, empname string) stored as carbondata partitioned by(salary long)")
+    sql("insert into source select 'sd','sd',20")
+    sql("insert into source select 'sdf','sd',200")
+    sql("create materialized view mv1 as select empname, avg(salary) from source group by empname")
+    sql("insert into source select 'dsf','sd',30")  // loaded after mv1 was created
+    val result = sql("show materialized views on table source").collectAsList()
+    assert(result.get(0).get(3).toString.equalsIgnoreCase("incremental"))
+    val df = sql("select empname, avg(salary) from source group by empname")
+    assert(isTableAppearedInPlan(df.queryExecution.optimizedPlan, "mv1"))  // mv1 must be in plan
+    checkAnswer(df, Seq(Row("sd", 83.33334)))  // (20+200+30)/3 = 83.33...; rounding: TODO confirm
+    sql(s"drop materialized view mv1")
+    sql("drop table source")
+  }
+
+  test("test create mv with avg and compaction") {  // still incremental after minor compaction
+    sql("drop materialized view if exists mv1")
+    sql("drop table if exists source")
+    sql("create table source(a string, empname string, salary long) stored as carbondata")
+    sql("insert into source select 'sd','sd',20")
+    sql("insert into source select 'sdf','sd',200")
+    sql("create materialized view mv1 as select empname, avg(salary) from source group by empname")
+    sql("insert into source select 'dsf','sd',30")
+    sql("insert into source select 'dsf','sd',10")
+    sql("alter table source compact 'minor'")  // two loads precede mv1, two follow it
+    val result = sql("show materialized views on table source").collectAsList()
+    assert(result.get(0).get(3).toString.equalsIgnoreCase("incremental"))
+    val df = sql("select empname, avg(salary) from source group by empname")
+    assert(isTableAppearedInPlan(df.queryExecution.optimizedPlan, "mv1"))  // mv1 must be in plan
+    checkAnswer(df, Seq(Row("sd", 65)))  // (20 + 200 + 30 + 10) / 4
+    sql(s"drop materialized view mv1")
+    sql("drop table source")
+  }
+
+  test("test create MV with average of floor") {  // avg over an expression, not a bare column
+    sql("drop table if exists source")
+    sql(
+      "create table if not exists source (tags_id STRING, value DOUBLE) stored as carbondata")
+    sql("insert into source values ('xyz-e01',3.34)")
+    sql("insert into source values ('xyz-e01',1.25)")
+    val mvQuery = "select tags_id, avg(floor(value)) from source group by tags_id"
+    sql("drop materialized view if exists dm1")
+    sql(s"create materialized view dm1  as $mvQuery")
+    sql("insert into source values ('xyz-e01',3.54)")  // loaded after dm1 was created
+    val df = sql("select tags_id, avg(floor(value)) as sum_val from source group by tags_id")
+    val result = sql("show materialized views on table source").collectAsList()
+    assert(result.get(0).get(3).toString.equalsIgnoreCase("incremental"))
+    assert(isTableAppearedInPlan(df.queryExecution.optimizedPlan, "dm1"))  // dm1 must be in plan
+    checkAnswer(df, Seq(Row("xyz-e01", 2.33334)))  // floors 3, 1, 3 -> 7/3; rounding: TODO confirm
+    sql("drop materialized view if exists dm1")
+    sql("drop table if exists source")
+  }
+
+  test("test create mv on carbon table with avg inside other function") {
+    sql("drop materialized view if exists mv1")
+    sql("drop table if exists source")
+    sql("create table source(a string, empname string, salary long) stored as carbondata")
+    sql("insert into source select 'sd','sd',20")
+    sql("insert into source select 'sdf','sd',200")
+    sql("create materialized view mv1 as select empname,round(avg(salary),0) from source group by empname")
+    sql("insert into source select 'dsf','sd',30")  // loaded after mv1 was created
+    val df = sql("select empname, round(avg(salary),0) from source group by empname")
+    val result = sql("show materialized views on table source").collectAsList()
+    assert(result.get(0).get(3).toString.equalsIgnoreCase("full"))  // not "incremental" here
+    assert(isTableAppearedInPlan(df.queryExecution.optimizedPlan, "mv1"))  // mv1 must be in plan
+    checkAnswer(df, Seq(Row("sd", 83)))  // round((20 + 200 + 30) / 3, 0)
+    sql(s"drop materialized view mv1")
+    sql("drop table source")
+  }
+
+  test("test average MV with sum and count columns") {  // avg next to explicit sum/count
+    sql("drop table if exists source")
+    sql("create table if not exists source (tags_id STRING, value DOUBLE) stored as carbondata")
+    sql("insert into source values ('xyz-e01',3)")
+    sql("insert into source values ('xyz-e01',1)")
+    var mvQuery = "select tags_id ,sum(value), avg(value) from source group by tags_id"
+    sql("drop materialized view if exists dm1")  // case 1: sum and avg, no count
+    sql(s"create materialized view dm1  as $mvQuery")
+    var result = sql("show materialized views on table source").collectAsList()
+    assert(result.get(0).get(3).toString.equalsIgnoreCase("incremental"))
+    sql("insert into source values ('xyz-e01',3)")
+    var df = sql(mvQuery)
+    assert(isTableAppearedInPlan(df.queryExecution.optimizedPlan, "dm1"))
+    checkAnswer(df, Seq(Row("xyz-e01", 7, 2.33334)))  // rows 3, 1, 3: sum 7, avg 7 / 3
+
+    mvQuery = "select tags_id ,sum(value), avg(value),count(value) from source group by tags_id"
+    sql("drop materialized view if exists dm1")  // case 2: sum, avg and count together
+    sql(s"create materialized view dm1  as $mvQuery")
+    result = sql("show materialized views on table source").collectAsList()
+    assert(result.get(0).get(3).toString.equalsIgnoreCase("incremental"))
+    sql("insert into source values ('xyz-e01',3)")
+    df = sql(mvQuery)
+    assert(isTableAppearedInPlan(df.queryExecution.optimizedPlan, "dm1"))
+    checkAnswer(df, Seq(Row("xyz-e01", 10, 2.5, 4)))  // rows 3, 1, 3, 3: sum 10, count 4
+
+    mvQuery = "select tags_id , avg(value),count(value),max(value) from source group by tags_id"
+    sql("drop materialized view if exists dm1")  // case 3: avg and count, no sum
+    sql(s"create materialized view dm1  as $mvQuery")
+    result = sql("show materialized views on table source").collectAsList()
+    assert(result.get(0).get(3).toString.equalsIgnoreCase("incremental"))
+    sql("insert into source values ('xyz-e01',3)")
+    df = sql(mvQuery)
+    assert(isTableAppearedInPlan(df.queryExecution.optimizedPlan, "dm1"))
+    checkAnswer(df, Seq(Row("xyz-e01", 2.6, 5, 3)))  // five rows: avg 13 / 5, count 5, max 3
+    sql("drop materialized view if exists dm1")
+    sql("drop table if exists source")
+  }
+
+  test("test create MV with average and query with sum column") {  // sum query vs. an avg MV
+    sql("drop table if exists source")
+    sql(
+      "create table if not exists source (tags_id STRING, value DOUBLE) stored as carbondata")
+    sql("insert into source values ('xyz-e01',3)")
+    sql("insert into source values ('xyz-e01',1)")
+    val mvQuery = "select tags_id, avg(value) from source group by tags_id"
+    sql("drop materialized view if exists dm1")
+    sql(s"create materialized view dm1  as $mvQuery")
+    sql("insert into source values ('xyz-e01',3)")  // loaded after dm1 was created
+    val df = sql("select tags_id, sum(value) from source group by tags_id")
+    assert(isTableAppearedInPlan(df.queryExecution.optimizedPlan, "dm1"))  // dm1 must be in plan
+    checkAnswer(df, Seq(Row("xyz-e01", 7)))  // 3 + 1 + 3
+    sql("drop materialized view if exists dm1")
+    sql("drop table if exists source")
+  }
+
   test("test create mv fail because of name used") {
     sql("drop table if exists mv1")
     sql("drop materialized view if exists mv1")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala
index a4aa1c5e54..71fcb809a5 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVCreateTestCase.scala
@@ -710,7 +710,7 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"drop materialized view mv36")
   }
 
-  test("test create materialized view with agg push join with sub group by ") {
+  test("test create materialized view with sum agg push join with sub group by ") {
     sql("drop materialized view if exists mv37")
     sql("create materialized view mv37 as select empname, designation, sum(utilization) from fact_table1 group by empname, designation")
     val frame = sql(
@@ -722,6 +722,40 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"drop materialized view mv37")
   }
 
+  test("test create materialized view with avg agg push join with sub group by") {
+    sql("drop materialized view if exists mv37")
+    sql(
+      "create materialized view mv37 as select empname, designation,avg(utilization), avg(1) from" +
+      " fact_table1 group by empname, designation")
+    val frame = sql(  // join query grouped by empname only, i.e. coarser than mv37's grouping
+      "select t1.empname, avg(t1.utilization), avg(1) from fact_table1 t1,fact_table2 t2  " +
+      "where t1.empname = t2.empname group by t1.empname")
+    val result = sql("show materialized views on table fact_table1").collectAsList()
+    assert(result.get(0).get(3).toString.equalsIgnoreCase("incremental"))
+    assert(TestUtil.verifyMVHit(frame.queryExecution.optimizedPlan, "mv37"))  // must hit mv37
+    checkAnswer(frame,  // expected rows are computed from fact_table3/fact_table4
+      sql("select t1.empname, avg(t1.utilization), avg(1) from fact_table3 t1,fact_table4 t2  " +
+          "where t1.empname = t2.empname group by t1.empname, t1.designation"))
+    sql(s"drop materialized view mv37")
+  }
+
+  test("test create materialized view with avg agg push join with sub group on hive") {
+    sql("drop materialized view if exists mv37")
+    sql("create table source1 as select * from fact_table1")  // CTAS, no "stored as carbondata"
+    sql("create table source2 as select * from fact_table2")
+    sql("create table source3 as select * from fact_table3")
+    sql("create table source4 as select * from fact_table4")
+    sql("create materialized view mv37 as select empname, designation, avg(utilization) " +
+        "from source1 group by empname, designation")
+    val frame = sql(  // join query grouped by empname only, i.e. coarser than mv37's grouping
+      "select t1.empname, avg(t1.utilization) from source1 t1,source2 t2  " +
+      "where t1.empname = t2.empname group by t1.empname")
+    assert(TestUtil.verifyMVHit(frame.queryExecution.optimizedPlan, "mv37"))  // must hit mv37
+    checkAnswer(frame, sql("select t1.empname, avg(t1.utilization) from source3 t1,source4 t2  " +
+                           "where t1.empname = t2.empname group by t1.empname, t1.designation"))
+    sql(s"drop materialized view mv37")  // source1..source4 are not dropped inside this test
+  }
+
   test("test create materialized view with agg push join with group by ") {
     sql("drop materialized view if exists mv38")
     sql("create materialized view mv38 as select empname, designation, sum(utilization) from fact_table1 group by empname, designation")
@@ -1378,6 +1412,10 @@ class MVCreateTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   def drop(): Unit = {
+    sql("drop table IF EXISTS source1")
+    sql("drop table IF EXISTS source2")
+    sql("drop table IF EXISTS source3")
+    sql("drop table IF EXISTS source4")
     sql("drop table IF EXISTS fact_table1")
     sql("drop table IF EXISTS fact_table2")
     sql("drop table IF EXISTS fact_table3")
diff --git a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala
index 5934bc582e..cbcd424846 100644
--- a/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala
+++ b/mv/plan/src/main/scala/org/apache/carbondata/mv/plans/modular/AggregatePushDown.scala
@@ -19,7 +19,7 @@ package org.apache.carbondata.mv.plans.modular
 
 import scala.collection._
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Cast, Divide, Expression, ExprId, Literal, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeMap, Cast, Expression, ExprId, Literal, NamedExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.types.DataType
 
@@ -68,6 +68,18 @@ trait AggregatePushDown { // self: ModularPlan =>
     }
   }
 
+  // Builds the push-down pair for an avg: (outer Max over the inner avg's alias, Seq(inner alias)).
+  def getAliasMapForAvgAggregate(exp: Expression,
+      avg: AggregateExpression,
+      aliasInfo: Option[(String, ExprId)]): (NamedExpression, Seq[NamedExpression]) = {
+    val avg1 = AggregateExpression(Average(exp), avg.mode, isDistinct = false)
+    val alias = Alias(avg1, avg1.toString)()  // inner avg, named after its own string form
+    val tAvg = avg.copy(Max(alias.toAttribute), avg.mode,  // keeps avg's mode and resultId
+      isDistinct = false, resultId = avg.resultId)
+    val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))  // else "" + fresh id
+    (Alias(tAvg, name)(exprId = id), Seq(alias))
+  }
+
   private def transformAggregate(aggregate: AggregateExpression,
       selAliasMap: AttributeMap[Attribute],
       ith: Int,
@@ -174,16 +186,22 @@ trait AggregatePushDown { // self: ModularPlan =>
         val tAttr = selAliasMap.get(expr.asInstanceOf[Attribute]).getOrElse(expr)
           .asInstanceOf[Attribute]
         if (fact.outputSet.contains(tAttr)) {
-          val savg = AggregateExpression(Sum(tAttr), avg.mode, isDistinct = false)
-          val cavg = AggregateExpression(Count(tAttr), avg.mode, isDistinct = false)
-          val sAvg = Alias(savg, savg.toString)()
-          val cAvg = Alias(cavg, cavg.toString)()
-          val tAvg = Divide(sAvg.toAttribute, cAvg.toAttribute)
-          val (name, id) = aliasInfo.getOrElse(("", NamedExpression.newExprId))
-          map += (ith -> (Alias(tAvg, name)(exprId = id), Seq(sAvg, cAvg)))
+          map += (ith ->  getAliasMapForAvgAggregate(tAttr, avg, aliasInfo))
+        } else {
+          Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
+        }
+      case avg@MatchAggregateExpression(Average(cast@MatchCast(expr, dataType)), _, false, _, _) =>
+        val tAttr = selAliasMap.get(expr.asInstanceOf[Attribute]).getOrElse(expr)
+          .asInstanceOf[Attribute]
+        if (fact.outputSet.contains(tAttr)) {
+          map += (ith -> getAliasMapForAvgAggregate(cast, avg, aliasInfo))
         } else {
           Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
         }
+      case avg: AggregateExpression if avg.aggregateFunction.isInstanceOf[Average] &&
+                                       avg.aggregateFunction.children.head.isInstanceOf[Literal] =>
+        map +=
+        (ith -> getAliasMapForAvgAggregate(avg.aggregateFunction.children.head, avg, aliasInfo))
       case _ => Map.empty[Int, (NamedExpression, Seq[NamedExpression])]
     }
   }