Posted to commits@carbondata.apache.org by qi...@apache.org on 2020/10/23 07:33:49 UTC

[carbondata] branch master updated: [CARBONDATA-4032] Fix drop partition command clean data issue

This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new e7e17c5  [CARBONDATA-4032] Fix drop partition command clean data issue
e7e17c5 is described below

commit e7e17c5bf679318d71ce17a119506ba74072b833
Author: haomarch <ma...@126.com>
AuthorDate: Wed Oct 14 01:20:14 2020 +0800

    [CARBONDATA-4032] Fix drop partition command clean data issue
    
    Why is this PR needed?
    1. CREATE TABLE droppartition (id STRING, sales STRING) PARTITIONED BY (dtm STRING) STORED AS carbondata
    2. insert into droppartition values ('01', '0', '20200907'), ('03', '0', '20200908');
       insert overwrite table droppartition partition (dtm=20200908) select * from droppartition where dtm = 20200907;
       insert overwrite table droppartition partition (dtm=20200909) select * from droppartition where dtm = 20200907;
    3. alter table droppartition drop partition (dtm=20200909)
    
    After step 3, the directory of partition "20200908" was deleted as well.
    
    The root cause: drop partition cleans stale data, but it deletes the whole parent directory that contains the stale data, so valid data in that directory is lost too.
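    
    For illustration (file names are hypothetical), after step 2 the partition
    directory for dtm=20200908 holds both current and stale files:
    
        tablepath/dtm=20200908/
            part-0001-xxx.snappy.carbondata   <- valid (written by the overwrite)
            part-0000-yyy.snappy.carbondata   <- stale (pre-overwrite data)
    
    Dropping dtm=20200909 triggers stale-data cleanup; before this fix, that
    cleanup removed the whole dtm=20200908 directory rather than only the
    stale file, taking the valid data with it.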
    
    What changes were proposed in this PR?
    Fix the issue by limiting the cleanup to the stale data itself rather than deleting the whole directory. cleanSegments is overloaded to take the set of segments affected by the drop, so only those segments are scanned for stale data (see the diff below).
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3982
---
 .../carbondata/core/metadata/SegmentFileStore.java | 48 +++++++++++++++++++---
 .../CarbonAlterTableDropHivePartitionCommand.scala |  5 ++-
 .../src/test/resources/IUD/updateinpartition.csv   | 21 ++++++++++
 .../StandardPartitionTableDropTestCase.scala       | 33 +++++++++++++++
 4 files changed, 101 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 52939eb..612308b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -28,6 +28,7 @@ import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1025,17 +1026,19 @@ public class SegmentFileStore {
   }
 
   /**
-   * Clean up invalid data after drop partition in all segments of table
+   * Clean up invalid data after drop partition in the given segments of the table
    *
-   * @param table
+   * @param table the table whose stale data needs to be cleaned
+   * @param details the load metadata of the segments whose stale data needs to be cleaned
+   * @param partitionSpecs the partitions whose stale data needs to be cleaned
    * @param forceDelete Whether it should be deleted force or check the time for an hour creation
    *                    to delete data.
    * @throws IOException
    */
-  public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitionSpecs,
+  public static void cleanSegments(CarbonTable table,
+      LoadMetadataDetails[] details,
+      List<PartitionSpec> partitionSpecs,
       boolean forceDelete) throws IOException {
-
-    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(table.getMetadataPath());
     // scan through each segment.
     for (LoadMetadataDetails segment : details) {
       // if this segment is valid then only we will go for deletion of related
@@ -1098,6 +1101,41 @@ public class SegmentFileStore {
   }
 
   /**
+   * Clean up invalid data after drop partition in all segments of the table
+   *
+   * @param table the table whose stale data needs to be cleaned
+   * @param partitionSpecs the partitions to clean
+   * @param forceDelete Whether it should be deleted force or check the time for an hour creation
+   *                    to delete data.
+   * @throws IOException
+   */
+  public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitionSpecs,
+      boolean forceDelete) throws IOException {
+    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(table.getMetadataPath());
+    cleanSegments(table, details, partitionSpecs, forceDelete);
+  }
+
+  /**
+   * Clean up invalid data after drop partition in the given segments of the table
+   *
+   * @param table the table whose stale data needs to be cleaned
+   * @param segments the names of the segments whose stale data needs to be cleaned
+   * @param forceDelete Whether it should be deleted force or check the time for an hour creation
+   *                    to delete data.
+   * @throws IOException
+   */
+  public static void cleanSegments(CarbonTable table,
+      Set<String> segments,
+      List<PartitionSpec> partitionSpecs,
+      boolean forceDelete) throws IOException {
+    LoadMetadataDetails[] details = Arrays
+        .stream(SegmentStatusManager.readLoadMetadata(table.getMetadataPath()))
+        .filter(detail -> segments.contains(detail.getLoadName()))
+        .toArray(LoadMetadataDetails[]::new);
+    cleanSegments(table, details, partitionSpecs, forceDelete);
+  }
+
+  /**
    * Deletes the segment file and its physical files like partition folders from disk
    * @param tablePath
    * @param segment
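
A minimal caller-side sketch of the new segment-scoped overload (Scala; the
table handle and the segment id are hypothetical, the call shape matches the
change to the drop-partition command below):

    import java.util
    import org.apache.carbondata.core.metadata.SegmentFileStore

    val affectedSegments = new util.HashSet[String]()
    affectedSegments.add("2") // hypothetical: only segment "2" was touched by the drop
    // only the listed segments are scanned for stale partition data; all other
    // segments, and the partition directories they own, are left untouched
    SegmentFileStore.cleanSegments(table, affectedSegments, null, true)
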
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index 266aceb..9a9afc3 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -147,6 +147,7 @@ case class CarbonAlterTableDropHivePartitionCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     var locks = List.empty[ICarbonLock]
     val uniqueId = System.currentTimeMillis().toString
+    val tobeCleanSegs = new util.HashSet[String]
     try {
       locks = AlterTableUtil.validateTableAndAcquireLock(
         table.getDatabaseName,
@@ -181,9 +182,11 @@ case class CarbonAlterTableDropHivePartitionCommand(
       OperationListenerBus.getInstance().fireEvent(postStatusEvent, operationContext)
 
       IndexStoreManager.getInstance().clearIndex(table.getAbsoluteTableIdentifier)
+      tobeCleanSegs.addAll(tobeUpdatedSegs)
+      tobeCleanSegs.addAll(tobeDeletedSegs)
     } finally {
       AlterTableUtil.releaseLocks(locks)
-      SegmentFileStore.cleanSegments(table, null, true)
+      SegmentFileStore.cleanSegments(table, tobeCleanSegs, null, true)
     }
     Seq.empty[Row]
   }
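
Design note on the change above: the cleanup in the finally block is now scoped
to the segments this command actually touched. A Scala sketch of the filtering
the new Java overload performs (variable names are mine; assumes the usual
org.apache.carbondata.core.statusmanager package for SegmentStatusManager):

    import org.apache.carbondata.core.statusmanager.SegmentStatusManager

    val allDetails = SegmentStatusManager.readLoadMetadata(table.getMetadataPath)
    // keep only the load-metadata entries for the affected segments; if the
    // command failed before tobeCleanSegs was populated, this yields an empty
    // array and the cleanup becomes a no-op instead of a whole-table scan
    val scopedDetails = allDetails.filter(d => tobeCleanSegs.contains(d.getLoadName))
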
diff --git a/integration/spark/src/test/resources/IUD/updateinpartition.csv b/integration/spark/src/test/resources/IUD/updateinpartition.csv
new file mode 100644
index 0000000..30aa14c
--- /dev/null
+++ b/integration/spark/src/test/resources/IUD/updateinpartition.csv
@@ -0,0 +1,21 @@
+id,sales,dtm
+001,0,20200907
+002,0,20200907
+003,0,20200907
+004,0,20200907
+005,0,20200907
+006,0,20200907
+007,0,20200907
+008,0,20200907
+009,0,20200907
+010,0,20200907
+011,0,20200908
+012,0,20200908
+013,0,20200908
+014,0,20200908
+015,0,20200908
+016,0,20200908
+017,0,20200908
+018,0,20200908
+019,0,20200908
+020,0,20200908
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
index 1fd92fc..58075a6 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
@@ -159,6 +159,38 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl
 
   }
 
+  test("dropping static partition after inserting overwrite partition") {
+    sql("""drop table if exists droppartition""")
+    sql(
+      """CREATE TABLE droppartition (id STRING, sales STRING)
+        | PARTITIONED BY (dtm STRING)
+        | STORED AS carbondata""".stripMargin)
+    sql(
+      s"""load data local inpath '$resourcesPath/IUD/updateinpartition.csv'
+         | into table droppartition""".stripMargin)
+    // insert overwrite an existing partition
+    sql(
+      """insert overwrite table droppartition
+        | partition (dtm=20200908)
+        | select * from droppartition
+        | where dtm = 20200907""".stripMargin)
+    // insert overwrite a non-existing partition
+    sql(
+      """insert overwrite table droppartition
+        | partition (dtm=20200909)
+        | select * from droppartition
+        | where dtm = 20200907""".stripMargin)
+
+    // make sure dropping one partition won't affect other partitions
+    sql("""alter table droppartition drop partition (dtm=20200909)""")
+    checkAnswer(
+      sql(s"""select count(*),dtm from droppartition group by dtm"""),
+      Seq(Row(10, "20200907"), Row(10, "20200908")))
+    sql("""alter table droppartition drop partition (dtm=20200907)""")
+    checkAnswer(
+      sql(s"""select count(*),dtm from droppartition group by dtm"""),
+      Seq(Row(10, "20200908")))
+  }
 
   test("dropping all partition on table and do compaction") {
     sql(
@@ -229,6 +261,7 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl
     sql("drop table if exists staticpartition")
     sql("drop table if exists partitionallcompaction")
     sql("drop table if exists partitionone1")
+    sql("drop table if exists droppartition")
   }
   // scalastyle:on lineLength
 }