You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/08/07 13:09:49 UTC
[25/50] [abbrv] carbondata git commit: [CARBONDATA-2806] Delete delete delta files upon clean files for flat folder
[CARBONDATA-2806] Delete delete delta files upon clean files for flat folder
Problem:
Delete delta files of flat-folder tables are not removed by the clean files operation.
Solution:
Get the delete delta files of each segment using SegmentUpdateStatusManager and remove them, together with the corresponding data files, during the clean files operation.
This closes #2587
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/af984101
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/af984101
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/af984101
Branch: refs/heads/external-format
Commit: af984101ebcd55f71e370e52e837157b45c529dd
Parents: a302cd1
Author: ravipesala <ra...@gmail.com>
Authored: Mon Jul 30 20:30:58 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Aug 1 22:15:48 2018 +0800
----------------------------------------------------------------------
.../core/metadata/SegmentFileStore.java | 18 ++++++----
.../carbondata/core/util/DeleteLoadFolders.java | 35 +++++++++++++-------
.../core/util/path/CarbonTablePath.java | 8 +++++
.../FlatFolderTableLoadingTestCase.scala | 25 ++++++++++++++
4 files changed, 68 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/af984101/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 67e58d1..111e444 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -56,6 +56,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.CarbonUtil;
import org.apache.carbondata.core.util.DataFileFooterConverter;
import org.apache.carbondata.core.util.path.CarbonTablePath;
@@ -793,25 +794,30 @@ public class SegmentFileStore {
/**
* Deletes the segment file and its physical files like partition folders from disk
* @param tablePath
- * @param segmentFile
+ * @param segment
* @param partitionSpecs
* @throws IOException
*/
- public static void deleteSegment(String tablePath, String segmentFile,
- List<PartitionSpec> partitionSpecs) throws IOException {
- SegmentFileStore fileStore = new SegmentFileStore(tablePath, segmentFile);
+ public static void deleteSegment(String tablePath, Segment segment,
+ List<PartitionSpec> partitionSpecs,
+ SegmentUpdateStatusManager updateStatusManager) throws Exception {
+ SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true);
Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
FileFactory.deleteFile(entry.getKey(), FileFactory.getFileType(entry.getKey()));
for (String file : entry.getValue()) {
+ String[] deltaFilePaths =
+ updateStatusManager.getDeleteDeltaFilePath(file, segment.getSegmentNo());
+ for (String deltaFilePath : deltaFilePaths) {
+ FileFactory.deleteFile(deltaFilePath, FileFactory.getFileType(deltaFilePath));
+ }
FileFactory.deleteFile(file, FileFactory.getFileType(file));
}
}
deletePhysicalPartition(partitionSpecs, indexFilesMap, indexOrMergeFiles, tablePath);
String segmentFilePath =
- CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
- + segmentFile;
+ CarbonTablePath.getSegmentFilePath(tablePath, segment.getSegmentFileName());
// Deletes the physical segment file
FileFactory.deleteFile(segmentFilePath, FileFactory.getFileType(segmentFilePath));
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/af984101/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
index 0433ba4..a65294e 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
@@ -40,6 +40,7 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
import org.apache.carbondata.core.statusmanager.SegmentStatus;
import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
import org.apache.carbondata.core.util.path.CarbonTablePath;
public final class DeleteLoadFolders {
@@ -75,21 +76,29 @@ public final class DeleteLoadFolders {
absoluteTableIdentifier,
currentDetails,
isForceDelete,
- specs);
+ specs,
+ currentDetails);
if (newAddedLoadHistoryList != null && newAddedLoadHistoryList.length > 0) {
physicalFactAndMeasureMetadataDeletion(
absoluteTableIdentifier,
newAddedLoadHistoryList,
isForceDelete,
- specs);
+ specs,
+ currentDetails);
}
}
- public static void physicalFactAndMeasureMetadataDeletion(
- AbsoluteTableIdentifier absoluteTableIdentifier,
- LoadMetadataDetails[] loadDetails,
- boolean isForceDelete,
- List<PartitionSpec> specs) {
+ /**
+ * Delete the invalid data physically from table.
+ * @param absoluteTableIdentifier table identifier
+ * @param loadDetails Load details which need clean up
+ * @param isForceDelete is Force delete requested by user
+ * @param specs Partition specs
+ * @param currLoadDetails Current table status load details which are required for update manager.
+ */
+ private static void physicalFactAndMeasureMetadataDeletion(
+ AbsoluteTableIdentifier absoluteTableIdentifier, LoadMetadataDetails[] loadDetails,
+ boolean isForceDelete, List<PartitionSpec> specs, LoadMetadataDetails[] currLoadDetails) {
CarbonTable carbonTable = DataMapStoreManager.getInstance().getCarbonTable(
absoluteTableIdentifier);
List<TableDataMap> indexDataMaps = new ArrayList<>();
@@ -104,14 +113,16 @@ public final class DeleteLoadFolders {
"Failed to get datamaps for %s.%s, therefore the datamap files could not be cleaned.",
absoluteTableIdentifier.getDatabaseName(), absoluteTableIdentifier.getTableName()));
}
-
+ SegmentUpdateStatusManager updateStatusManager =
+ new SegmentUpdateStatusManager(carbonTable, currLoadDetails);
for (final LoadMetadataDetails oneLoad : loadDetails) {
if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
try {
if (oneLoad.getSegmentFile() != null) {
- SegmentFileStore
- .deleteSegment(absoluteTableIdentifier.getTablePath(), oneLoad.getSegmentFile(),
- specs);
+ SegmentFileStore.deleteSegment(
+ absoluteTableIdentifier.getTablePath(),
+ new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile()),
+ specs, updateStatusManager);
} else {
String path = getSegmentPath(absoluteTableIdentifier, oneLoad);
boolean status = false;
@@ -161,7 +172,7 @@ public final class DeleteLoadFolders {
segments.add(new Segment(oneLoad.getLoadName()));
dataMap.deleteDatamapData(segments);
}
- } catch (IOException e) {
+ } catch (Exception e) {
LOGGER.warn("Unable to delete the file as per delete command " + oneLoad.getLoadName());
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/af984101/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 275d3d6..6493e34 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -694,6 +694,14 @@ public class CarbonTablePath {
}
/**
+ * Get the segment file path of table
+ */
+ public static String getSegmentFilePath(String tablePath, String segmentFileName) {
+ return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + "segments"
+ + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName;
+ }
+
+ /**
* Get the lock files directory
*/
public static String getLockFilesDirPath(String tablePath) {
http://git-wip-us.apache.org/repos/asf/carbondata/blob/af984101/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
index 9a60978..68f8ca7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/flatfolder/FlatFolderTableLoadingTestCase.scala
@@ -127,6 +127,31 @@ class FlatFolderTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {
sql("drop table if exists t1")
}
+ test("merge index flat folder and delete delta issue") {
+ sql("drop table if exists flatfolder_delete")
+ sql(
+ """
+ | CREATE TABLE flatfolder_delete (empname String, designation String, doj Timestamp,
+ | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+ | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+ | utilization int,salary int,empno int)
+ | STORED BY 'org.apache.carbondata.format' tblproperties('flat_folder'='true')
+ """.stripMargin)
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE flatfolder_delete OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+ val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "flatfolder_delete")
+ sql(s"""delete from flatfolder_delete where empname='anandh'""")
+ assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 4)
+ sql("Alter table flatfolder_delete compact 'minor'")
+ assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 4)
+ sql("clean files for table flatfolder_delete")
+ assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)).length == 1)
+ assert(FileFactory.getCarbonFile(carbonTable.getTablePath).listFiles().filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)).length == 0)
+
+ }
+
override def afterAll = {
CarbonProperties.getInstance()
.addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,