You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/11/08 12:14:52 UTC
carbondata git commit: [CARBONDATA-1596] Fixed IntermediateFileMerger
for decimal types
Repository: carbondata
Updated Branches:
refs/heads/master 87892522b -> fd28b1561
[CARBONDATA-1596] Fixed IntermediateFileMerger for decimal types
Analysis: casting BigDecimal to byte[] was throwing ClassCastException in IntermediateFileMerger.
Solution: Use DataTypeUtil#bigDecimalToByte to convert BigDecimal to byte[].
This closes #1420
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fd28b156
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fd28b156
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fd28b156
Branch: refs/heads/master
Commit: fd28b15610c1e048f4d07ec0af1905ef2bcf5e8f
Parents: 8789252
Author: kunal642 <ku...@gmail.com>
Authored: Thu Oct 19 11:56:37 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Nov 8 20:14:30 2017 +0800
----------------------------------------------------------------------
.../dataload/TestLoadDataGeneral.scala | 27 ++++++++++++++++++++
.../sort/sortdata/IntermediateFileMerger.java | 5 +++-
2 files changed, 31 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd28b156/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index a749f12..b90a5ea 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -23,11 +23,15 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.Row
import org.scalatest.BeforeAndAfterAll
+
import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.metadata.CarbonMetadata
import org.apache.spark.sql.test.util.QueryTest
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
override def beforeAll {
@@ -147,8 +151,31 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterAll {
sql("DROP TABLE load_test_singlepass")
}
+ test("test load data with decimal type and sort intermediate files as 1") {
+ sql("drop table if exists carbon_table")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT, "1")
+ .addProperty(CarbonCommonConstants.SORT_SIZE, "1")
+ .addProperty(CarbonCommonConstants.DATA_LOAD_BATCH_SIZE, "1")
+ sql("create table if not exists carbonBigDecimal (ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary decimal(27, 10)) STORED BY 'org.apache.carbondata.format'")
+ sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/decimalBoundaryDataCarbon.csv' into table carbonBigDecimal")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT,
+ CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE)
+ .addProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL)
+ .addProperty(CarbonCommonConstants.DATA_LOAD_BATCH_SIZE,
+ CarbonCommonConstants.DATA_LOAD_BATCH_SIZE_DEFAULT)
+ sql("drop table if exists carbon_table")
+ }
+
override def afterAll {
sql("DROP TABLE if exists loadtest")
sql("drop table if exists invalidMeasures")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT,
+ CarbonCommonConstants.SORT_INTERMEDIATE_FILES_LIMIT_DEFAULT_VALUE)
+ .addProperty(CarbonCommonConstants.SORT_SIZE, CarbonCommonConstants.SORT_SIZE_DEFAULT_VAL)
+ .addProperty(CarbonCommonConstants.DATA_LOAD_BATCH_SIZE,
+ CarbonCommonConstants.DATA_LOAD_BATCH_SIZE_DEFAULT)
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fd28b156/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
index 266e69a..bc65026 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -23,6 +23,7 @@ import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.math.BigDecimal;
import java.util.AbstractQueue;
import java.util.PriorityQueue;
import java.util.concurrent.Callable;
@@ -32,6 +33,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
import org.apache.carbondata.core.metadata.datatype.DataType;
import org.apache.carbondata.core.metadata.datatype.DataTypes;
import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
import org.apache.carbondata.core.util.NonDictionaryUtil;
import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
@@ -358,7 +360,8 @@ public class IntermediateFileMerger implements Callable<Void> {
} else if (dataType == DataTypes.DOUBLE) {
stream.writeDouble((Double) NonDictionaryUtil.getMeasure(fieldIndex, row));
} else if (DataTypes.isDecimal(dataType)) {
- byte[] bigDecimalInBytes = (byte[]) NonDictionaryUtil.getMeasure(fieldIndex, row);
+ byte[] bigDecimalInBytes = DataTypeUtil
+ .bigDecimalToByte((BigDecimal) NonDictionaryUtil.getMeasure(fieldIndex, row));
stream.writeInt(bigDecimalInBytes.length);
stream.write(bigDecimalInBytes);
} else {