You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/08 12:14:52 UTC

carbondata git commit: [CARBONDATA-1596] Fixed IntermediateFileMerger for decimal types

Repository: carbondata
Updated Branches:
  refs/heads/master 87892522b -> fd28b1561


[CARBONDATA-1596] Fixed IntermediateFileMerger for decimal types

Analysis: casting bigdecimal to byte[] was throwing ClassCastException in IntermediateFileMerger.

Solution: Use DataType#bigDecimalToByte to convert bigdecimal to byte[].

This closes #1420


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fd28b156
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fd28b156
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fd28b156

Branch: refs/heads/master
Commit: fd28b15610c1e048f4d07ec0af1905ef2bcf5e8f
Parents: 8789252
Author: kunal642 <ku...@gmail.com>
Authored: Thu Oct 19 11:56:37 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Nov 8 20:14:30 2017 +0800

----------------------------------------------------------------------
 .../dataload/TestLoadDataGeneral.scala          | 27 ++++++++++++++++++++
 .../sort/sortdata/IntermediateFileMerger.java   |  5 +++-
 2 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd28b156/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index a749f12..b90a5ea 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -23,11 +23,15 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.sql.Row
 import org.scalatest.BeforeAndAfterAll
+
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
 class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
@@ -147,8 +151,31 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
     sql("DROP TABLE load_test_singlepass")
   }
 
+  test("test load data with decimal type and sort intermediate files as 1") {
+    sql("drop table if exists carbon_table")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT, "1")
+      .addProperty(CarbonCommonConstants.SORT_SIZE, "1")
+      .addProperty(CarbonCommonConstants.DATA_LOAD_BATCH_SIZE, "1")
+    sql("create table if not exists carbonBigDecimal (ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(27, 10)) STORED BY 'org.apache.carbondata.format'")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/decimalBoundaryDataCarbon.csv' into table carbonBigDecimal")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT,
+        CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE)
+      .addProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL)
+      .addProperty(CarbonCommonConstants.DATA_LOAD_BATCH_SIZE,
+        CarbonCommonConstants.DATA_LOAD_BATCH_SIZE_DEFAULT)
+    sql("drop table if exists carbon_table")
+  }
+
   override def afterAll {
     sql("DROP TABLE if exists loadtest")
     sql("drop table if exists invalidMeasures")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT,
+        CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE)
+      .addProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL)
+      .addProperty(CarbonCommonConstants.DATA_LOAD_BATCH_SIZE,
+        CarbonCommonConstants.DATA_LOAD_BATCH_SIZE_DEFAULT)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd28b156/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
index 266e69a..bc65026 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -23,6 +23,7 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.AbstractQueue;
 import java.util.PriorityQueue;
 import java.util.concurrent.Callable;
@@ -32,6 +33,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.NonDictionaryUtil;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 
@@ -358,7 +360,8 @@ public class IntermediateFileMerger implements Callable<Void> {
           } else if (dataType == DataTypes.DOUBLE) {
             stream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row));
           } else if (DataTypes.isDecimal(dataType)) {
-            byte[] bigDecimalInBytes = (byte[]) NonDictionaryUtil.getMeasure(fieldIndex, row);
+            byte[] bigDecimalInBytes = DataTypeUtil
+                .bigDecimalToByte((BigDecimal) NonDictionaryUtil.getMeasure(fieldIndex, row));
             stream.writeInt(bigDecimalInBytes.length);
             stream.write(bigDecimalInBytes);
           } else {