You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/05 15:02:39 UTC
[21/50] [abbrv] carbondata git commit: [CARBONDATA-2021] Fix clean-up
issue when update operation is abruptly stopped
[CARBONDATA-2021] Fix clean-up issue when update operation is abruptly stopped
When the delete succeeds but the update fails while writing the status file, a stale carbondata file is left behind.
This change removes that file during clean-up and also excludes it from queries.
This closes #1793
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b2139cab
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b2139cab
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b2139cab
Branch: refs/heads/fgdatamap
Commit: b2139cabe8cdeb7c241e30a525d754578cfa5ec6
Parents: d90280a
Author: akashrn5 <ak...@gmail.com>
Authored: Wed Jan 10 20:29:43 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Jan 31 19:23:55 2018 +0800
----------------------------------------------------------------------
.../core/mutate/CarbonUpdateUtil.java | 64 ++++++++++++++++++--
.../SegmentUpdateStatusManager.java | 27 +++++++--
.../apache/carbondata/core/util/CarbonUtil.java | 10 +++
.../processing/util/CarbonLoaderUtil.java | 6 +-
4 files changed, 93 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2139cab/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index f4566ac..0e4eec7 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -427,6 +427,10 @@ public class CarbonUpdateUtil {
String validUpdateStatusFile = "";
+ boolean isAbortedFile = true;
+
+ boolean isInvalidFile = false;
+
// scan through each segment.
for (LoadMetadataDetails segment : details) {
@@ -450,10 +454,14 @@ public class CarbonUpdateUtil {
SegmentUpdateStatusManager updateStatusManager =
new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
+ // deleting of the aborted file scenario.
+ deleteStaleCarbonDataFiles(segment, allSegmentFiles, updateStatusManager);
+
// get Invalid update delta files.
CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager
- .getUpdateDeltaFilesList(segment.getLoadName(), false,
- CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles);
+ .getUpdateDeltaFilesList(segment.getLoadName(), false,
+ CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles,
+ isInvalidFile);
// now for each invalid delta file need to check the query execution time out
// and then delete.
@@ -465,8 +473,9 @@ public class CarbonUpdateUtil {
// do the same for the index files.
CarbonFile[] invalidIndexFiles = updateStatusManager
- .getUpdateDeltaFilesList(segment.getLoadName(), false,
- CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles);
+ .getUpdateDeltaFilesList(segment.getLoadName(), false,
+ CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles,
+ isInvalidFile);
// now for each invalid index file need to check the query execution time out
// and then delete.
@@ -492,11 +501,20 @@ public class CarbonUpdateUtil {
continue;
}
+ // aborted scenario.
+ invalidDeleteDeltaFiles = updateStatusManager
+ .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, false,
+ allSegmentFiles, isAbortedFile);
+ for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
+ boolean doForceDelete = true;
+ compareTimestampsAndDelete(invalidFile, doForceDelete, false);
+ }
+
// case 1
if (CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
completeListOfDeleteDeltaFiles = updateStatusManager
.getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, true,
- allSegmentFiles);
+ allSegmentFiles, isInvalidFile);
for (CarbonFile invalidFile : completeListOfDeleteDeltaFiles) {
compareTimestampsAndDelete(invalidFile, forceDelete, false);
@@ -518,7 +536,7 @@ public class CarbonUpdateUtil {
} else {
invalidDeleteDeltaFiles = updateStatusManager
.getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, false,
- allSegmentFiles);
+ allSegmentFiles, isInvalidFile);
for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
compareTimestampsAndDelete(invalidFile, forceDelete, false);
@@ -559,6 +577,40 @@ public class CarbonUpdateUtil {
}
/**
+ * Deletes all stale carbondata files during clean-up before an update operation.
+ * For example, if an update operation is abruptly stopped before the table status is updated,
+ * the carbondata file created by that update is a stale file, and it is deleted by this
+ * function during the next update operation.
+ * @param segment
+ * @param allSegmentFiles
+ * @param updateStatusManager
+ */
+ private static void deleteStaleCarbonDataFiles(LoadMetadataDetails segment,
+ CarbonFile[] allSegmentFiles, SegmentUpdateStatusManager updateStatusManager) {
+ boolean doForceDelete = true;
+ boolean isAbortedFile = true;
+ CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager
+ .getUpdateDeltaFilesList(segment.getLoadName(), false,
+ CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles,
+ isAbortedFile);
+ // now for each invalid delta file need to check the query execution time out
+ // and then delete.
+ for (CarbonFile invalidFile : invalidUpdateDeltaFiles) {
+ compareTimestampsAndDelete(invalidFile, doForceDelete, false);
+ }
+ // do the same for the index files.
+ CarbonFile[] invalidIndexFiles = updateStatusManager
+ .getUpdateDeltaFilesList(segment.getLoadName(), false,
+ CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles,
+ isAbortedFile);
+ // now for each invalid index file need to check the query execution time out
+ // and then delete.
+ for (CarbonFile invalidFile : invalidIndexFiles) {
+ compareTimestampsAndDelete(invalidFile, doForceDelete, false);
+ }
+ }
+
+ /**
* This will tell whether the max query timeout has been expired or not.
* @param fileTimestamp
* @return
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2139cab/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index df7eedd..e0e7b70 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -469,7 +469,7 @@ public class SegmentUpdateStatusManager {
*/
public CarbonFile[] getUpdateDeltaFilesList(String segmentId, final boolean validUpdateFiles,
final String fileExtension, final boolean excludeOriginalFact,
- CarbonFile[] allFilesOfSegment) {
+ CarbonFile[] allFilesOfSegment, boolean isAbortedFile) {
CarbonTablePath carbonTablePath = CarbonStorePath
.getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
@@ -528,7 +528,12 @@ public class SegmentUpdateStatusManager {
}
} else {
// invalid cases.
- if (Long.compare(timestamp, startTimeStampFinal) < 0) {
+ if (isAbortedFile) {
+ if (Long.compare(timestamp, endTimeStampFinal) > 0) {
+ listOfCarbonFiles.add(eachFile);
+ }
+ } else if (Long.compare(timestamp, startTimeStampFinal) < 0
+ || Long.compare(timestamp, endTimeStampFinal) > 0) {
listOfCarbonFiles.add(eachFile);
}
}
@@ -934,11 +939,14 @@ public class SegmentUpdateStatusManager {
*/
public CarbonFile[] getDeleteDeltaInvalidFilesList(final String segmentId,
final SegmentUpdateDetails block, final boolean needCompleteList,
- CarbonFile[] allSegmentFiles) {
+ CarbonFile[] allSegmentFiles, boolean isAbortedFile) {
final long deltaStartTimestamp =
getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
+ final long deltaEndTimestamp =
+ getEndTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
+
List<CarbonFile> files =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
@@ -956,9 +964,16 @@ public class SegmentUpdateStatusManager {
long timestamp = CarbonUpdateUtil.getTimeStampAsLong(
CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(fileName));
- if (block.getBlockName().equalsIgnoreCase(blkName) && (
- Long.compare(timestamp, deltaStartTimestamp) < 0)) {
- files.add(eachFile);
+ if (block.getBlockName().equalsIgnoreCase(blkName)) {
+
+ if (isAbortedFile) {
+ if (Long.compare(timestamp, deltaEndTimestamp) > 0) {
+ files.add(eachFile);
+ }
+ } else if (Long.compare(timestamp, deltaStartTimestamp) < 0
+ || Long.compare(timestamp, deltaEndTimestamp) > 0) {
+ files.add(eachFile);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2139cab/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 5d7a09f..600b1c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1701,6 +1701,16 @@ public final class CarbonUtil {
&& blockTimeStamp < invalidBlockVOForSegmentId.getUpdateDeltaStartTimestamp()))) {
return true;
}
+ // aborted files case.
+ if (invalidBlockVOForSegmentId.getLatestUpdateTimestamp() != null
+ && blockTimeStamp > invalidBlockVOForSegmentId.getLatestUpdateTimestamp()) {
+ return true;
+ }
+ // On the first update the start timestamp is empty, so compare against the fact timestamp.
+ if (null == invalidBlockVOForSegmentId.getUpdateDeltaStartTimestamp()
+ && blockTimeStamp > invalidBlockVOForSegmentId.getFactTimestamp()) {
+ return true;
+ }
}
return false;
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2139cab/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index fdc2cc3..12fc5c1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -375,8 +375,10 @@ public final class CarbonLoaderUtil {
}
// reading the start time of data load.
- long loadStartTime = CarbonUpdateUtil.readCurrentTime();
- model.setFactTimeStamp(loadStartTime);
+ if (model.getFactTimeStamp() == 0) {
+ long loadStartTime = CarbonUpdateUtil.readCurrentTime();
+ model.setFactTimeStamp(loadStartTime);
+ }
CarbonLoaderUtil
.populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp(), false);
boolean entryAdded =