You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/05 15:02:39 UTC

[21/50] [abbrv] carbondata git commit: [CARBONDATA-2021] Fix clean up issue when update operation is abruptly stopped

[CARBONDATA-2021] Fix clean up issue when update operation is abruptly stopped

When the delete succeeds but the update fails while writing the status file, a stale carbondata file is created.
This change removes that file during clean up and also stops it from being considered during query.

This closes #1793


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b2139cab
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b2139cab
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b2139cab

Branch: refs/heads/fgdatamap
Commit: b2139cabe8cdeb7c241e30a525d754578cfa5ec6
Parents: d90280a
Author: akashrn5 <ak...@gmail.com>
Authored: Wed Jan 10 20:29:43 2018 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Jan 31 19:23:55 2018 +0800

----------------------------------------------------------------------
 .../core/mutate/CarbonUpdateUtil.java           | 64 ++++++++++++++++++--
 .../SegmentUpdateStatusManager.java             | 27 +++++++--
 .../apache/carbondata/core/util/CarbonUtil.java | 10 +++
 .../processing/util/CarbonLoaderUtil.java       |  6 +-
 4 files changed, 93 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2139cab/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index f4566ac..0e4eec7 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -427,6 +427,10 @@ public class CarbonUpdateUtil {
 
     String validUpdateStatusFile = "";
 
+    boolean isAbortedFile = true;
+
+    boolean isInvalidFile = false;
+
     // scan through each segment.
 
     for (LoadMetadataDetails segment : details) {
@@ -450,10 +454,14 @@ public class CarbonUpdateUtil {
         SegmentUpdateStatusManager updateStatusManager =
                 new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
 
+        // deleting of the aborted file scenario.
+        deleteStaleCarbonDataFiles(segment, allSegmentFiles, updateStatusManager);
+
         // get Invalid update  delta files.
         CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager
-                .getUpdateDeltaFilesList(segment.getLoadName(), false,
-                        CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles);
+            .getUpdateDeltaFilesList(segment.getLoadName(), false,
+                CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles,
+                isInvalidFile);
 
         // now for each invalid delta file need to check the query execution time out
         // and then delete.
@@ -465,8 +473,9 @@ public class CarbonUpdateUtil {
 
         // do the same for the index files.
         CarbonFile[] invalidIndexFiles = updateStatusManager
-                .getUpdateDeltaFilesList(segment.getLoadName(), false,
-                        CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles);
+            .getUpdateDeltaFilesList(segment.getLoadName(), false,
+                CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles,
+                isInvalidFile);
 
         // now for each invalid index file need to check the query execution time out
         // and then delete.
@@ -492,11 +501,20 @@ public class CarbonUpdateUtil {
             continue;
           }
 
+          // aborted scenario.
+          invalidDeleteDeltaFiles = updateStatusManager
+              .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, false,
+                  allSegmentFiles, isAbortedFile);
+          for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
+            boolean doForceDelete = true;
+            compareTimestampsAndDelete(invalidFile, doForceDelete, false);
+          }
+
           // case 1
           if (CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
             completeListOfDeleteDeltaFiles = updateStatusManager
                     .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, true,
-                            allSegmentFiles);
+                            allSegmentFiles, isInvalidFile);
             for (CarbonFile invalidFile : completeListOfDeleteDeltaFiles) {
 
               compareTimestampsAndDelete(invalidFile, forceDelete, false);
@@ -518,7 +536,7 @@ public class CarbonUpdateUtil {
           } else {
             invalidDeleteDeltaFiles = updateStatusManager
                     .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, false,
-                            allSegmentFiles);
+                            allSegmentFiles, isInvalidFile);
             for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
 
               compareTimestampsAndDelete(invalidFile, forceDelete, false);
@@ -559,6 +577,40 @@ public class CarbonUpdateUtil {
   }
 
   /**
+   * Deletes all stale carbondata files during the clean up before an update operation.
+   * One scenario: if an update operation is abruptly stopped before the table status is updated,
+   * the carbondata file created during that update is a stale file, and it will be deleted by
+   * this function in the next update operation.
+   * @param segment
+   * @param allSegmentFiles
+   * @param updateStatusManager
+   */
+  private static void deleteStaleCarbonDataFiles(LoadMetadataDetails segment,
+      CarbonFile[] allSegmentFiles, SegmentUpdateStatusManager updateStatusManager) {
+    boolean doForceDelete = true;
+    boolean isAbortedFile = true;
+    CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager
+        .getUpdateDeltaFilesList(segment.getLoadName(), false,
+            CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles,
+            isAbortedFile);
+    // now for each invalid delta file need to check the query execution time out
+    // and then delete.
+    for (CarbonFile invalidFile : invalidUpdateDeltaFiles) {
+      compareTimestampsAndDelete(invalidFile, doForceDelete, false);
+    }
+    // do the same for the index files.
+    CarbonFile[] invalidIndexFiles = updateStatusManager
+        .getUpdateDeltaFilesList(segment.getLoadName(), false,
+            CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles,
+            isAbortedFile);
+    // now for each invalid index file need to check the query execution time out
+    // and then delete.
+    for (CarbonFile invalidFile : invalidIndexFiles) {
+      compareTimestampsAndDelete(invalidFile, doForceDelete, false);
+    }
+  }
+
+  /**
    * This will tell whether the max query timeout has been expired or not.
    * @param fileTimestamp
    * @return

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2139cab/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index df7eedd..e0e7b70 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -469,7 +469,7 @@ public class SegmentUpdateStatusManager {
    */
   public CarbonFile[] getUpdateDeltaFilesList(String segmentId, final boolean validUpdateFiles,
       final String fileExtension, final boolean excludeOriginalFact,
-      CarbonFile[] allFilesOfSegment) {
+      CarbonFile[] allFilesOfSegment, boolean isAbortedFile) {
 
     CarbonTablePath carbonTablePath = CarbonStorePath
         .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
@@ -528,7 +528,12 @@ public class SegmentUpdateStatusManager {
           }
         } else {
           // invalid cases.
-          if (Long.compare(timestamp, startTimeStampFinal) < 0) {
+          if (isAbortedFile) {
+            if (Long.compare(timestamp, endTimeStampFinal) > 0) {
+              listOfCarbonFiles.add(eachFile);
+            }
+          } else if (Long.compare(timestamp, startTimeStampFinal) < 0
+              || Long.compare(timestamp, endTimeStampFinal) > 0) {
             listOfCarbonFiles.add(eachFile);
           }
         }
@@ -934,11 +939,14 @@ public class SegmentUpdateStatusManager {
    */
   public CarbonFile[] getDeleteDeltaInvalidFilesList(final String segmentId,
       final SegmentUpdateDetails block, final boolean needCompleteList,
-      CarbonFile[] allSegmentFiles) {
+      CarbonFile[] allSegmentFiles, boolean isAbortedFile) {
 
     final long deltaStartTimestamp =
         getStartTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
 
+    final long deltaEndTimestamp =
+        getEndTimeOfDeltaFile(CarbonCommonConstants.DELETE_DELTA_FILE_EXT, block);
+
     List<CarbonFile> files =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
@@ -956,9 +964,16 @@ public class SegmentUpdateStatusManager {
         long timestamp = CarbonUpdateUtil.getTimeStampAsLong(
             CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(fileName));
 
-        if (block.getBlockName().equalsIgnoreCase(blkName) && (
-            Long.compare(timestamp, deltaStartTimestamp) < 0)) {
-          files.add(eachFile);
+        if (block.getBlockName().equalsIgnoreCase(blkName)) {
+
+          if (isAbortedFile) {
+            if (Long.compare(timestamp, deltaEndTimestamp) > 0) {
+              files.add(eachFile);
+            }
+          } else if (Long.compare(timestamp, deltaStartTimestamp) < 0
+              || Long.compare(timestamp, deltaEndTimestamp) > 0) {
+            files.add(eachFile);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2139cab/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 5d7a09f..600b1c9 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1701,6 +1701,16 @@ public final class CarbonUtil {
               && blockTimeStamp < invalidBlockVOForSegmentId.getUpdateDeltaStartTimestamp()))) {
         return true;
       }
+      // aborted files case.
+      if (invalidBlockVOForSegmentId.getLatestUpdateTimestamp() != null
+          && blockTimeStamp > invalidBlockVOForSegmentId.getLatestUpdateTimestamp()) {
+        return true;
+      }
+      // the first time, the start timestamp is empty, so the fact timestamp must be considered.
+      if (null == invalidBlockVOForSegmentId.getUpdateDeltaStartTimestamp()
+          && blockTimeStamp > invalidBlockVOForSegmentId.getFactTimestamp()) {
+        return true;
+      }
     }
     return false;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b2139cab/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index fdc2cc3..12fc5c1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -375,8 +375,10 @@ public final class CarbonLoaderUtil {
     }
 
     // reading the start time of data load.
-    long loadStartTime = CarbonUpdateUtil.readCurrentTime();
-    model.setFactTimeStamp(loadStartTime);
+    if (model.getFactTimeStamp() == 0) {
+      long loadStartTime = CarbonUpdateUtil.readCurrentTime();
+      model.setFactTimeStamp(loadStartTime);
+    }
     CarbonLoaderUtil
         .populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp(), false);
     boolean entryAdded =