Posted to commits@carbondata.apache.org by in...@apache.org on 2022/07/19 04:46:06 UTC

[carbondata] branch master updated: [CARBONDATA-4338] Moving dropped partition data to trash

This is an automated email from the ASF dual-hosted git repository.

indhumuthumurugesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 04b175620f [CARBONDATA-4338] Moving dropped partition data to trash
04b175620f is described below

commit 04b175620fedbb5555258a81081b2f691fa0ee5e
Author: Mahesh Raju Somalaraju <ma...@huawei.com>
AuthorDate: Mon Jun 6 16:52:28 2022 +0530

    [CARBONDATA-4338] Moving dropped partition data to trash
    
    Why is this PR needed?
    When a drop partition operation is performed, CarbonData modifies
    only the table status file and does not delete the actual partition
    folder, which contains the data and index files. To comply with Hive
    behaviour, CarbonData should also delete the dropped partition folder
    from storage (HDFS/OBS/etc.). Before deleting, CarbonData keeps a
    copy in the Trash folder, from which users can restore the data by
    checking the partition name and timestamp.
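    
    A minimal usage sketch (the table and partition names are illustrative,
    not part of this patch):
    
        CarbonProperties.getInstance().addProperty(
          CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH, "true")
        sql("ALTER TABLE locationTable DROP PARTITION (country = 'US')")
        // the dropped data is kept under the table's trash folder:
        //   <tablePath>/<trash-dir>/<timestamp>/country=US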
    
    What changes were proposed in this PR?
    Moved the dropped partition folder files to the trash folder
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4276
---
 .../core/constants/CarbonCommonConstants.java      |  10 ++
 .../carbondata/core/util/CleanFilesUtil.java       |   4 +-
 .../org/apache/carbondata/core/util/TrashUtil.java |  41 ++++++
 docs/configuration-parameters.md                   |   1 +
 docs/ddl-of-carbondata.md                          |   1 +
 docs/faq.md                                        |   7 +-
 .../CarbonAlterTableDropHivePartitionCommand.scala |  30 ++++-
 .../StandardPartitionTableDropTestCase.scala       | 146 ++++++++++++++++++++-
 8 files changed, 236 insertions(+), 4 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 663f7d21cb..023137e815 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -2886,5 +2886,15 @@ public final class CarbonCommonConstants {
 
   public static final String CARBON_ENABLE_MULTI_VERSION_TABLE_STATUS_DEFAULT = "false";
 
+  /**
+   * Enable this property to move dropped partition data to the trash folder
+   * on an ALTER TABLE DROP PARTITION operation.
+   * It is disabled by default; when disabled, the dropped partition data is
+   * left in place in the table store.
+   */
+  @CarbonProperty
+  public static final String CARBON_ENABLE_PARTITION_DATA_TRASH =
+      "carbon.enable.partitiondata.trash";
 
+  public static final String CARBON_ENABLE_PARTITION_DATA_TRASH_DEFAULT = "false";
 }
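
The key/default pair above is consumed via CarbonProperties with the "false"
default, so the feature stays off unless explicitly enabled. A sketch of the
lookup (Scala, mirroring the command change later in this patch):

    val enabled = CarbonProperties.getInstance()
      .getProperty(CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH,
        CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH_DEFAULT)
      .toBoolean  // "false".toBoolean == false when the property is unset
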
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
index bfa9b949fc..e395b7a622 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
@@ -206,10 +206,12 @@ public class CleanFilesUtil {
   /**
    * This method will delete all the empty partition folders starting from the table path
    */
-  private static void deleteEmptyPartitionFoldersRecursively(CarbonFile tablePath) {
+  public static void deleteEmptyPartitionFoldersRecursively(CarbonFile tablePath) {
     CarbonFile[] listOfFiles = tablePath.listFiles();
     if (listOfFiles.length == 0) {
       tablePath.delete();
+      // if the parent folder is now also empty, delete it too.
+      deleteEmptyPartitionFoldersRecursively(tablePath.getParentFile());
     } else {
       for (CarbonFile file: listOfFiles) {
         if (file.isDirectory() && file.getName().contains("=")) {
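
The change above also makes the cleanup walk upward: once a folder is emptied,
its parent is re-checked because it may itself have just become empty. A
standalone sketch of that upward recursion on java.io.File (illustrative only;
the real code operates on CarbonFile and also recurses downward over
"=" partition folders):

    import java.io.File

    def deleteEmptyDirsUpward(dir: File): Unit = {
      if (dir != null) {
        val children = dir.listFiles()
        if (children != null && children.isEmpty) {
          dir.delete()                              // remove the empty folder
          deleteEmptyDirsUpward(dir.getParentFile)  // parent may now be empty too
        }
      }
    }
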
diff --git a/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java b/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
index 47d196b7d9..6e6131b541 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
@@ -268,4 +268,45 @@ public final class TrashUtil {
       timeStampSubFolder + CarbonCommonConstants.FILE_SEPARATOR + CarbonTablePath
       .SEGMENT_PREFIX + segmentNumber;
   }
+
+  /**
+   * Returns the complete trash folder path, including the timestamp and the partition name.
+   *
+   * @param tablePath          absolute table path
+   * @param timeStampSubFolder the timestamp subfolder for the drop partition operation
+   * @param partitionName      partition name for which files are moved to the trash folder
+   */
+  public static String getCompleteTrashFolderPathForPartition(String tablePath,
+      long timeStampSubFolder, String partitionName) {
+    return CarbonTablePath.getTrashFolderPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+        + timeStampSubFolder + CarbonCommonConstants.FILE_SEPARATOR
+        + partitionName;
+  }
+
+  /**
+   * The below method copies dropped partition files to the trash folder.
+   *
+   * @param filesToCopy              absolute path of the partition folder whose files are copied to the trash folder
+   * @param trashFolderWithTimestamp trash folder path including the timestamp and the partition name
+   */
+  public static void copyPartitionDataToTrash(String filesToCopy, String trashFolderWithTimestamp) {
+    try {
+      if (!FileFactory.isFileExist(trashFolderWithTimestamp)) {
+        FileFactory.mkdirs(trashFolderWithTimestamp);
+      }
+      // check if file exists before copying
+      if (FileFactory.isFileExist(filesToCopy)) {
+        CarbonFile folder = FileFactory.getCarbonFile(filesToCopy);
+        CarbonFile[] dataFiles = folder.listFiles();
+        for (CarbonFile carbonFile : dataFiles) {
+          copyFileToTrashFolder(carbonFile.getAbsolutePath(), trashFolderWithTimestamp);
+        }
+      } else {
+        LOGGER.warn("Folder not copied to trash as partition folder does not exist");
+      }
+    } catch (IOException e) {
+      // If a file was already moved or is missing, continue with the remaining files
+      LOGGER.warn("Unable to copy file to trash folder as file not found.", e);
+    }
+  }
 }
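
For illustration, the composed trash path for a dropped partition looks like
this (values are hypothetical; the exact trash directory name comes from
CarbonTablePath.getTrashFolderPath):

    val path = TrashUtil.getCompleteTrashFolderPathForPartition(
      "/warehouse/db/tbl",   // tablePath (hypothetical)
      1654512148000L,        // timestamp taken at drop time
      "deptname=Learning")   // Hive-style partition name
    // e.g. "/warehouse/db/tbl/.Trash/1654512148000/deptname=Learning"
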
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 39064d4ae9..38b4f11233 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -58,6 +58,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.lock.class | (none) | This specifies the implementation of ICarbonLock interface to be used for acquiring the locks in case of concurrent operations                                                                                                                                                                                                                                                                                                                                                [...]
 | carbon.data.file.version | V3 | This specifies carbondata file format version. Carbondata file format has evolved with time from V1 to V3 in terms of metadata storage and IO level pruning capabilities. You can find more details [here](https://carbondata.apache.org/file-structure-of-carbondata.html#carbondata-file-format).                                                                                                                                                                        [...]
 | spark.carbon.hive.schema.store | false | Carbondata currently supports 2 different types of metastores for storing schemas. This property specifies if Hive metastore is to be used for storing and retrieving table schemas                                                                                                                                                                                                                                                                               [...]
+| carbon.enable.partitiondata.trash | false | When this property is enabled, dropped partition data is moved to the trash folder during the ALTER TABLE DROP PARTITION operation. |
 
 ## Data Loading Configuration
 
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 3d04684c29..ab2781f579 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -1107,6 +1107,7 @@ Users can specify which columns to include and exclude for local dictionary gene
   ALTER TABLE locationTable DROP PARTITION (country = 'US');
   ```
 
+   **NOTE:** Enable [carbon.enable.partitiondata.trash](./configuration-parameters.md#system-configuration) to move dropped partition data to the trash folder during ALTER TABLE DROP PARTITION.
 #### Insert OVERWRITE
 
   This command allows you to insert or load overwrite on a specific partition.
diff --git a/docs/faq.md b/docs/faq.md
index cde3e9e4da..0e28abc448 100644
--- a/docs/faq.md
+++ b/docs/faq.md
@@ -31,6 +31,7 @@
 * [How to deal with the trailing task in query?](#How-to-deal-with-the-trailing-task-in-query)
 * [How to manage hybrid file format in carbondata table?](#How-to-manage-hybrid-file-format-in-carbondata-table)
 * [How to recover table status file if lost?](#How-to-recover-table-status-file-if-lost)
+* [Why is deleted partition data still visible in the file system?](#why-is-deleted-partition-data-still-visible-in-the-file-system)
 
 # TroubleShooting
 
@@ -481,4 +482,8 @@ TableStatusRecovery.main(args) --> args is of length two: 1. Database Name 2. Ta
 TableStatus Recovery tool cannot recover table status version files for the below two scenarios
 1. After compaction, if table status file is lost, cannot recover compacted commit transaction, as the lost version file only has merged load details.
 2. After Delete segment by Id/Date, if table status file is lost, cannot recover deleted segment commit transaction, as the lost version file only has the segment status as deleted.
-3. Table status recovery on materialized view table is not supported.
\ No newline at end of file
+3. Table status recovery on materialized view table is not supported.
+
+## Why is deleted partition data still visible in the file system?
+By default, dropped partition data is not physically removed from the table store until the table itself is dropped.
+Enable the carbon.enable.partitiondata.trash property to move dropped partition data to the trash folder during the ALTER TABLE DROP PARTITION operation itself.
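
To locate moved data, the trash directory under the table path can be listed
recursively; a sketch along the lines of the test helper added below (the
trash folder name is CarbonTablePath.TRASH_DIR):

    import java.io.File

    // Recursively print everything under <tablePath>/<TRASH_DIR>
    def listTrash(dirPath: String): Unit = {
      val files = new File(dirPath).listFiles()
      if (files != null) {
        files.foreach { f =>
          println(f.getAbsolutePath)
          if (f.isDirectory) listTrash(f.getAbsolutePath)
        }
      }
    }
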
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index d7d317f4f3..8399a3aa1f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -31,13 +31,15 @@ import org.apache.spark.sql.parser.CarbonSparkSqlParserUtil
 import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.index.IndexStoreManager
-import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
 import org.apache.carbondata.events._
 import org.apache.carbondata.spark.rdd.CarbonDropPartitionRDD
 
@@ -106,6 +108,32 @@ case class CarbonAlterTableDropHivePartitionCommand(
             ifExists,
             purge,
             retainData).run(sparkSession)
+          val isPartitionDataTrashEnabled = CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH,
+              CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH_DEFAULT).toBoolean
+          if (isPartitionDataTrashEnabled) {
+            // move the dropped partition files to the trash folder
+            val droppedPartitionNames = partitions.map { partition =>
+              partition.spec.map { specs => specs._1 + CarbonCommonConstants.EQUALS + specs._2 }
+            }
+            val timeStamp = System.currentTimeMillis()
+            droppedPartitionNames.zipWithIndex.foreach { case (partitionName, index) =>
+              val droppedPartitionName = partitionName.mkString("/")
+              TrashUtil.copyPartitionDataToTrash(carbonPartitionsTobeDropped.get(index),
+                TrashUtil.getCompleteTrashFolderPathForPartition(
+                  table.getTablePath,
+                  timeStamp,
+                  droppedPartitionName))
+            }
+            // Delete the partition folders after copying them to trash
+            carbonPartitionsTobeDropped.asScala.foreach(delPartition => {
+              val partitionPath = FileFactory.getCarbonFile(delPartition)
+              CarbonUtil.deleteFoldersAndFiles(partitionPath)
+            })
+            // Finally delete empty partition folders.
+            CleanFilesUtil.deleteEmptyPartitionFoldersRecursively(FileFactory
+              .getCarbonFile(table.getTablePath))
+          }
         }
       } catch {
         case e: Exception =>
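
The droppedPartitionNames construction above flattens each Spark partition
spec into a Hive-style path segment. A standalone sketch of the mapping
(CarbonCommonConstants.EQUALS is assumed to be "="):

    // e.g. Map("deptname" -> "Learning", "projectcode" -> "1")
    //   => "deptname=Learning/projectcode=1"
    def toPartitionName(spec: Map[String, String]): String =
      spec.map { case (key, value) => key + "=" + value }.mkString("/")
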
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
index 0148d02140..ab80826859 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
@@ -17,18 +17,22 @@
 
 package org.apache.carbondata.spark.testsuite.standardpartition
 
+import java.io.File
 import java.nio.file.{Files, LinkOption, Paths}
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAll {
   // scalastyle:off lineLength
+  var count = 0
   override def beforeAll {
     dropTable
 
@@ -218,6 +222,146 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl
       Seq(Row(0)))
   }
 
+  test("dropping partition with moving data to trash") {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH, "true")
+    sql("drop table if exists dropPartition1")
+    sql(
+      """
+        | CREATE TABLE dropPartition1 (empno int, empname String, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (deptname String,doj Timestamp,projectcode int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE dropPartition1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE dropPartition1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE dropPartition1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE dropPartition1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    checkAnswer(sql(s"""select count (*) from dropPartition1"""), Seq(Row(40)))
+    sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='Learning')""")
+    checkAnswer(sql(s"""select count (*) from dropPartition1"""), Seq(Row(32)))
+    sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='configManagement')""")
+    checkAnswer(sql(s"""select count (*) from dropPartition1"""), Seq(Row(28)))
+    sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='network')""")
+    checkAnswer(sql(s"""select count (*) from dropPartition1"""), Seq(Row(16)))
+    sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='protocol')""")
+    checkAnswer(sql(s"""select count (*) from dropPartition1"""), Seq(Row(8)))
+    sql(s"""ALTER TABLE dropPartition1 DROP PARTITION(deptname='security')""")
+    checkAnswer(sql(s"""select count (*) from dropPartition1"""), Seq(Row(0)))
+    val table = CarbonEnv.getCarbonTable(Option("default"), "dropPartition1")(sqlContext
+      .sparkSession)
+    val tablePath = table.getTablePath
+    val deptname = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=Learning")
+    }
+    assert(deptname.length == 0)
+    val configManagement = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=configManagement")
+    }
+    assert(configManagement.length == 0)
+    val network = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=network")
+    }
+    assert(network.length == 0)
+    val protocol = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=protocol")
+    }
+    assert(protocol.length == 0)
+    val security = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=security")
+    }
+    assert(security.length == 0)
+    sql("drop table if exists dropPartition1")
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH,
+      CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH_DEFAULT)
+  }
+
+  test("dropping partition with moving data to trash and count check") {
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH,
+      "true")
+    sql("drop table if exists dropPartition2")
+    sql(
+      """
+        | CREATE TABLE dropPartition2 (empno int, empname String, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (deptname String,doj Timestamp,projectcode int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE dropPartition2 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE dropPartition2 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    val table = CarbonEnv.getCarbonTable(Option("default"), "dropPartition2")(sqlContext
+      .sparkSession)
+    val tablePath = table.getTablePath
+
+    // check the partition folders before dropping the partitions
+    var deptname = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=Learning")
+    }
+    assert(deptname.length > 0)
+    var configManagement = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=configManagement")
+    }
+    assert(configManagement.length > 0)
+    // check the partition folders after dropping the partitions
+    sql(s"""ALTER TABLE dropPartition2 DROP PARTITION(deptname='Learning')""")
+    sql(s"""ALTER TABLE dropPartition2 DROP PARTITION(deptname='configManagement')""")
+
+    deptname = FileFactory.getCarbonFile(tablePath).listFiles().filter {
+      file => file.getName.equalsIgnoreCase("deptname=Learning")
+    }
+    assert(deptname.length == 0)
+    configManagement = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("deptname=configManagement")
+    }
+    assert(configManagement.length == 0)
+    // check the file count at trash folder
+    val trashFolderPath = tablePath + CarbonCommonConstants.FILE_SEPARATOR +
+                          CarbonTablePath.TRASH_DIR
+    assert(FileFactory.isFileExist(trashFolderPath))
+    count = 0
+    val list = getFileCountInTrashFolder(trashFolderPath)
+    // carbondata files are added to the trash
+    assert(list > 0)
+    sql("drop table if exists dropPartition2")
+    CarbonProperties.getInstance().addProperty(
+      CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH,
+      CarbonCommonConstants.CARBON_ENABLE_PARTITION_DATA_TRASH_DEFAULT)
+  }
+
+  def getFileCountInTrashFolder(dirPath: String) : Int = {
+    val fileName = new File(dirPath)
+    val files = fileName.listFiles()
+    if (files != null) {
+      files.foreach(file => {
+        if (file.isFile) {
+          count = count + 1
+        }
+        if (file.isDirectory()) {
+          getFileCountInTrashFolder(file.getAbsolutePath())
+        }
+      })
+    }
+    count
+  }
+
   test("test dropping on partition table for int partition column") {
     sql(
       """