You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/08/07 13:09:49 UTC

[25/50] [abbrv] carbondata git commit: [CARBONDATA-2806] Delete delete delta files upon clean files for flat folder

[CARBONDATA-2806] Delete delete delta files upon clean files for flat folder

Problem:
Delete delta files are not removed by the clean files operation on flat-folder tables.
Solution:
Look up each segment's delete delta files using SegmentUpdateStatusManager and remove them during the clean files operation.

This closes #2587


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/af984101
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/af984101
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/af984101

Branch: refs/heads/external-format
Commit: af984101ebcd55f71e370e52e837157b45c529dd
Parents: a302cd1
Author: ravipesala <ra...@gmail.com>
Authored: Mon Jul 30 20:30:58 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Aug 1 22:15:48 2018 +0800

----------------------------------------------------------------------
 .../core/metadata/SegmentFileStore.java         | 18 ++++++----
 .../carbondata/core/util/DeleteLoadFolders.java | 35 +++++++++++++-------
 .../core/util/path/CarbonTablePath.java         |  8 +++++
 .../FlatFolderTableLoadingTestCase.scala        | 25 ++++++++++++++
 4 files changed, 68 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/af984101/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 67e58d1..111e444 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -56,6 +56,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -793,25 +794,30 @@ public class SegmentFileStore {
   /**
    * Deletes the segment file and its physical files like partition folders from disk
    * @param tablePath
-   * @param segmentFile
+   * @param segment
    * @param partitionSpecs
    * @throws IOException
    */
-  public static void deleteSegment(String tablePath, String segmentFile,
-      List<PartitionSpec> partitionSpecs) throws IOException {
-    SegmentFileStore fileStore = new SegmentFileStore(tablePath, segmentFile);
+  public static void deleteSegment(String tablePath, Segment segment,
+      List<PartitionSpec> partitionSpecs,
+      SegmentUpdateStatusManager updateStatusManager) throws Exception {
+    SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
     List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true);
     Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
     for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
       FileFactory.deleteFile(entry.getKey(), FileFactory.getFileType(entry.getKey()));
       for (String file : entry.getValue()) {
+        String[] deltaFilePaths =
+            updateStatusManager.getDeleteDeltaFilePath(file, segment.getSegmentNo());
+        for (String deltaFilePath : deltaFilePaths) {
+          FileFactory.deleteFile(deltaFilePath, FileFactory.getFileType(deltaFilePath));
+        }
         FileFactory.deleteFile(file, FileFactory.getFileType(file));
       }
     }
     deletePhysicalPartition(partitionSpecs, indexFilesMap, indexOrMergeFiles, tablePath);
     String segmentFilePath =
-        CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
-            + segmentFile;
+        CarbonTablePath.getSegmentFilePath(tablePath, segment.getSegmentFileName());
     // Deletes the physical segment file
     FileFactory.deleteFile(segmentFilePath, FileFactory.getFileType(segmentFilePath));
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/af984101/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
index 0433ba4..a65294e 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 public final class DeleteLoadFolders {
@@ -75,21 +76,29 @@ public final class DeleteLoadFolders {
         absoluteTableIdentifier,
         currentDetails,
         isForceDelete,
-        specs);
+        specs,
+        currentDetails);
     if (newAddedLoadHistoryList != null && newAddedLoadHistoryList.length > 0) {
       physicalFactAndMeasureMetadataDeletion(
           absoluteTableIdentifier,
           newAddedLoadHistoryList,
           isForceDelete,
-          specs);
+          specs,
+          currentDetails);
     }
   }
 
-  public static void physicalFactAndMeasureMetadataDeletion(
-      AbsoluteTableIdentifier absoluteTableIdentifier,
-      LoadMetadataDetails[] loadDetails,
-      boolean isForceDelete,
-      List<PartitionSpec> specs) {
+  /**
+   * Delete the invalid data physically from table.
+   * @param absoluteTableIdentifier table identifier
+   * @param loadDetails Load details which need clean up
+   * @param isForceDelete is Force delete requested by user
+   * @param specs Partition specs
+   * @param currLoadDetails Current table status load details which are required for update manager.
+   */
+  private static void physicalFactAndMeasureMetadataDeletion(
+      AbsoluteTableIdentifier absoluteTableIdentifier, LoadMetadataDetails[] loadDetails,
+      boolean isForceDelete, List<PartitionSpec> specs, LoadMetadataDetails[] currLoadDetails) {
     CarbonTable carbonTable = DataMapStoreManager.getInstance().getCarbonTable(
         absoluteTableIdentifier);
     List<TableDataMap> indexDataMaps = new ArrayList<>();
@@ -104,14 +113,16 @@ public final class DeleteLoadFolders {
           "Failed to get datamaps for %s.%s, therefore the datamap files could not be cleaned.",
           absoluteTableIdentifier.getDatabaseName(), absoluteTableIdentifier.getTableName()));
     }
-
+    SegmentUpdateStatusManager updateStatusManager =
+        new SegmentUpdateStatusManager(carbonTable, currLoadDetails);
     for (final LoadMetadataDetails oneLoad : loadDetails) {
       if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
         try {
           if (oneLoad.getSegmentFile() != null) {
-            SegmentFileStore
-                .deleteSegment(absoluteTableIdentifier.getTablePath(), oneLoad.getSegmentFile(),
-                    specs);
+            SegmentFileStore.deleteSegment(
+                absoluteTableIdentifier.getTablePath(),
+                new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile()),
+                specs, updateStatusManager);
           } else {
             String path = getSegmentPath(absoluteTableIdentifier, oneLoad);
             boolean status = false;
@@ -161,7 +172,7 @@ public final class DeleteLoadFolders {
             segments.add(new Segment(oneLoad.getLoadName()));
             dataMap.deleteDatamapData(segments);
           }
-        } catch (IOException e) {
+        } catch (Exception e) {
           LOGGER.warn("Unable to delete the file as per delete command " + oneLoad.getLoadName());
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/af984101/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 275d3d6..6493e34 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -694,6 +694,14 @@ public class CarbonTablePath {
   }
 
   /**
+   * Get the segment file path of table
+   */
+  public static String getSegmentFilePath(String tablePath, String segmentFileName) {
+    return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + "segments"
+        + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName;
+  }
+
+  /**
    * Get the lock files directory
    */
   public static String getLockFilesDirPath(String tablePath) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/af984101/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
index 9a60978..68f8ca7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
@@ -127,6 +127,31 @@ class FlatFolderTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists t1")
   }
 
+  test("merge index flat folder and delete delta issue") {
+    sql("drop table if exists flatfolder_delete")
+    sql(
+      """
+        | CREATE TABLE flatfolder_delete (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int,empno int)
+        | STORED BY 'org.apache.carbondata.format' tblproperties('flat_folder'='true')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "flatfolder_delete")
+    sql(s"""delete from flatfolder_delete where empname='anandh'""")
+    assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 4)
+    sql("Alter table flatfolder_delete compact 'minor'")
+    assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 4)
+    sql("clean files for table flatfolder_delete")
+    assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)).length == 1)
+    assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 0)
+
+  }
+
   override def afterAll = {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,