You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/12 15:04:47 UTC
[11/50] [abbrv] carbondata git commit: fix unsafe column page bug
fix unsafe column page bug
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/fdb672ad
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/fdb672ad
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/fdb672ad
Branch: refs/heads/datamap
Commit: fdb672ad946c0fe5b9982aee9b09717db36a54f7
Parents: ad80006
Author: jackylk <ja...@huawei.com>
Authored: Fri Jun 30 18:27:08 2017 +0800
Committer: QiangCai <qi...@qq.com>
Committed: Sat Jul 1 13:09:24 2017 +0800
----------------------------------------------------------------------
.../page/UnsafeVarLengthColumnPage.java | 35 ++++++++++++++++----
.../datastore/page/VarLengthColumnPageBase.java | 3 +-
.../resources/big_decimal_without_header.csv | 5 +++
.../TestLoadDataWithHiveSyntaxUnsafe.scala | 25 +++++++++++++-
4 files changed, 59 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fdb672ad/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
index 75b5312..dd6abc5 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/UnsafeVarLengthColumnPage.java
@@ -47,6 +47,11 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
private static final double FACTOR = 1.25;
+ /**
+ * create a page
+ * @param dataType data type
+ * @param pageSize number of rows
+ */
UnsafeVarLengthColumnPage(DataType dataType, int pageSize) throws MemoryException {
super(dataType, pageSize);
capacity = (int) (pageSize * DEFAULT_ROW_SIZE * FACTOR);
@@ -55,6 +60,20 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
baseOffset = memoryBlock.getBaseOffset();
}
+ /**
+ * create a page with initial capacity
+ * @param dataType data type
+ * @param pageSize number of rows
+ * @param capacity initial capacity of the page, in bytes
+ */
+ UnsafeVarLengthColumnPage(DataType dataType, int pageSize, int capacity) throws MemoryException {
+ super(dataType, pageSize);
+ this.capacity = capacity;
+ memoryBlock = UnsafeMemoryManager.allocateMemoryWithRetry((long)(capacity));
+ baseAddress = memoryBlock.getBaseObject();
+ baseOffset = memoryBlock.getBaseOffset();
+ }
+
@Override
public void freeMemory() {
if (memoryBlock != null) {
@@ -65,6 +84,9 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
}
}
+ /**
+ * reallocate memory if capacity is less than current size + request size
+ */
private void ensureMemory(int requestSize) throws MemoryException {
if (totalLength + requestSize > capacity) {
int newSize = 2 * capacity;
@@ -81,17 +103,16 @@ public class UnsafeVarLengthColumnPage extends VarLengthColumnPageBase {
@Override
public void putBytesAtRow(int rowId, byte[] bytes) {
- try {
- ensureMemory(bytes.length);
- } catch (MemoryException e) {
- throw new RuntimeException(e);
- }
- CarbonUnsafe.unsafe.copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET,
- baseAddress, baseOffset + rowOffset[rowId], bytes.length);
+ putBytes(rowId, bytes, 0, bytes.length);
}
@Override
public void putBytes(int rowId, byte[] bytes, int offset, int length) {
+ try {
+ ensureMemory(length);
+ } catch (MemoryException e) {
+ throw new RuntimeException(e);
+ }
CarbonUnsafe.unsafe.copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET + offset,
baseAddress, baseOffset + rowOffset[rowId], length);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fdb672ad/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
index a897d54..801cfb3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/page/VarLengthColumnPageBase.java
@@ -105,8 +105,9 @@ public abstract class VarLengthColumnPageBase extends ColumnPage {
int numRows = rowId;
VarLengthColumnPageBase page;
+ int inputDataLength = offset;
if (unsafe) {
- page = new UnsafeVarLengthColumnPage(DECIMAL, numRows);
+ page = new UnsafeVarLengthColumnPage(DECIMAL, numRows, inputDataLength);
} else {
page = new SafeVarLengthColumnPage(DECIMAL, numRows);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fdb672ad/integration/spark-common-test/src/test/resources/big_decimal_without_header.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/big_decimal_without_header.csv b/integration/spark-common-test/src/test/resources/big_decimal_without_header.csv
new file mode 100644
index 0000000..4e99384
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/big_decimal_without_header.csv
@@ -0,0 +1,5 @@
+1,32473289848372638424.8218378712
+2,99487323423232324232.2434323233
+3,12773443434389239382.4309238238
+4,38488747823423323726.3589238237
+5,93838663748166353423.4273832762
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/fdb672ad/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala
index 2a9d1d9..c713865 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxUnsafe.scala
@@ -65,6 +65,8 @@ class TestLoadDataWithHiveSyntaxUnsafe extends QueryTest with BeforeAndAfterAll
sql("drop table if exists comment_test")
sql("drop table if exists smallinttable")
sql("drop table if exists smallinthivetable")
+ sql("drop table if exists decimal_varlength")
+ sql("drop table if exists decimal_varlength_hive")
sql(
"CREATE table carbontable (empno int, empname String, designation String, doj String, " +
"workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, " +
@@ -77,7 +79,18 @@ class TestLoadDataWithHiveSyntaxUnsafe extends QueryTest with BeforeAndAfterAll
"projectcode int, projectjoindate String,projectenddate String, attendance String," +
"utilization String,salary String)row format delimited fields terminated by ','"
)
-
+ sql(
+ """
+ | CREATE TABLE decimal_varlength(id string, value decimal(30,10))
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin
+ )
+ sql(
+ """
+ | CREATE TABLE decimal_varlength_hive(id string, value decimal(30,10))
+ | ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
+ """.stripMargin
+ )
}
test("create table with smallint type and query smallint table") {
@@ -674,6 +687,14 @@ class TestLoadDataWithHiveSyntaxUnsafe extends QueryTest with BeforeAndAfterAll
Row("~carbon,")))
}
+ test("test decimal var lenght comlumn page") {
+ sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/big_decimal_without_header.csv' INTO TABLE decimal_varlength" +
+ s" OPTIONS('FILEHEADER'='id,value')")
+ sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/big_decimal_without_header.csv' INTO TABLE decimal_varlength_hive")
+ checkAnswer(sql("select value from decimal_varlength"), sql("select value from decimal_varlength_hive"))
+ checkAnswer(sql("select sum(value) from decimal_varlength"), sql("select sum(value) from decimal_varlength_hive"))
+ }
+
override def afterAll {
sql("drop table if exists escapechar1")
sql("drop table if exists escapechar2")
@@ -701,6 +722,8 @@ class TestLoadDataWithHiveSyntaxUnsafe extends QueryTest with BeforeAndAfterAll
sql("drop table if exists carbontable1")
sql("drop table if exists hivetable1")
sql("drop table if exists comment_test")
+ sql("drop table if exists decimal_varlength")
+ sql("drop table if exists decimal_varlength_hive")
CarbonProperties.getInstance().addProperty(
CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING,
CarbonCommonConstants.ENABLE_UNSAFE_COLUMN_PAGE_LOADING_DEFAULT