You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2018/05/08 12:09:39 UTC

carbondata git commit: [CARBONDATA-2426] Added fix for data mismatch after compaction on Partition with Pre-Aggregate tables

Repository: carbondata
Updated Branches:
  refs/heads/master 325eac2b4 -> 09feb9cc2


[CARBONDATA-2426] Added fix for data mismatch after compaction on Partition with Pre-Aggregate tables

Problem: The partition directory gets deleted when one of the segments is marked for delete and
another segment is loaded into the same partition.
For an aggregate table we only have partition specs for the specified segments. Therefore,
when the deleted segment is scanned against locationMap for stale partitions, a valid
partition is wrongly considered stale, because we don't get partitionSpecs for all the segments.

Solution: Delete the stale index file (and the data files mapped to it) instead of the whole partition directory, and then delete the directory only if it is empty.

This closes #2259


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/09feb9cc
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/09feb9cc
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/09feb9cc

Branch: refs/heads/master
Commit: 09feb9cc2c80f648a5789923394dbdd41f2c8e67
Parents: 325eac2
Author: praveenmeenakshi56 <pr...@gmail.com>
Authored: Wed May 2 15:39:28 2018 +0530
Committer: kunal642 <ku...@gmail.com>
Committed: Tue May 8 17:38:07 2018 +0530

----------------------------------------------------------------------
 .../core/metadata/SegmentFileStore.java         | 17 +++++++-
 ...ndardPartitionWithPreaggregateTestCase.scala | 43 ++++++++++++++++++++
 2 files changed, 59 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/09feb9cc/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 9326b1d..d72ded3 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -700,16 +700,31 @@ public class SegmentFileStore {
     FileFactory.deleteFile(segmentFilePath, FileFactory.getFileType(segmentFilePath));
   }
 
+  /**
+   * If partition specs are available, then check the location map for any index file path which is
+   * not present in the partitionSpecs. If found then delete that index file.
+   * If the partition directory is empty, then delete the directory also.
+   * If partition specs are null, then directly delete parent directory in locationMap.
+   */
   private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs,
       Map<String, List<String>> locationMap) throws IOException {
     for (Map.Entry<String, List<String>> entry : locationMap.entrySet()) {
-      Path location = new Path(entry.getKey()).getParent();
       if (partitionSpecs != null) {
+        Path location = new Path(entry.getKey());
         boolean exists = pathExistsInPartitionSpec(partitionSpecs, location);
         if (!exists) {
           FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString()));
+          for (String carbonDataFile : entry.getValue()) {
+            FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(carbonDataFile));
+          }
+        }
+        CarbonFile path = FileFactory.getCarbonFile(location.getParent().toString());
+        if (path.listFiles().length == 0) {
+          FileFactory.deleteAllCarbonFilesOfDir(
+              FileFactory.getCarbonFile(location.getParent().toString()));
         }
       } else {
+        Path location = new Path(entry.getKey()).getParent();
         // delete the segment folder
         CarbonFile segmentPath = FileFactory.getCarbonFile(location.toString());
         if (null != segmentPath) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/09feb9cc/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
index 8a3ae3f..84c07c4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
@@ -525,6 +525,49 @@ class StandardPartitionWithPreaggregateTestCase extends QueryTest with BeforeAnd
     sql("drop table if exists updatetime_8")
   }
 
+  test("Test data updation after compaction on Partition with Pre-Aggregate tables") {
+    sql("drop table if exists partitionallcompaction")
+    sql(
+      "create table partitionallcompaction(empno int,empname String,designation String," +
+      "workgroupcategory int,workgroupcategoryname String,deptno int,projectjoindate timestamp," +
+      "projectenddate date,attendance int,utilization int,salary int) partitioned by (deptname " +
+      "String,doj timestamp,projectcode int) stored  by 'carbondata' tblproperties" +
+      "('sort_scope'='global_sort')")
+    sql(
+      "create datamap sensor_1 on table partitionallcompaction using 'preaggregate' as select " +
+      "sum(salary),doj, deptname,projectcode from partitionallcompaction group by doj," +
+      "deptname,projectcode")
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction PARTITION(deptname='Learning', doj, projectcode) OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"') """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction PARTITION(deptname='configManagement', doj, projectcode) OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction PARTITION(deptname='network', doj, projectcode) OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction PARTITION(deptname='protocol', doj, projectcode) OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE
+         |partitionallcompaction PARTITION(deptname='security', doj, projectcode) OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql("ALTER TABLE partitionallcompaction COMPACT 'MINOR'").collect()
+    checkAnswer(sql("select count(empno) from partitionallcompaction where empno=14"),
+      Seq(Row(5)))
+  }
+
   test("Test data updation in Aggregate query after compaction on Partitioned table with Pre-Aggregate table") {
     sql("drop table if exists updatetime_8")
     sql("create table updatetime_8" +