You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/06/20 07:30:00 UTC
[54/56] [abbrv] carbondata git commit: fix single-pass issue for partition table
fix single-pass issue for partition table
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5b4cf704
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5b4cf704
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5b4cf704
Branch: refs/heads/streaming_ingest
Commit: 5b4cf704ea25f5edd438c1a9491350bff88aa5e0
Parents: 967f8ac
Author: QiangCai <da...@gmail.com>
Authored: Thu Jun 8 17:47:07 2017 +0800
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Mon Jun 19 20:37:32 2017 +0530
----------------------------------------------------------------------
.../IncrementalColumnDictionaryGenerator.java | 23 ++++--
.../generator/ServerDictionaryGenerator.java | 6 +-
.../generator/TableDictionaryGenerator.java | 11 ++-
.../generator/TableDictionaryGeneratorTest.java | 4 +-
.../TestDataLoadingForPartitionTable.scala | 84 ++++++++++++++++++++
5 files changed, 116 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b4cf704/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java
index df49dc0..87c575f 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java
@@ -67,9 +67,12 @@ public class IncrementalColumnDictionaryGenerator implements BiDictionary<Intege
private int currentDictionarySize;
+ private int maxValue;
+
private CarbonDimension dimension;
public IncrementalColumnDictionaryGenerator(CarbonDimension dimension, int maxValue) {
+ this.maxValue = maxValue;
this.currentDictionarySize = maxValue;
this.dimension = dimension;
}
@@ -169,14 +172,20 @@ public class IncrementalColumnDictionaryGenerator implements BiDictionary<Intege
}
// write value to dictionary file
if (reverseIncrementalCache.size() > 0) {
- for (int index = 2; index < reverseIncrementalCache.size() + 2; index++) {
- String value = reverseIncrementalCache.get(index);
- String parsedValue = DataTypeUtil
- .normalizeColumnValueForItsDataType(value, dimension);
- if (null != parsedValue) {
- dictionaryWriter.write(parsedValue);
- distinctValues.add(parsedValue);
+ synchronized (lock) {
+ // collect incremental dictionary
+ for (int index = maxValue + 1; index <= currentDictionarySize; index++) {
+ String value = reverseIncrementalCache.get(index);
+ String parsedValue = DataTypeUtil.normalizeColumnValueForItsDataType(value, dimension);
+ if (null != parsedValue) {
+ dictionaryWriter.write(parsedValue);
+ distinctValues.add(parsedValue);
+ }
}
+ // clear incremental dictionary to avoid write to file again
+ reverseIncrementalCache.clear();
+ incrementalCache.clear();
+ currentDictionarySize = maxValue;
}
}
} finally {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b4cf704/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
index 456e885..6e0bc3f 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
@@ -52,7 +52,11 @@ public class ServerDictionaryGenerator implements DictionaryGenerator<Integer, D
key.getTableUniqueName(), key.getColumnName());
// initialize TableDictionaryGenerator first
if (tableMap.get(key.getTableUniqueName()) == null) {
- tableMap.put(key.getTableUniqueName(), new TableDictionaryGenerator(dimension));
+ synchronized (tableMap) {
+ if (tableMap.get(key.getTableUniqueName()) == null) {
+ tableMap.put(key.getTableUniqueName(), new TableDictionaryGenerator(dimension));
+ }
+ }
} else {
tableMap.get(key.getTableUniqueName()).updateGenerator(dimension);
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b4cf704/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
index add7811..56ed7b9 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
@@ -115,7 +115,14 @@ public class TableDictionaryGenerator
}
public void updateGenerator(CarbonDimension dimension) {
- columnMap.put(dimension.getColumnId(),
- new IncrementalColumnDictionaryGenerator(dimension, 1));
+ // reuse dictionary generator
+ if (null == columnMap.get(dimension.getColumnId())) {
+ synchronized (columnMap) {
+ if (null == columnMap.get(dimension.getColumnId())) {
+ columnMap.put(dimension.getColumnId(),
+ new IncrementalColumnDictionaryGenerator(dimension, 1));
+ }
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b4cf704/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java
index c96a7d4..0cb47c4 100644
--- a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java
@@ -65,8 +65,8 @@ public class TableDictionaryGeneratorTest {
empDimension = new CarbonDimension(empColumnSchema, 0, 0, 0, 0, 0);
ageColumnSchema = new ColumnSchema();
- ageColumnSchema.setColumnName("empNameCol");
- ageColumnSchema.setColumnUniqueId("empNameCol");
+ ageColumnSchema.setColumnName("ageNameCol");
+ ageColumnSchema.setColumnUniqueId("ageNameCol");
ageColumnSchema.setDimensionColumn(true);
ageColumnSchema.setEncodingList(Arrays.asList(Encoding.DICTIONARY));
ageDimension = new CarbonDimension(ageColumnSchema, 0, 0, 0, 0, 0);
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b4cf704/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index b1f4ba4..3adc0ed 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -140,6 +140,67 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
}
+ test("single pass data loading for partition table: hash partition") {
+ sql(
+ """
+ | CREATE TABLE hashTableSinglePass (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE hashTableSinglePass OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='TRUE')""")
+
+ validateDataFiles("default_hashTableSinglePass", "0", Seq(0, 1, 2))
+
+ checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from hashTableSinglePass order by empno"),
+ sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+ }
+
+ test("single pass data loading for partition table: range partition") {
+ sql(
+ """
+ | CREATE TABLE rangeTableSinglePass (empno int, empname String, designation String,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (doj Timestamp)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+ | 'RANGE_INFO'='01-01-2010, 01-01-2015, 01-04-2015, 01-07-2015')
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE rangeTableSinglePass OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='TRUE')""")
+
+ validateDataFiles("default_rangeTableSinglePass", "0", Seq(0, 1, 3, 4))
+
+ checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from rangeTableSinglePass order by empno"),
+ sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+ }
+
+ test("single pass data loading for partition table: list partition") {
+ sql(
+ """
+ | CREATE TABLE listTableSinglePass (empno int, empname String, designation String, doj Timestamp,
+ | workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (workgroupcategory int)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+ | 'LIST_INFO'='0, 1, (2, 3)')
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE listTableSinglePass OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='TRUE')""")
+
+ validateDataFiles("default_listTableSinglePass", "0", Seq(1, 2))
+
+ checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from listTableSinglePass order by empno"),
+ sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+ }
+
test("Insert into for partition table: hash partition") {
sql(
"""
@@ -220,6 +281,25 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originMultiLoads order by empno"))
}
+ test("multiple single pass data loading for partition table") {
+ sql(
+ """
+ | CREATE TABLE multiLoadsSinglePass (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int)
+ | PARTITIONED BY (empno int)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE multiLoadsSinglePass OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='TRUE')""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE multiLoadsSinglePass OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='TRUE')""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE multiLoadsSinglePass OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='TRUE')""")
+
+ checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from multiLoadsSinglePass order by empno"),
+ sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originMultiLoads order by empno"))
+ }
+
test("multiple insertInto for partition table") {
sql(
"""
@@ -309,6 +389,10 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
sql("drop table if exists loadAndInsert")
sql("drop table if exists listTableUpper")
sql("drop table if exists badrecordsPartition")
+ sql("drop table if exists hashTableSinglePass")
+ sql("drop table if exists rangeTableSinglePass")
+ sql("drop table if exists listTableSinglePass")
+ sql("drop table if exists multiLoadsSinglePass")
}
}