Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/06 10:00:23 UTC

[06/20] incubator-carbondata git commit: [CARBONDATA-127] Decimal datatype result in cast exception in compaction (#897)

[CARBONDATA-127] Decimal datatype result in cast exception in compaction (#897)

* In the compaction flow, a decimal measure arrives as a Spark Decimal data type, but the writer expects a byte array, so the Decimal is converted to a byte array before writing.
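
As a standalone sketch of that conversion (hedged: toBytes below is a
hypothetical stand-in for CarbonData's DataTypeUtil.bigDecimalToByte, whose
real byte layout may differ):

    import java.math.BigDecimal;

    import org.apache.spark.sql.types.Decimal;

    public final class DecimalMeasureSketch {

      // Illustrative encoding only: one scale byte (assumes scale <= 127)
      // followed by the unscaled value's two's-complement bytes.
      static byte[] toBytes(BigDecimal value) {
        byte[] unscaled = value.unscaledValue().toByteArray();
        byte[] out = new byte[unscaled.length + 1];
        out[0] = (byte) value.scale();
        System.arraycopy(unscaled, 0, out, 1, unscaled.length);
        return out;
      }

      // Mirrors the writer branch added by this patch: compaction rows carry
      // Spark Decimal, while load rows already carry byte[].
      static byte[] measureAsBytes(Object cell, boolean compactionFlow) {
        if (compactionFlow) {
          BigDecimal javaDecimal = ((Decimal) cell).toJavaBigDecimal();
          return toBytes(javaDecimal);
        }
        return (byte[]) cell;
      }

      public static void main(String[] args) {
        Decimal sparkDecimal = Decimal.apply(new BigDecimal("1234.5678"));
        byte[] encoded = measureAsBytes(sparkDecimal, true);
        System.out.println("encoded length = " + encoded.length);
      }
    }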

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/a7a5eb2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/a7a5eb2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/a7a5eb2c

Branch: refs/heads/master
Commit: a7a5eb2c69bc132f8a3b1bb65619c8f79a0186ac
Parents: ac8d866
Author: ravikiran23 <ra...@gmail.com>
Authored: Tue Aug 2 04:33:59 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Tue Aug 2 04:33:59 2016 +0530

----------------------------------------------------------------------
 .../MajorCompactionStopsAfterCompaction.scala   |  2 +-
 .../testsuite/joinquery/EquiJoinTestCase.scala  |  4 +++
 .../store/CarbonFactDataHandlerColumnar.java    | 28 +++++++++++++++++---
 3 files changed, 30 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a7a5eb2c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
index 3cbf5dd..17f6bd3 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
@@ -23,7 +23,7 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
     sql(
-      "CREATE TABLE IF NOT EXISTS stopmajor (country String, ID Int, date Timestamp, name " +
+      "CREATE TABLE IF NOT EXISTS stopmajor (country String, ID decimal(7,4), date Timestamp, name " +
         "String, " +
         "phonetype String, serialname String, salary Int) STORED BY 'org.apache.carbondata" +
         ".format'"

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a7a5eb2c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala
index 3943352..37b8ebc 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala
@@ -7,6 +7,10 @@ import org.apache.spark.sql.execution.joins.BroadCastFilterPushJoin
 
 class EquiJoinTestCase extends QueryTest with BeforeAndAfterAll  {
    override def beforeAll {
+    sql("drop table if exists employee_hive")
+    sql("drop table if exists mobile_hive")
+    sql("drop table if exists employee")
+    sql("drop table if exists mobile")
     //loading to hive table
     sql("create table employee_hive (empid string,empname string,mobilename string,mobilecolor string,salary int)row format delimited fields terminated by ','")
     sql("create table mobile_hive (mobileid string,mobilename string, mobilecolor string, sales int)row format delimited fields terminated by ','");

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a7a5eb2c/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 2a43d23..abb4d01 100644
--- a/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -76,6 +76,8 @@ import org.carbondata.processing.store.writer.NodeHolder;
 import org.carbondata.processing.store.writer.exception.CarbonDataWriterException;
 import org.carbondata.processing.util.RemoveDictionaryUtil;
 
+import org.apache.spark.sql.types.Decimal;
+
 /**
  * Fact data handler class to handle the fact data
  */
@@ -260,6 +262,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
    * Segment properties
    */
   private SegmentProperties segmentProperties;
+  /**
+   * flag to check for compaction flow
+   */
+  private boolean compactionFlow;
 
   /**
    * CarbonFactDataHandler constructor
@@ -353,7 +359,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
     dimensionType =
         CarbonUtil.identifyDimensionType(carbonTable.getDimensionByTableName(tableName));
 
-    if (carbonFactDataHandlerModel.isCompactionFlow()) {
+    this.compactionFlow = carbonFactDataHandlerModel.isCompactionFlow();
+    // In the compaction flow, decimal measures arrive as Spark Decimal and
+    // must be converted to byte arrays before writing.
+    if (compactionFlow) {
       try {
         numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
             .getProperty(CarbonCommonConstants.NUM_CORES_COMPACTING,
@@ -562,7 +571,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
           b = DataTypeUtil.bigDecimalToByte(val);
           nullValueIndexBitSet[customMeasureIndex[i]].set(count);
         } else {
-          b = (byte[]) row[customMeasureIndex[i]];
+          if (this.compactionFlow) {
+            BigDecimal bigDecimal = ((Decimal) row[customMeasureIndex[i]]).toJavaBigDecimal();
+            b = DataTypeUtil.bigDecimalToByte(bigDecimal);
+          } else {
+            b = (byte[]) row[customMeasureIndex[i]];
+          }
         }
         byteBuffer = ByteBuffer.allocate(b.length + CarbonCommonConstants.INT_SIZE_IN_BYTE);
         byteBuffer.putInt(b.length);
@@ -889,7 +903,15 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
           int num = (value % 1 == 0) ? 0 : CarbonCommonConstants.CARBON_DECIMAL_POINTERS_DEFAULT;
           decimal[count] = (decimal[count] > num ? decimal[count] : num);
         } else if (type[count] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
-          byte[] buff = (byte[]) row[count];
+          byte[] buff = null;
+          // In the compaction flow, the decimal measure arrives as Spark
+          // Decimal and must be converted to a byte array.
+          if (this.compactionFlow) {
+            BigDecimal bigDecimal = ((Decimal) row[count]).toJavaBigDecimal();
+            buff = DataTypeUtil.bigDecimalToByte(bigDecimal);
+          } else {
+            buff = (byte[]) row[count];
+          }
           BigDecimal value = DataTypeUtil.byteToBigDecimal(buff);
           BigDecimal minVal = (BigDecimal) min[count];
           min[count] = minVal.min(value);
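
For the statistics path in the last hunk, a self-contained sketch of the
min/max bookkeeping (hedged: the Carbon byte codec is replaced with plain
BigDecimal values here; this is not the writer's exact code):

    import java.math.BigDecimal;

    public final class DecimalStatsSketch {
      private BigDecimal min;
      private BigDecimal max;

      // Same idea as the hunk above: decode the measure to BigDecimal, then
      // fold it into the running min/max for the column statistics.
      void update(BigDecimal value) {
        min = (min == null) ? value : min.min(value);
        max = (max == null) ? value : max.max(value);
      }

      public static void main(String[] args) {
        DecimalStatsSketch stats = new DecimalStatsSketch();
        for (String s : new String[] {"10.5000", "-3.2500", "7.0000"}) {
          stats.update(new BigDecimal(s));
        }
        System.out.println("min = " + stats.min + ", max = " + stats.max);
      }
    }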