You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/11/28 12:53:26 UTC

carbondata git commit: [CARBONDATA-3136] Fix JVM crash with preaggregate datamap when average of decimal column is taken with orderby

Repository: carbondata
Updated Branches:
  refs/heads/master c5bfe4acf -> afe2b669b


[CARBONDATA-3136] Fix JVM crash with preaggregate datamap when average of decimal column is taken with orderby

Problem: The JVM crashes when a query takes the average of a decimal column together with ORDER BY on a table that has a preaggregate datamap.

Cause: When preparing the plan with a preaggregate datamap, the decimal column is cast to double in the average expression. This led to a JVM crash in Spark because values were being filled with the wrong precision (the call stack is given in the JIRA).

Solution: For a decimal column, the sum and count used in the average's division should be cast to the column's decimal type instead of double, so that the division result is a decimal.

This closes #2958


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/afe2b669
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/afe2b669
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/afe2b669

Branch: refs/heads/master
Commit: afe2b669bdab68f1528bc861e72480b1b37509a3
Parents: c5bfe4a
Author: ajantha-bhat <aj...@gmail.com>
Authored: Tue Nov 27 19:37:49 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Wed Nov 28 18:23:17 2018 +0530

----------------------------------------------------------------------
 .../TestPreAggregateExpressions.scala           | 11 +++++++++
 .../sql/hive/CarbonPreAggregateRules.scala      | 26 +++++++++++++++-----
 2 files changed, 31 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/afe2b669/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
index b3b71a6..a7511fd 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggregateExpressions.scala
@@ -164,6 +164,17 @@ class TestPreAggregateExpressions extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test("Test Pre_aggregate with decimal column with order by") {
+    sql("drop table if exists maintable")
+    sql("create table maintable(name string, decimal_col decimal(30,16)) stored by 'carbondata'")
+    sql("insert into table maintable select 'abc',452.564")
+    sql(
+      "create datamap ag1 on table maintable using 'preaggregate' as select name,avg(decimal_col)" +
+      " from maintable group by name")
+    checkAnswer(sql("select avg(decimal_col) from maintable group by name order by name"),
+      Seq(Row(452.56400000000000000000)))
+  }
+
   override def afterAll: Unit = {
     sql("DROP TABLE IF EXISTS mainTable")
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/afe2b669/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
index 76ff41a..9b204f8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonPreAggregateRules.scala
@@ -989,8 +989,15 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           Cast(AggregateExpression(Count(exp), aggExp.mode, false), DoubleType))
         newExp
       case Average(exp: Expression) =>
-        val newExp = Seq(AggregateExpression(Sum(Cast(exp, DoubleType)), aggExp.mode, false),
-          Cast(AggregateExpression(Count(exp), aggExp.mode, false), DoubleType))
+        val dataType =
+          if (exp.dataType.isInstanceOf[DecimalType]) {
+            // decimal must not go as double precision.
+            exp.dataType.asInstanceOf[DecimalType]
+          } else {
+            DoubleType
+          }
+        val newExp = Seq(AggregateExpression(Sum(Cast(exp, dataType)), aggExp.mode, false),
+          Cast(AggregateExpression(Count(exp), aggExp.mode, false), dataType))
         newExp
       case _ =>
         val newExp = Seq(aggExp)
@@ -1663,6 +1670,13 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
               false))
         }
       case Average(exp: Expression) =>
+        val dataType =
+          if (exp.dataType.isInstanceOf[DecimalType]) {
+            // decimal must not go as double precision.
+            exp.dataType.asInstanceOf[DecimalType]
+          } else {
+            DoubleType
+          }
         // for handling Normal table case/Aggregate node added in case of streaming table
         if (!isStreamingTable) {
           // In case of average aggregate function select 2 columns from aggregate table
@@ -1670,24 +1684,24 @@ case class CarbonPreAggregateQueryRules(sparkSession: SparkSession) extends Rule
           // Then add divide(sum(column with sum), sum(column with count)).
           Seq(Divide(AggregateExpression(Sum(Cast(
             attrs.head,
-            DoubleType)),
+            dataType)),
             aggExp.mode,
             false),
             AggregateExpression(Sum(Cast(
               attrs.last,
-              DoubleType)),
+              dataType)),
               aggExp.mode,
               false)))
         } else {
           // in case of streaming aggregate table return two aggregate function sum and count
           Seq(AggregateExpression(Sum(Cast(
             attrs.head,
-            DoubleType)),
+            dataType)),
             aggExp.mode,
             false),
             AggregateExpression(Sum(Cast(
               attrs.last,
-              DoubleType)),
+              dataType)),
               aggExp.mode,
               false))
         }