You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/12/21 11:04:11 UTC

[carbondata] branch master updated: [CARBONDATA-4092] Fix concurrent issues in delete segment API's and MV flow

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new ecebee5  [CARBONDATA-4092] Fix concurrent issues in delete segment API's and MV flow
ecebee5 is described below

commit ecebee58eb0e0e330dd758b1469cf01ef06b154c
Author: Vikram Ahuja <vi...@gmail.com>
AuthorDate: Fri Dec 18 17:07:36 2020 +0530

    [CARBONDATA-4092] Fix concurrent issues in delete segment API's and MV flow
    
    Why is this PR needed?
    There are multiple issues with the Delete segment API:
    
    The latest LoadMetadataDetails are not used when writing the table status file, so the table status entry of any concurrently loaded segment (Insert In Progress or Success) can be removed.
    The code reads the table status file 2 times
    When concurrent queries both call checkAndReloadSchemas for MV on all databases, two different queries try to create a file at the same location; HDFS grants the lock to one and fails the other, so that query fails.
    
    What changes were proposed in this PR?
    Only reading the table status file once.
    Using the latest table status to mark the segment as Marked for Delete, which avoids the concurrency issue described above.
    Made the touchMDTFile and checkAndReloadSchemas methods synchronized, so that only one thread can access them at a time.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #4059
---
 .../core/statusmanager/SegmentStatusManager.java   | 176 ++++++++++-----------
 .../apache/carbondata/core/view/MVProvider.java    |  12 +-
 .../apache/carbondata/trash/DataTrashManager.scala |  10 ++
 3 files changed, 105 insertions(+), 93 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index c062a02..4af4a55 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -427,6 +427,8 @@ public class SegmentStatusManager {
   public static List<String> updateDeletionStatus(AbsoluteTableIdentifier identifier,
       List<String> loadIds, String tableFolderPath) throws Exception {
     CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier();
+    ICarbonLock carbonCleanFilesLock =
+        CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.CLEAN_FILES_LOCK);
     ICarbonLock carbonDeleteSegmentLock =
         CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.DELETE_SEGMENT_LOCK);
     ICarbonLock carbonTableStatusLock =
@@ -437,46 +439,52 @@ public class SegmentStatusManager {
     try {
       if (carbonDeleteSegmentLock.lockWithRetries()) {
         LOG.info("Delete segment lock has been successfully acquired");
+        if (carbonCleanFilesLock.lockWithRetries()) {
+          LOG.info("Clean Files lock has been successfully acquired");
+          String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier
+              .getTablePath());
+          LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
+          if (!FileFactory.isFileExist(dataLoadLocation)) {
+            // log error.
+            LOG.error("Load metadata file is not present.");
+            return loadIds;
+          }
+          // read existing metadata details in load metadata.
+          listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
+          if (listOfLoadFolderDetailsArray.length != 0) {
+            updateDeletionStatus(identifier, loadIds, listOfLoadFolderDetailsArray, invalidLoadIds);
+            if (invalidLoadIds.isEmpty()) {
+              // All or None , if anything fails then don't write
+              if (carbonTableStatusLock.lockWithRetries()) {
+                LOG.info("Table status lock has been successfully acquired");
+                // To handle concurrency scenarios, always take latest metadata before writing
+                // into status file.
+                LoadMetadataDetails[] latestLoadMetadataDetails = readLoadMetadata(tableFolderPath);
+                writeLoadDetailsIntoFile(dataLoadLocation, updateLatestTableStatusDetails(
+                    listOfLoadFolderDetailsArray, latestLoadMetadataDetails).stream()
+                    .toArray(LoadMetadataDetails[]::new));
+              } else {
+                String errorMsg = "Delete segment by id is failed for " + tableDetails
+                    + ". Not able to acquire the table status lock due to other operation running "
+                    + "in the background.";
+                LOG.error(errorMsg);
+                throw new Exception(errorMsg + " Please try after some time.");
+              }
 
-        String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
-        LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
-        if (!FileFactory.isFileExist(dataLoadLocation)) {
-          // log error.
-          LOG.error("Load metadata file is not present.");
-          return loadIds;
-        }
-        // read existing metadata details in load metadata.
-        listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
-        if (listOfLoadFolderDetailsArray.length != 0) {
-          updateDeletionStatus(identifier, loadIds, listOfLoadFolderDetailsArray, invalidLoadIds);
-          if (invalidLoadIds.isEmpty()) {
-            // All or None , if anything fails then don't write
-            if (carbonTableStatusLock.lockWithRetries()) {
-              LOG.info("Table status lock has been successfully acquired");
-              // To handle concurrency scenarios, always take latest metadata before writing
-              // into status file.
-              LoadMetadataDetails[] latestLoadMetadataDetails = readLoadMetadata(tableFolderPath);
-              updateLatestTableStatusDetails(listOfLoadFolderDetailsArray,
-                  latestLoadMetadataDetails);
-              writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray);
-            }
-            else {
-              String errorMsg = "Delete segment by id is failed for " + tableDetails
-                  + ". Not able to acquire the table status lock due to other operation running "
-                  + "in the background.";
-              LOG.error(errorMsg);
-              throw new Exception(errorMsg + " Please try after some time.");
+            } else {
+              return invalidLoadIds;
             }
 
           } else {
-            return invalidLoadIds;
+            LOG.error("Delete segment by Id is failed. No matching segment id found.");
+            return loadIds;
           }
-
         } else {
-          LOG.error("Delete segment by Id is failed. No matching segment id found.");
-          return loadIds;
+          String errorMsg = "Delete segment by id is failed for " + tableDetails
+              + " as not able to acquire clean files lock.";
+          LOG.error(errorMsg);
+          throw new Exception(errorMsg + " Please try after some time.");
         }
-
       } else {
         String errorMsg = "Delete segment by id is failed for " + tableDetails
             + ". Not able to acquire the delete segment lock due to another delete "
@@ -490,6 +498,7 @@ public class SegmentStatusManager {
     } finally {
       CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
       CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, LockUsage.DELETE_SEGMENT_LOCK);
+      CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK);
     }
 
     return invalidLoadIds;
@@ -505,6 +514,8 @@ public class SegmentStatusManager {
   public static List<String> updateDeletionStatus(AbsoluteTableIdentifier identifier,
       String loadDate, String tableFolderPath, Long loadStartTime) throws Exception {
     CarbonTableIdentifier carbonTableIdentifier = identifier.getCarbonTableIdentifier();
+    ICarbonLock carbonCleanFilesLock =
+        CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.CLEAN_FILES_LOCK);
     ICarbonLock carbonDeleteSegmentLock =
         CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.DELETE_SEGMENT_LOCK);
     ICarbonLock carbonTableStatusLock =
@@ -515,49 +526,55 @@ public class SegmentStatusManager {
     try {
       if (carbonDeleteSegmentLock.lockWithRetries()) {
         LOG.info("Delete segment lock has been successfully acquired");
-
-        String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath());
-        LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
-
-        if (!FileFactory.isFileExist(dataLoadLocation)) {
-          // Table status file is not present, maybe table is empty, ignore this operation
-          LOG.warn("Trying to update table metadata file which is not present.");
-          return invalidLoadTimestamps;
-        }
-        // read existing metadata details in load metadata.
-        listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
-        if (listOfLoadFolderDetailsArray.length != 0) {
-          updateDeletionStatus(identifier, loadDate, listOfLoadFolderDetailsArray,
-              invalidLoadTimestamps, loadStartTime);
-          if (invalidLoadTimestamps.isEmpty()) {
-            if (carbonTableStatusLock.lockWithRetries()) {
-              LOG.info("Table status lock has been successfully acquired.");
-              // To handle concurrency scenarios, always take latest metadata before writing
-              // into status file.
-              LoadMetadataDetails[] latestLoadMetadataDetails = readLoadMetadata(tableFolderPath);
-              updateLatestTableStatusDetails(listOfLoadFolderDetailsArray,
-                  latestLoadMetadataDetails);
-              writeLoadDetailsIntoFile(dataLoadLocation, listOfLoadFolderDetailsArray);
+        if (carbonCleanFilesLock.lockWithRetries()) {
+          LOG.info("Clean Files lock has been successfully acquired");
+          String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier
+              .getTablePath());
+          LoadMetadataDetails[] listOfLoadFolderDetailsArray = null;
+
+          if (!FileFactory.isFileExist(dataLoadLocation)) {
+            // Table status file is not present, maybe table is empty, ignore this operation
+            LOG.warn("Trying to update table metadata file which is not present.");
+            return invalidLoadTimestamps;
+          }
+          // read existing metadata details in load metadata.
+          listOfLoadFolderDetailsArray = readLoadMetadata(tableFolderPath);
+          if (listOfLoadFolderDetailsArray.length != 0) {
+            updateDeletionStatus(identifier, loadDate, listOfLoadFolderDetailsArray,
+                invalidLoadTimestamps, loadStartTime);
+            if (invalidLoadTimestamps.isEmpty()) {
+              if (carbonTableStatusLock.lockWithRetries()) {
+                LOG.info("Table status lock has been successfully acquired.");
+                // To handle concurrency scenarios, always take latest metadata before writing
+                // into status file.
+                LoadMetadataDetails[] latestLoadMetadataDetails = readLoadMetadata(tableFolderPath);
+                writeLoadDetailsIntoFile(dataLoadLocation, updateLatestTableStatusDetails(
+                    listOfLoadFolderDetailsArray, latestLoadMetadataDetails).stream()
+                    .toArray(LoadMetadataDetails[]::new));
+              } else {
+
+                String errorMsg = "Delete segment by date is failed for " + tableDetails
+                    + ". Not able to acquire the table status lock due to other operation running "
+                    + "in the background.";
+                LOG.error(errorMsg);
+                throw new Exception(errorMsg + " Please try after some time.");
+
+              }
+            } else {
+              return invalidLoadTimestamps;
             }
-            else {
-
-              String errorMsg = "Delete segment by date is failed for " + tableDetails
-                  + ". Not able to acquire the table status lock due to other operation running "
-                  + "in the background.";
-              LOG.error(errorMsg);
-              throw new Exception(errorMsg + " Please try after some time.");
 
-            }
           } else {
+            LOG.error("Delete segment by date is failed. No matching segment found.");
+            invalidLoadTimestamps.add(loadDate);
             return invalidLoadTimestamps;
           }
-
         } else {
-          LOG.error("Delete segment by date is failed. No matching segment found.");
-          invalidLoadTimestamps.add(loadDate);
-          return invalidLoadTimestamps;
+          String errorMsg = "Delete segment by id is failed for " + tableDetails
+              + " as not able to acquire clean files lock.";
+          LOG.error(errorMsg);
+          throw new Exception(errorMsg + " Please try after some time.");
         }
-
       } else {
         String errorMsg = "Delete segment by date is failed for " + tableDetails
             + ". Not able to acquire the delete segment lock due to another delete "
@@ -571,6 +588,7 @@ public class SegmentStatusManager {
     } finally {
       CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
       CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, LockUsage.DELETE_SEGMENT_LOCK);
+      CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK);
     }
 
     return invalidLoadTimestamps;
@@ -819,26 +837,6 @@ public class SegmentStatusManager {
     }
   }
 
-  /**
-   * This API will return the update status file name.
-   * @param segmentList
-   * @return
-   */
-  public String getUpdateStatusFileName(LoadMetadataDetails[] segmentList) {
-    if (segmentList.length == 0) {
-      return "";
-    }
-    else {
-      for (LoadMetadataDetails eachSeg : segmentList) {
-        // file name stored in 0th segment.
-        if (eachSeg.getLoadName().equalsIgnoreCase("0")) {
-          return eachSeg.getUpdateStatusFileName();
-        }
-      }
-    }
-    return "";
-  }
-
   public static class ValidAndInvalidSegmentsInfo {
     private final List<Segment> listOfValidSegments;
     private final List<Segment> listOfValidUpdatedSegments;
diff --git a/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java b/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
index e21127f..9aa5cf3 100644
--- a/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/view/MVProvider.java
@@ -429,8 +429,9 @@ public class MVProvider {
         }
         this.schemas.add(viewSchema);
         CarbonUtil.closeStreams(dataOutputStream, brWriter);
-        checkAndReloadSchemas(viewManager, true);
-        touchMDTFile();
+        if (!checkAndReloadSchemas(viewManager, true)) {
+          touchMDTFile();
+        }
       }
     }
 
@@ -523,7 +524,7 @@ public class MVProvider {
       LOG.info(String.format("Materialized view %s schema is deleted", viewName));
     }
 
-    private void checkAndReloadSchemas(MVManager viewManager, boolean touchFile)
+    private synchronized boolean checkAndReloadSchemas(MVManager viewManager, boolean touchFile)
         throws IOException {
       if (FileFactory.isFileExist(this.schemaIndexFilePath)) {
         long lastModifiedTime =
@@ -531,16 +532,19 @@ public class MVProvider {
         if (this.lastModifiedTime != lastModifiedTime) {
           this.schemas = this.retrieveAllSchemasInternal(viewManager);
           touchMDTFile();
+          return true;
         }
       } else {
         this.schemas = this.retrieveAllSchemasInternal(viewManager);
         if (touchFile) {
           touchMDTFile();
+          return true;
         }
       }
+      return false;
     }
 
-    private void touchMDTFile() throws IOException {
+    private synchronized void touchMDTFile() throws IOException {
       if (!FileFactory.isFileExist(this.systemDirectory)) {
         FileFactory.createDirectoryAndSetPermission(this.systemDirectory,
             new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala b/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
index 75c4751..cadc43b 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
@@ -57,13 +57,20 @@ object DataTrashManager {
       throw new RuntimeException("Clean files with force operation not permitted by default")
     }
     var carbonCleanFilesLock: ICarbonLock = null
+    var carbonDeleteSegmentLock : ICarbonLock = null
     try {
       val errorMsg = "Clean files request is failed for " +
         s"${ carbonTable.getQualifiedName }" +
         ". Not able to acquire the clean files lock due to another clean files " +
         "operation is running in the background."
+      val deleteSegmentErrorMsg = "Clean files request is failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Not able to acquire the delete segment lock due to another delete segment " +
+        "operation running in the background."
       carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
         LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      carbonDeleteSegmentLock = CarbonLockUtil.getLockObject(carbonTable
+        .getAbsoluteTableIdentifier, LockUsage.DELETE_SEGMENT_LOCK, deleteSegmentErrorMsg)
       // step 1: check and clean trash folder
       checkAndCleanTrashFolder(carbonTable, isForceDelete)
       // step 2: move stale segments which are not exists in metadata into .Trash
@@ -74,6 +81,9 @@ object DataTrashManager {
       if (carbonCleanFilesLock != null) {
         CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
       }
+      if (carbonDeleteSegmentLock != null) {
+        CarbonLockUtil.fileUnlock(carbonDeleteSegmentLock, LockUsage.DELETE_SEGMENT_LOCK)
+      }
     }
   }