You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/06 10:00:30 UTC

[13/20] incubator-carbondata git commit: [CARBONDATA-138] Avg aggregation for decimal type keeping sync with hive (#900)

[CARBONDATA-138] Avg aggregation for decimal type keeping sync with hive (#900)

Scale up the decimal value during average aggregation to avoid loss of precision and keep results in sync with Hive.

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/07fe4d21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/07fe4d21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/07fe4d21

Branch: refs/heads/master
Commit: 07fe4d2132f62ffaa3eac5a67315ffb6183ee45a
Parents: a724831
Author: Gin-zhj <zh...@huawei.com>
Authored: Thu Aug 4 18:40:43 2016 +0800
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Thu Aug 4 16:10:43 2016 +0530

----------------------------------------------------------------------
 .../impl/AvgBigDecimalAggregator.java           | 12 ++++-
 .../carbondata/spark/agg/CarbonAggregates.scala | 12 +++--
 .../testsuite/bigdecimal/TestBigDecimal.scala   | 53 +++++++++++++++-----
 3 files changed, 59 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07fe4d21/core/src/main/java/org/carbondata/query/aggregator/impl/AvgBigDecimalAggregator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/query/aggregator/impl/AvgBigDecimalAggregator.java b/core/src/main/java/org/carbondata/query/aggregator/impl/AvgBigDecimalAggregator.java
index 8c67cfc..2c5e59b 100644
--- a/core/src/main/java/org/carbondata/query/aggregator/impl/AvgBigDecimalAggregator.java
+++ b/core/src/main/java/org/carbondata/query/aggregator/impl/AvgBigDecimalAggregator.java
@@ -25,11 +25,15 @@ import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
 
+import static java.lang.Math.min;
+
 import org.carbondata.core.carbon.datastore.chunk.MeasureColumnDataChunk;
 import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.core.util.DataTypeUtil;
 import org.carbondata.query.aggregator.MeasureAggregator;
 
+import org.apache.spark.sql.types.DecimalType;
+
 public class AvgBigDecimalAggregator extends AbstractMeasureAggregatorBasic {
 
   /**
@@ -115,7 +119,9 @@ public class AvgBigDecimalAggregator extends AbstractMeasureAggregatorBasic {
    * @return average aggregate value
    */
   @Override public BigDecimal getBigDecimalValue() {
-    return aggVal.divide(new BigDecimal(count), 6);
+    // increase scale to avoid any loss of precision in the data
+    int updatedScale = min(aggVal.scale() + 4, DecimalType.MAX_SCALE());
+    return aggVal.divide(new BigDecimal(count), updatedScale, BigDecimal.ROUND_HALF_EVEN);
   }
 
   /**
@@ -139,7 +145,9 @@ public class AvgBigDecimalAggregator extends AbstractMeasureAggregatorBasic {
    * @return average value as an object
    */
   @Override public Object getValueObject() {
-    return aggVal.divide(new BigDecimal(count));
+    // increase scale to avoid any loss of precision in the data
+    int updatedScale = min(aggVal.scale() + 4, DecimalType.MAX_SCALE());
+    return aggVal.divide(new BigDecimal(count), updatedScale, BigDecimal.ROUND_HALF_EVEN);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07fe4d21/integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala b/integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala
index 50872ff..c3336b2 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/agg/CarbonAggregates.scala
@@ -17,9 +17,8 @@
 
 package org.carbondata.spark.agg
 
-import java.math.BigDecimal
-
 import scala.language.implicitConversions
+import scala.math.min
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -116,7 +115,14 @@ case class AverageCarbon(child: Expression, castedDataType: DataType = null)
       AverageCarbonFinal(partialSum.toAttribute,
         child.dataType match {
           case IntegerType | StringType | LongType | TimestampType => DoubleType
-          case _ => CarbonScalaUtil.updateDataType(child.dataType)
+          case decimal: DecimalType =>
+            val precision = decimal.asInstanceOf[DecimalType].precision
+            val scale = decimal.asInstanceOf[DecimalType].scale
+            // increase precision and scale to avoid any loss of precision in the data
+            val updatedPrecision = min(precision + 4, DecimalType.MAX_PRECISION)
+            val updatedScale = min(scale + 4, DecimalType.MAX_SCALE)
+            DecimalType(updatedPrecision, updatedScale)
+          case _ => child.dataType
         }),
       partialSum :: Nil)
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/07fe4d21/integration/spark/src/test/scala/org/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
index 2f6b9f8..ab62277 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/bigdecimal/TestBigDecimal.scala
@@ -35,6 +35,7 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists carbonTable")
     sql("drop table if exists hiveTable")
     sql("drop table if exists hiveBigDecimal")
+    sql("drop table if exists carbonBigDecimal_2")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.SORT_SIZE, "1")
@@ -45,6 +46,8 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll {
     sql("LOAD DATA local inpath './src/test/resources/decimalDataWithoutHeader.csv' INTO table hiveTable")
     sql("create table if not exists hiveBigDecimal(ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(27, 10))row format delimited fields terminated by ','")
     sql("LOAD DATA local inpath './src/test/resources/decimalBoundaryDataHive.csv' INTO table hiveBigDecimal")
+    sql("create table if not exists carbonBigDecimal_2 (ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(30, 10)) STORED BY 'org.apache.carbondata.format'")
+    sql("LOAD DATA LOCAL INPATH './src/test/resources/decimalBoundaryDataCarbon.csv' into table carbonBigDecimal_2")
   }
 
   test("test detail query on big decimal column") {
@@ -139,32 +142,56 @@ class TestBigDecimal extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists decimalDictLookUp")
   }
 
-  test("test sum aggregation on big decimal column with high precision") {
-    sql("drop table if exists carbonBigDecimal")
-    sql("create table if not exists carbonBigDecimal (ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(30, 10)) STORED BY 'org.apache.carbondata.format'")
-    sql("LOAD DATA LOCAL INPATH './src/test/resources/decimalBoundaryDataCarbon.csv' into table carbonBigDecimal")
-
-    checkAnswer(sql("select sum(salary)+10 from carbonBigDecimal"),
+  test("test sum+10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select sum(salary)+10 from carbonBigDecimal_2"),
       sql("select sum(salary)+10 from hiveBigDecimal"))
+  }
 
-    sql("drop table if exists carbonBigDecimal")
+  test("test sum*10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select sum(salary)*10 from carbonBigDecimal_2"),
+      sql("select sum(salary)*10 from hiveBigDecimal"))
   }
 
-  test("test sum-distinct aggregation on big decimal column with high precision") {
-    sql("drop table if exists carbonBigDecimal")
-    sql("create table if not exists carbonBigDecimal (ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(30, 10)) STORED BY 'org.apache.carbondata.format'")
-    sql("LOAD DATA LOCAL INPATH './src/test/resources/decimalBoundaryDataCarbon.csv' into table carbonBigDecimal")
+  test("test sum/10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select sum(salary)/10 from carbonBigDecimal_2"),
+      sql("select sum(salary)/10 from hiveBigDecimal"))
+  }
 
-    checkAnswer(sql("select sum(distinct(salary))+10 from carbonBigDecimal"),
+  test("test sum-distinct+10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select sum(distinct(salary))+10 from carbonBigDecimal_2"),
       sql("select sum(distinct(salary))+10 from hiveBigDecimal"))
+  }
 
-    sql("drop table if exists carbonBigDecimal")
+  test("test sum-distinct*10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select sum(distinct(salary))*10 from carbonBigDecimal_2"),
+      sql("select sum(distinct(salary))*10 from hiveBigDecimal"))
+  }
+
+  test("test sum-distinct/10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select sum(distinct(salary))/10 from carbonBigDecimal_2"),
+      sql("select sum(distinct(salary))/10 from hiveBigDecimal"))
+  }
+
+  test("test avg+10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select avg(salary)+10 from carbonBigDecimal_2"),
+      sql("select avg(salary)+10 from hiveBigDecimal"))
+  }
+
+  test("test avg*10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select avg(salary)*10 from carbonBigDecimal_2"),
+      sql("select avg(salary)*10 from hiveBigDecimal"))
+  }
+
+  test("test avg/10 aggregation on big decimal column with high precision") {
+    checkAnswer(sql("select avg(salary)/10 from carbonBigDecimal_2"),
+      sql("select avg(salary)/10 from hiveBigDecimal"))
   }
 
   override def afterAll {
     sql("drop table if exists carbonTable")
     sql("drop table if exists hiveTable")
     sql("drop table if exists hiveBigDecimal")
+    sql("drop table if exists carbonBigDecimal_2")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants.SORT_SIZE,