You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/06/20 07:30:00 UTC

[54/56] [abbrv] carbondata git commit: fix single-pass issue for partition table

fix single-pass issue for partition table


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5b4cf704
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5b4cf704
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5b4cf704

Branch: refs/heads/streaming_ingest
Commit: 5b4cf704ea25f5edd438c1a9491350bff88aa5e0
Parents: 967f8ac
Author: QiangCai <da...@gmail.com>
Authored: Thu Jun 8 17:47:07 2017 +0800
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Mon Jun 19 20:37:32 2017 +0530

----------------------------------------------------------------------
 .../IncrementalColumnDictionaryGenerator.java   | 23 ++++--
 .../generator/ServerDictionaryGenerator.java    |  6 +-
 .../generator/TableDictionaryGenerator.java     | 11 ++-
 .../generator/TableDictionaryGeneratorTest.java |  4 +-
 .../TestDataLoadingForPartitionTable.scala      | 84 ++++++++++++++++++++
 5 files changed, 116 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b4cf704/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java
index df49dc0..87c575f 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/IncrementalColumnDictionaryGenerator.java
@@ -67,9 +67,12 @@ public class IncrementalColumnDictionaryGenerator implements BiDictionary<Intege
 
   private int currentDictionarySize;
 
+  private int maxValue;
+
   private CarbonDimension dimension;
 
   public IncrementalColumnDictionaryGenerator(CarbonDimension dimension, int maxValue) {
+    this.maxValue = maxValue;
     this.currentDictionarySize = maxValue;
     this.dimension = dimension;
   }
@@ -169,14 +172,20 @@ public class IncrementalColumnDictionaryGenerator implements BiDictionary<Intege
       }
       // write value to dictionary file
       if (reverseIncrementalCache.size() > 0) {
-        for (int index = 2; index < reverseIncrementalCache.size() + 2; index++) {
-          String value = reverseIncrementalCache.get(index);
-          String parsedValue = DataTypeUtil
-                  .normalizeColumnValueForItsDataType(value, dimension);
-          if (null != parsedValue) {
-            dictionaryWriter.write(parsedValue);
-            distinctValues.add(parsedValue);
+        synchronized (lock) {
+          // collect incremental dictionary
+          for (int index = maxValue + 1; index <= currentDictionarySize; index++) {
+            String value = reverseIncrementalCache.get(index);
+            String parsedValue = DataTypeUtil.normalizeColumnValueForItsDataType(value, dimension);
+            if (null != parsedValue) {
+              dictionaryWriter.write(parsedValue);
+              distinctValues.add(parsedValue);
+            }
           }
+          // clear incremental dictionary to avoid write to file again
+          reverseIncrementalCache.clear();
+          incrementalCache.clear();
+          currentDictionarySize = maxValue;
         }
       }
     } finally {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b4cf704/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
index 456e885..6e0bc3f 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
@@ -52,7 +52,14 @@ public class ServerDictionaryGenerator implements DictionaryGenerator<Integer, D
             key.getTableUniqueName(), key.getColumnName());
     // initialize TableDictionaryGenerator first
     if (tableMap.get(key.getTableUniqueName()) == null) {
-      tableMap.put(key.getTableUniqueName(), new TableDictionaryGenerator(dimension));
+      synchronized (tableMap) {
+        if (tableMap.get(key.getTableUniqueName()) == null) {
+          tableMap.put(key.getTableUniqueName(), new TableDictionaryGenerator(dimension));
+        } else {
+          // another thread created the table generator first; still register this column
+          tableMap.get(key.getTableUniqueName()).updateGenerator(dimension);
+        }
+      }
     } else {
       tableMap.get(key.getTableUniqueName()).updateGenerator(dimension);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b4cf704/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
index add7811..56ed7b9 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGenerator.java
@@ -115,7 +115,14 @@ public class TableDictionaryGenerator
   }
 
   public void updateGenerator(CarbonDimension dimension) {
-    columnMap.put(dimension.getColumnId(),
-            new IncrementalColumnDictionaryGenerator(dimension, 1));
+    // reuse dictionary generator
+    if (null == columnMap.get(dimension.getColumnId())) {
+      synchronized (columnMap) {
+        if (null == columnMap.get(dimension.getColumnId())) {
+          columnMap.put(dimension.getColumnId(),
+              new IncrementalColumnDictionaryGenerator(dimension, 1));
+        }
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b4cf704/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java
index c96a7d4..0cb47c4 100644
--- a/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/dictionary/generator/TableDictionaryGeneratorTest.java
@@ -65,8 +65,8 @@ public class TableDictionaryGeneratorTest {
     empDimension = new CarbonDimension(empColumnSchema, 0, 0, 0, 0, 0);
 
     ageColumnSchema = new ColumnSchema();
-    ageColumnSchema.setColumnName("empNameCol");
-    ageColumnSchema.setColumnUniqueId("empNameCol");
+    ageColumnSchema.setColumnName("ageNameCol");
+    ageColumnSchema.setColumnUniqueId("ageNameCol");
     ageColumnSchema.setDimensionColumn(true);
     ageColumnSchema.setEncodingList(Arrays.asList(Encoding.DICTIONARY));
     ageDimension = new CarbonDimension(ageColumnSchema, 0, 0, 0, 0, 0);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b4cf704/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index b1f4ba4..3adc0ed 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -140,6 +140,67 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
   }
 
+  test("single pass data loading for partition table: hash partition") {
+    sql(
+      """
+        | CREATE TABLE hashTableSinglePass (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE hashTableSinglePass OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='TRUE')""")
+
+    validateDataFiles("default_hashTableSinglePass", "0", Seq(0, 1, 2))
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from hashTableSinglePass order by empno"),
+      sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+  }
+
+  test("single pass data loading for partition table: range partition") {
+    sql(
+      """
+        | CREATE TABLE rangeTableSinglePass (empno int, empname String, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (doj Timestamp)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+        |  'RANGE_INFO'='01-01-2010, 01-01-2015, 01-04-2015, 01-07-2015')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE rangeTableSinglePass OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='TRUE')""")
+
+    validateDataFiles("default_rangeTableSinglePass", "0", Seq(0, 1, 3, 4))
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from rangeTableSinglePass order by empno"),
+      sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+  }
+
+  test("single pass data loading for partition table: list partition") {
+    sql(
+      """
+        | CREATE TABLE listTableSinglePass (empno int, empname String, designation String, doj Timestamp,
+        |  workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (workgroupcategory int)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+        |  'LIST_INFO'='0, 1, (2, 3)')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE listTableSinglePass OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='TRUE')""")
+
+    validateDataFiles("default_listTableSinglePass", "0", Seq(1, 2))
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from listTableSinglePass order by empno"),
+      sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+  }
+
   test("Insert into for partition table: hash partition") {
     sql(
       """
@@ -220,6 +281,25 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originMultiLoads order by empno"))
   }
 
+  test("multiple single pass data loading for partition table") {
+    sql(
+      """
+        | CREATE TABLE multiLoadsSinglePass (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE multiLoadsSinglePass OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='TRUE')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE multiLoadsSinglePass OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='TRUE')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE multiLoadsSinglePass OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='TRUE')""")
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from multiLoadsSinglePass order by empno"),
+      sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originMultiLoads order by empno"))
+  }
+
   test("multiple insertInto for partition table") {
     sql(
       """
@@ -309,6 +389,10 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
     sql("drop table if exists loadAndInsert")
     sql("drop table if exists listTableUpper")
     sql("drop table if exists badrecordsPartition")
+    sql("drop table if exists hashTableSinglePass")
+    sql("drop table if exists rangeTableSinglePass")
+    sql("drop table if exists listTableSinglePass")
+    sql("drop table if exists multiLoadsSinglePass")
   }
 
 }