You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/06 10:00:23 UTC
[06/20] incubator-carbondata git commit: [CARBONDATA-127] Decimal
datatype result in cast exception in compaction (#897)
[CARBONDATA-127] Decimal datatype result in cast exception in compaction (#897)
* In the compaction flow, a decimal measure arrives as a Spark Decimal,
but the writer expects a byte array, so the Decimal is converted to a byte array.
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/a7a5eb2c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/a7a5eb2c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/a7a5eb2c
Branch: refs/heads/master
Commit: a7a5eb2c69bc132f8a3b1bb65619c8f79a0186ac
Parents: ac8d866
Author: ravikiran23 <ra...@gmail.com>
Authored: Tue Aug 2 04:33:59 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Tue Aug 2 04:33:59 2016 +0530
----------------------------------------------------------------------
.../MajorCompactionStopsAfterCompaction.scala | 2 +-
.../testsuite/joinquery/EquiJoinTestCase.scala | 4 +++
.../store/CarbonFactDataHandlerColumnar.java | 28 +++++++++++++++++---
3 files changed, 30 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a7a5eb2c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
index 3cbf5dd..17f6bd3 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
@@ -23,7 +23,7 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
sql(
- "CREATE TABLE IF NOT EXISTS stopmajor (country String, ID Int, date Timestamp, name " +
+ "CREATE TABLE IF NOT EXISTS stopmajor (country String, ID decimal(7,4), date Timestamp, name " +
"String, " +
"phonetype String, serialname String, salary Int) STORED BY 'org.apache.carbondata" +
".format'"
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a7a5eb2c/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala
index 3943352..37b8ebc 100644
--- a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/joinquery/EquiJoinTestCase.scala
@@ -7,6 +7,10 @@ import org.apache.spark.sql.execution.joins.BroadCastFilterPushJoin
class EquiJoinTestCase extends QueryTest with BeforeAndAfterAll {
override def beforeAll {
+ sql("drop table if exists employee_hive")
+ sql("drop table if exists mobile_hive")
+ sql("drop table if exists employee")
+ sql("drop table if exists mobile")
//loading to hive table
sql("create table employee_hive (empid string,empname string,mobilename string,mobilecolor string,salary int)row format delimited fields terminated by ','")
sql("create table mobile_hive (mobileid string,mobilename string, mobilecolor string, sales int)row format delimited fields terminated by ','");
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/a7a5eb2c/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
index 2a43d23..abb4d01 100644
--- a/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
+++ b/processing/src/main/java/org/carbondata/processing/store/CarbonFactDataHandlerColumnar.java
@@ -76,6 +76,8 @@ import org.carbondata.processing.store.writer.NodeHolder;
import org.carbondata.processing.store.writer.exception.CarbonDataWriterException;
import org.carbondata.processing.util.RemoveDictionaryUtil;
+import org.apache.spark.sql.types.Decimal;
+
/**
* Fact data handler class to handle the fact data
*/
@@ -260,6 +262,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
* Segment properties
*/
private SegmentProperties segmentProperties;
+ /**
+ * flag to check for compaction flow
+ */
+ private boolean compactionFlow;
/**
* CarbonFactDataHandler constructor
@@ -353,7 +359,10 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
dimensionType =
CarbonUtil.identifyDimensionType(carbonTable.getDimensionByTableName(tableName));
- if (carbonFactDataHandlerModel.isCompactionFlow()) {
+ this.compactionFlow = carbonFactDataHandlerModel.isCompactionFlow();
+ // in compaction flow the measure with decimal type will come as spark decimal.
+ // need to convert it to byte array.
+ if (compactionFlow) {
try {
numberOfCores = Integer.parseInt(CarbonProperties.getInstance()
.getProperty(CarbonCommonConstants.NUM_CORES_COMPACTING,
@@ -562,7 +571,12 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
b = DataTypeUtil.bigDecimalToByte(val);
nullValueIndexBitSet[customMeasureIndex[i]].set(count);
} else {
- b = (byte[]) row[customMeasureIndex[i]];
+ if (this.compactionFlow) {
+ BigDecimal bigDecimal = ((Decimal) row[customMeasureIndex[i]]).toJavaBigDecimal();
+ b = DataTypeUtil.bigDecimalToByte(bigDecimal);
+ } else {
+ b = (byte[]) row[customMeasureIndex[i]];
+ }
}
byteBuffer = ByteBuffer.allocate(b.length + CarbonCommonConstants.INT_SIZE_IN_BYTE);
byteBuffer.putInt(b.length);
@@ -889,7 +903,15 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler {
int num = (value % 1 == 0) ? 0 : CarbonCommonConstants.CARBON_DECIMAL_POINTERS_DEFAULT;
decimal[count] = (decimal[count] > num ? decimal[count] : num);
} else if (type[count] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) {
- byte[] buff = (byte[]) row[count];
+ byte[] buff = null;
+ // in compaction flow the measure with decimal type will come as spark decimal.
+ // need to convert it to byte array.
+ if (this.compactionFlow) {
+ BigDecimal bigDecimal = ((Decimal) row[count]).toJavaBigDecimal();
+ buff = DataTypeUtil.bigDecimalToByte(bigDecimal);
+ } else {
+ buff = (byte[]) row[count];
+ }
BigDecimal value = DataTypeUtil.byteToBigDecimal(buff);
BigDecimal minVal = (BigDecimal) min[count];
min[count] = minVal.min(value);