You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/12/16 12:45:34 UTC

carbondata git commit: [CARBONDATA-1901]Fixed Pre aggregate data map creation and query parsing

Repository: carbondata
Updated Branches:
  refs/heads/master e7ec6f43c -> 363731c62


[CARBONDATA-1901]Fixed Pre aggregate data map creation and query parsing

Problem:*Fixed below issues in case of pre aggregate

Pre aggregate data map table column order is not as per query given by user because of which while data is loaded to wrong column
when aggregate function contains any expression query is failing with match error
pre aggregate data map columns and parent tables columns encoder is not matching
Solution:
Do not consider group columns in pre aggregate
when aggregate function contains any expression hit the maintable
Get encoder from main table and add in pre aggregate table column
When aggregation type is sum or avg create measure column

This closes #1671


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/363731c6
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/363731c6
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/363731c6

Branch: refs/heads/master
Commit: 363731c62ba87259f80e4fe2e9ae91a354d13873
Parents: e7ec6f4
Author: kumarvishal <ku...@gmail.com>
Authored: Thu Dec 14 16:07:11 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Sat Dec 16 18:15:21 2017 +0530

----------------------------------------------------------------------
 .../preaggregate/TestPreAggCreateCommand.scala  | 108 ++++++++++++++++---
 .../TestPreAggregateTableSelection.scala        |  22 ++--
 .../command/carbonTableSchemaCommon.scala       |  29 ++++-
 .../preaaggregate/PreAggregateUtil.scala        |  11 --
 .../sql/hive/CarbonPreAggregateRules.scala      |  12 ++-
 5 files changed, 134 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/363731c6/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 7c06634..cb72732 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -1,7 +1,15 @@
 package org.apache.carbondata.integration.spark.testsuite.preaggregate
 
+import org.apache.spark.sql.CarbonDatasourceHadoopRelation
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.core.metadata.encoder.Encoding
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 
 class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
 
@@ -9,34 +17,37 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists PreAggMain")
     sql("drop table if exists PreAggMain1")
     sql("drop table if exists PreAggMain2")
+    sql("drop table if exists maintable")
     sql("create table preaggMain (a string, b string, c string) stored by 'carbondata'")
     sql("create table preaggMain1 (a string, b string, c string) stored by 'carbondata' tblProperties('DICTIONARY_INCLUDE' = 'a')")
     sql("create table preaggMain2 (a string, b string, c string) stored by 'carbondata'")
+    sql("create table maintable (column1 int, column6 string, column5 string, column2 string, column3 int, column4 int) stored by 'carbondata' tblproperties('dictionary_include'='column1,column6', 'dictionary_exclude'='column3,column5')")
+
   }
 
 
-  test("test pre agg create table One") {
+  test("test pre agg create table 1") {
     sql("create datamap preagg1 on table PreAggMain using 'preaggregate' as select a,sum(b) from PreAggMain group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg1"), true, "preaggmain_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg1"), true, "preaggmain_b_sum")
     sql("drop datamap preagg1 on table PreAggMain")
   }
 
-  test("test pre agg create table Two") {
+  test("test pre agg create table 2") {
     sql("create datamap preagg2 on table PreAggMain using 'preaggregate' as select a as a1,sum(b) from PreAggMain group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), true, "preaggmain_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg2"), true, "preaggmain_b_sum")
     sql("drop datamap preagg2 on table PreAggMain")
   }
 
-  test("test pre agg create table Three") {
+  test("test pre agg create table 3") {
     sql("create datamap preagg3 on table PreAggMain using 'preaggregate' as select a,sum(b) as sum from PreAggMain group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg3"), true, "preaggmain_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg3"), true, "preaggmain_b_sum")
     sql("drop datamap preagg3 on table PreAggMain")
   }
 
-  test("test pre agg create table four") {
+  test("test pre agg create table 4") {
     sql("create datamap preagg4 on table PreAggMain using 'preaggregate' as select a as a1,sum(b) as sum from PreAggMain group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg4"), true, "preaggmain_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain_preagg4"), true, "preaggmain_b_sum")
@@ -44,7 +55,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
   }
 
 
-  test("test pre agg create table five") {
+  test("test pre agg create table 5") {
     sql("create datamap preagg11 on table PreAggMain1 using 'preaggregate'as select a,sum(b) from PreAggMain1 group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg11"), true, "preaggmain1_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg11"), true, "preaggmain1_b_sum")
@@ -52,7 +63,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     sql("drop datamap preagg11 on table PreAggMain1")
   }
 
-  test("test pre agg create table six") {
+  test("test pre agg create table 6") {
     sql("create datamap preagg12 on table PreAggMain1 using 'preaggregate' as select a as a1,sum(b) from PreAggMain1 group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg12"), true, "preaggmain1_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg12"), true, "preaggmain1_b_sum")
@@ -60,7 +71,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     sql("drop datamap preagg12 on table PreAggMain1")
   }
 
-  test("test pre agg create table seven") {
+  test("test pre agg create table 7") {
     sql("create datamap preagg13 on table PreAggMain1 using 'preaggregate' as select a,sum(b) as sum from PreAggMain1 group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg13"), true, "preaggmain1_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg13"), true, "preaggmain1_b_sum")
@@ -68,7 +79,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     sql("drop datamap preagg13 on table PreAggMain1")
   }
 
-  test("test pre agg create table eight") {
+  test("test pre agg create table 8") {
     sql("create datamap preagg14 on table PreAggMain1 using 'preaggregate' as select a as a1,sum(b) as sum from PreAggMain1 group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg14"), true, "preaggmain1_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain1_preagg14"), true, "preaggmain1_b_sum")
@@ -77,7 +88,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
   }
 
 
-  test("test pre agg create table nine") {
+  test("test pre agg create table 9") {
     sql("create datamap preagg15 on table PreAggMain2 using 'preaggregate' as select a,avg(b) from PreAggMain2 group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg15"), true, "preaggmain2_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg15"), true, "preaggmain2_b_sum")
@@ -85,28 +96,28 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     sql("drop datamap preagg15 on table PreAggMain2")
   }
 
-  test("test pre agg create table ten") {
+  test("test pre agg create table 10") {
     sql("create datamap preagg16 on table PreAggMain2 using 'preaggregate' as select a as a1,max(b) from PreAggMain2 group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg16"), true, "preaggmain2_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg16"), true, "preaggmain2_b_max")
     sql("drop datamap preagg16 on table PreAggMain2")
   }
 
-  test("test pre agg create table eleven") {
+  test("test pre agg create table 11") {
     sql("create datamap preagg17 on table PreAggMain2 using 'preaggregate' as select a,min(b) from PreAggMain2 group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg17"), true, "preaggmain2_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg17"), true, "preaggmain2_b_min")
     sql("drop datamap preagg17 on table PreAggMain2")
   }
 
-  test("test pre agg create table twelve") {
+  test("test pre agg create table 12") {
     sql("create datamap preagg18 on table PreAggMain2 using 'preaggregate' as select a as a1,count(b) from PreAggMain2 group by a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg18"), true, "preaggmain2_a")
     checkExistence(sql("DESCRIBE FORMATTED PreAggMain2_preagg18"), true, "preaggmain2_b_count")
     sql("drop datamap preagg18 on table PreAggMain2")
   }
 
-  test("test pre agg create table thirteen") {
+  test("test pre agg create table 13") {
     try {
       sql(
         "create datamap preagg19 on table PreAggMain2 using 'preaggregate' as select a as a1,count(distinct b) from PreAggMain2 group by a")
@@ -117,7 +128,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  test("test pre agg create table fourteen") {
+  test("test pre agg create table 14") {
     try {
       sql(
         "create datamap preagg20 on table PreAggMain2 using 'preaggregate' as select a as a1,sum(distinct b) from PreAggMain2 group by a")
@@ -128,7 +139,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     }
   }
 
-  test("test pre agg create table fifteen") {
+  test("test pre agg create table 15") {
     try {
       sql(
         "create datamap preagg21 on table PreAggMain2 using 'preaggregate' as select a as a1,sum(b) from PreAggMain2 where a='vishal' group by a")
@@ -139,8 +150,75 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test("test pre agg create table 16") {
+    sql("create datamap agg0 on table mainTable using 'preaggregate' as select column4, sum(column4) from maintable group by column4")
+    val df = sql("select * from maintable_agg0")
+    val carbontable = getCarbontable(df.queryExecution.analyzed)
+    assert(carbontable.getAllMeasures.size()==2)
+    assert(carbontable.getAllDimensions.size()==0)
+    sql("drop datamap agg0 on table maintable")
+  }
+
+  test("test pre agg create table 17") {
+    sql("create datamap agg0 on table mainTable using 'preaggregate' as select column1, sum(column1),column6, sum(column6) from maintable group by column6,column1")
+    val df = sql("select * from maintable_agg0")
+    val carbontable = getCarbontable(df.queryExecution.analyzed)
+    assert(carbontable.getAllMeasures.size()==2)
+    assert(carbontable.getAllDimensions.size()==2)
+    carbontable.getAllDimensions.asScala.foreach{ f =>
+      assert(f.getEncoder.contains(Encoding.DICTIONARY))
+    }
+    sql("drop datamap agg0 on table maintable")
+  }
+
+  test("test pre agg create table 18") {
+    sql("create datamap agg0 on table mainTable using 'preaggregate' as select column1, count(column1),column6, count(column6) from maintable group by column6,column1")
+    val df = sql("select * from maintable_agg0")
+    val carbontable = getCarbontable(df.queryExecution.analyzed)
+    assert(carbontable.getAllMeasures.size()==1)
+    assert(carbontable.getAllDimensions.size()==4)
+    carbontable.getAllDimensions.asScala.foreach{ f =>
+      assert(f.getEncoder.contains(Encoding.DICTIONARY))
+    }
+    sql("drop datamap agg0 on table maintable")
+  }
+
+  test("test pre agg create table 19") {
+    sql("create datamap agg0 on table mainTable using 'preaggregate' as select column3, sum(column3),column5, sum(column5) from maintable group by column3,column5")
+    val df = sql("select * from maintable_agg0")
+    val carbontable = getCarbontable(df.queryExecution.analyzed)
+    assert(carbontable.getAllMeasures.size()==2)
+    assert(carbontable.getAllDimensions.size()==2)
+    carbontable.getAllDimensions.asScala.foreach{ f =>
+      assert(!f.getEncoder.contains(Encoding.DICTIONARY))
+    }
+    sql("drop datamap agg0 on table maintable")
+  }
+
+
+  def getCarbontable(plan: LogicalPlan) : CarbonTable ={
+    var carbonTable : CarbonTable = null
+    plan.transform {
+      // first check if any preaTable1 scala function is applied it is present is in plan
+      // then call is from create preaTable1regate table class so no need to transform the query plan
+      case ca:CarbonRelation =>
+        if (ca.isInstanceOf[CarbonDatasourceHadoopRelation]) {
+          val relation = ca.asInstanceOf[CarbonDatasourceHadoopRelation]
+          carbonTable = relation.carbonTable
+        }
+        ca
+      case logicalRelation:LogicalRelation =>
+        if(logicalRelation.relation.isInstanceOf[CarbonDatasourceHadoopRelation]) {
+          val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
+          carbonTable = relation.carbonTable
+        }
+        logicalRelation
+    }
+    carbonTable
+  }
 
   override def afterAll {
+    sql("drop table if exists maintable")
     sql("drop table if exists PreAggMain")
     sql("drop table if exists PreAggMain1")
     sql("drop table if exists PreAggMain2")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/363731c6/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
index d84ec3b..5b36826 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateTableSelection.scala
@@ -27,14 +27,6 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll: Unit = {
     sql("drop table if exists mainTable")
-    sql("drop table if exists agg0")
-    sql("drop table if exists agg1")
-    sql("drop table if exists agg2")
-    sql("drop table if exists agg3")
-    sql("drop table if exists agg4")
-    sql("drop table if exists agg5")
-    sql("drop table if exists agg6")
-    sql("drop table if exists agg7")
     sql("drop table if exists lineitem")
     sql("CREATE TABLE mainTable(id int, name string, city string, age string) STORED BY 'org.apache.carbondata.format'")
     sql("create datamap agg0 on table mainTable using 'preaggregate' as select name from mainTable group by name")
@@ -191,6 +183,11 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll {
     preAggTableValidator(df.queryExecution.analyzed, "maintable_agg0")
   }
 
+  test("test PreAggregate table selection 28") {
+    val df = sql("select name as NewName, sum(case when age=2016 then 1 else 0 end) as sum from mainTable group by name")
+    preAggTableValidator(df.queryExecution.analyzed, "maintable")
+  }
+
 
   def preAggTableValidator(plan: LogicalPlan, actualTableName: String) : Unit ={
     var isValidPlan = false
@@ -223,14 +220,7 @@ class TestPreAggregateTableSelection extends QueryTest with BeforeAndAfterAll {
 
   override def afterAll: Unit = {
     sql("drop table if exists mainTable")
-    sql("drop table if exists agg0")
-    sql("drop table if exists agg1")
-    sql("drop table if exists agg2")
-    sql("drop table if exists agg3")
-    sql("drop table if exists agg4")
-    sql("drop table if exists agg5")
-    sql("drop table if exists agg6")
-    sql("drop table if exists agg7")
+    sql("drop table if exists lineitem")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/363731c6/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 6a109f4..5b93907 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -514,17 +514,40 @@ class TableNewProcessor(cm: TableModel) {
     }
 
     cm.msrCols.foreach { field =>
-      val encoders = new java.util.ArrayList[Encoding]()
+      // if aggregate function is defined in case of preaggregate and agg function is sum or avg
+      // then it can be stored as measure
+      var isAggFunPresent = false
+      // getting the encoder from maintable so whatever encoding is applied in maintable
+      // same encoder can be applied on aggregate table
+      val encoders = if (cm.parentTable.isDefined && cm.dataMapRelation.get.get(field).isDefined) {
+        isAggFunPresent =
+          cm.dataMapRelation.get.get(field).get.aggregateFunction.equalsIgnoreCase("sum") ||
+          cm.dataMapRelation.get.get(field).get.aggregateFunction.equals("avg")
+        if(!isAggFunPresent) {
+          cm.parentTable.get.getColumnByName(
+            cm.parentTable.get.getTableName,
+            cm.dataMapRelation.get.get(field).get.columnTableRelation.get.parentColumnName)
+            .getEncoder
+        } else {
+          new java.util.ArrayList[Encoding]()
+        }
+      } else {
+        new java.util.ArrayList[Encoding]()
+      }
+      // check if it can be dimension column
+      val isDimColumn = !encoders.isEmpty && !isAggFunPresent
       val columnSchema = getColumnSchema(
         DataTypeConverterUtil.convertToCarbonType(field.dataType.getOrElse("")),
         field.name.getOrElse(field.column),
         encoders,
-        false,
+        isDimColumn,
         field,
         cm.dataMapRelation)
       allColumns :+= columnSchema
       index = index + 1
-      measureCount += 1
+      if (!isDimColumn) {
+        measureCount += 1
+      }
     }
 
     // Check if there is any duplicate measures or dimensions.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/363731c6/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index 6b24f46..8614d66 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -116,17 +116,6 @@ object PreAggregateUtil {
       throw new MalformedCarbonCommandException(
         "Pre Aggregation is not supported on Pre-Aggregated Table")
     }
-    groupByExp.map {
-      case attr: AttributeReference =>
-        fieldToDataMapFieldMap += getField(attr.name,
-          attr.dataType,
-          parentColumnId = carbonTable.getColumnByName(parentTableName, attr.name).getColumnId,
-          parentTableName = parentTableName,
-          parentDatabaseName = parentDatabaseName, parentTableId = parentTableId)
-      case _ =>
-        throw new MalformedCarbonCommandException(s"Unsupported Function in select Statement:${
-          selectStmt } ")
-    }
     aggExp.map {
       case Alias(attr: AggregateExpression, _) =>
         if (attr.isDistinct) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/363731c6/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 19cc711..623d309 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -148,7 +148,9 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             carbonTable,
             tableName,
             list)
-          isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
+          if(isValidPlan) {
+            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
+          }
           // getting the columns from filter expression
           if(isValidPlan) {
             isValidPlan = extractQueryColumnFromFilterExp(filterExp, list, carbonTable, tableName)
@@ -208,7 +210,9 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             carbonTable,
             tableName,
             list)
-          isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
+          if(isValidPlan) {
+            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
+          }
           if (isValidPlan) {
             list ++
             extractQueryColumnForOrderBy(Some(projectList), sortOrders, carbonTable, tableName)
@@ -251,7 +255,9 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
             carbonTable,
             tableName,
             list)
-          isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
+          if(isValidPlan) {
+            isValidPlan = !CarbonReflectionUtils.hasPredicateSubquery(filterExp)
+          }
           if(isValidPlan) {
             list ++ extractQueryColumnForOrderBy(sortOrders = sortOrders,
               carbonTable = carbonTable,