You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@carbondata.apache.org by GitBox <gi...@apache.org> on 2022/04/08 08:01:30 UTC

[GitHub] [carbondata] akashrn5 commented on a diff in pull request #4257: [CARBONDATA-4330] Incremental Dataload of Average aggregate in MV

akashrn5 commented on code in PR #4257:
URL: https://github.com/apache/carbondata/pull/4257#discussion_r845757331


##########
integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala:
##########
@@ -434,6 +436,23 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
     }
   }
 
+  // Currently, the user query is modified if it contains avg aggregate.
+  // modifiedLogicalPlan is created from modified query which can be used to derive sum or count
+  // columns from MV in case avg is not present.
+  private def getLogicalPlan(schemaWrapper: MVSchemaWrapper,
+      plan: ModularPlan): LogicalPlan = {
+    val queryOutput = plan.output

Review Comment:
   ```suggestion
       val outputCols = plan.output
   ```



##########
integration/spark/src/main/scala/org/apache/carbondata/view/MVCatalogInSpark.scala:
##########
@@ -172,7 +188,7 @@ case class MVCatalogInSpark(session: SparkSession)
         val modularPlan = SimpleModularizer.modularize(
             BirdcageOptimizer.execute(logicalPlan)).next().semiHarmonized
         val signature = modularPlan.signature
-        viewSchemas += MVSchemaWrapper(signature, null, logicalPlan, null)
+        viewSchemas += MVSchemaWrapper(signature, null, logicalPlan, logicalPlan, null)

Review Comment:
   Why is the logical plan passed twice? Shouldn't one of them be modifiedLogicalPlan?



##########
integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala:
##########
@@ -434,6 +436,23 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
     }
   }
 
+  // Currently, the user query is modified if it contains avg aggregate.
+  // modifiedLogicalPlan is created from modified query which can be used to derive sum or count
+  // columns from MV in case avg is not present.
+  private def getLogicalPlan(schemaWrapper: MVSchemaWrapper,
+      plan: ModularPlan): LogicalPlan = {
+    val queryOutput = plan.output
+    val sumOrCountCol = queryOutput.find(x => x.sql.contains(CarbonCommonConstants.SUM + "(") ||

Review Comment:
   Will the SQL always be in lowercase here?



##########
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/view/CarbonCreateMVCommand.scala:
##########
@@ -139,13 +139,42 @@ case class CarbonCreateMVCommand(
 
   override protected def opName: String = "CREATE MATERIALIZED VIEW"
 
+  def checkIfAvgAggregatePresent(logicalPlan: LogicalPlan): Boolean = {
+    if (logicalPlan.isInstanceOf[Aggregate]) {

Review Comment:
   Instead of checking directly with isInstanceOf, it would be better to check using transform operations on the plan.



##########
integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala:
##########
@@ -434,6 +436,23 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
     }
   }
 
+  // Currently, the user query is modified if it contains avg aggregate.

Review Comment:
   Don't use "currently" in comments; you can directly write what this function does.



##########
integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala:
##########
@@ -434,6 +436,23 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
     }
   }
 
+  // Currently, the user query is modified if it contains avg aggregate.
+  // modifiedLogicalPlan is created from modified query which can be used to derive sum or count
+  // columns from MV in case avg is not present.

Review Comment:
   `columns from MV in case avg is not present.` This line is not clear. Can you elaborate a bit and, if possible, write it more clearly?



##########
integration/spark/src/main/scala/org/apache/spark/sql/optimizer/MVRewrite.scala:
##########
@@ -800,7 +822,62 @@ class MVRewrite(catalog: MVCatalogInSpark, logicalPlan: LogicalPlan,
         val planWrapper = groupBy.modularPlan.get.asInstanceOf[MVPlanWrapper]
         val plan = planWrapper.modularPlan.asInstanceOf[Select]
         val updatedPlanOutputList = getUpdatedOutputList(plan.outputList, groupBy.modularPlan)
-        val outputListMapping = groupBy.outputList zip updatedPlanOutputList
+        var columnIndex = -1

Review Comment:
   Please add a comment explaining why `columnIndex` is used.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@carbondata.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org