You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2019/05/29 17:27:53 UTC

[carbondata] branch master updated: [CARBONDATA-3394]Clean files command optimization

This is an automated email from the ASF dual-hosted git repository.

kumarvishal09 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 6817e77  [CARBONDATA-3394]Clean files command optimization
6817e77 is described below

commit 6817e77ad667dbb483c76812f0478044fc444c49
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Mon May 27 12:24:33 2019 +0530

    [CARBONDATA-3394]Clean files command optimization
    
    Problem
    Clean files is taking a lot of time to finish, even though there are no segments to delete.
    Tested with 5000 segments, clean files takes 15 minutes to finish.
    
    Root cause and Solution
    A lot of table status read operations were happening during clean files, and
    a lot of listing operations were happening even though they are not required.
    
    Read and list operations are reduced to reduce overall time for clean files.
    After the changes, for the same store, it takes 35 seconds on the same 3-node cluster.
    
    This closes #3227
---
 .../carbondata/core/mutate/CarbonUpdateUtil.java   | 160 +++++++++++----------
 .../core/statusmanager/SegmentStatusManager.java   |  37 +++--
 .../statusmanager/SegmentUpdateStatusManager.java  |  21 +--
 .../org/apache/carbondata/api/CarbonStore.scala    |   3 +-
 4 files changed, 105 insertions(+), 116 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index beaf1a0..736def6 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -466,94 +466,96 @@ public class CarbonUpdateUtil {
       if (segment.getSegmentStatus() == SegmentStatus.SUCCESS
               || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
 
-        // take the list of files from this segment.
-        String segmentPath = CarbonTablePath.getSegmentPath(
-            table.getAbsoluteTableIdentifier().getTablePath(), segment.getLoadName());
-        CarbonFile segDir =
-            FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
-        CarbonFile[] allSegmentFiles = segDir.listFiles();
-
-        // scan through the segment and find the carbondatafiles and index files.
-        SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(table);
-
-        boolean updateSegmentFile = false;
-        // deleting of the aborted file scenario.
-        if (deleteStaleCarbonDataFiles(segment, allSegmentFiles, updateStatusManager)) {
-          updateSegmentFile = true;
-        }
-
-        // get Invalid update  delta files.
-        CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager
-            .getUpdateDeltaFilesList(segment.getLoadName(), false,
-                CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles,
-                isInvalidFile);
-
-        // now for each invalid delta file need to check the query execution time out
-        // and then delete.
-        for (CarbonFile invalidFile : invalidUpdateDeltaFiles) {
-          compareTimestampsAndDelete(invalidFile, forceDelete, false);
-        }
-        // do the same for the index files.
-        CarbonFile[] invalidIndexFiles = updateStatusManager
-            .getUpdateDeltaFilesList(segment.getLoadName(), false,
-                CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles,
-                isInvalidFile);
-
-        // now for each invalid index file need to check the query execution time out
-        // and then delete.
-
-        for (CarbonFile invalidFile : invalidIndexFiles) {
-          if (compareTimestampsAndDelete(invalidFile, forceDelete, false)) {
+        // when there is no update operations done on table, then no need to go ahead. So
+        // just check the update delta start timestamp and proceed if not empty
+        if (!segment.getUpdateDeltaStartTimestamp().isEmpty()) {
+          // take the list of files from this segment.
+          String segmentPath = CarbonTablePath.getSegmentPath(
+              table.getAbsoluteTableIdentifier().getTablePath(), segment.getLoadName());
+          CarbonFile segDir =
+              FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
+          CarbonFile[] allSegmentFiles = segDir.listFiles();
+
+          // scan through the segment and find the carbondatafiles and index files.
+          SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(table);
+
+          boolean updateSegmentFile = false;
+          // deleting of the aborted file scenario.
+          if (deleteStaleCarbonDataFiles(segment, allSegmentFiles, updateStatusManager)) {
             updateSegmentFile = true;
           }
-        }
-        // now handle all the delete delta files which needs to be deleted.
-        // there are 2 cases here .
-        // 1. if the block is marked as compacted then the corresponding delta files
-        //    can be deleted if query exec timeout is done.
-        // 2. if the block is in success state then also there can be delete
-        //    delta compaction happened and old files can be deleted.
-
-        SegmentUpdateDetails[] updateDetails = updateStatusManager.readLoadMetadata();
-        for (SegmentUpdateDetails block : updateDetails) {
-          CarbonFile[] completeListOfDeleteDeltaFiles;
-          CarbonFile[] invalidDeleteDeltaFiles;
-
-          if (!block.getSegmentName().equalsIgnoreCase(segment.getLoadName())) {
-            continue;
-          }
-
-          // aborted scenario.
-          invalidDeleteDeltaFiles = updateStatusManager
-              .getDeleteDeltaInvalidFilesList(block, false,
-                  allSegmentFiles, isAbortedFile);
-          for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
-            boolean doForceDelete = true;
-            compareTimestampsAndDelete(invalidFile, doForceDelete, false);
-          }
 
-          // case 1
-          if (CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
-            completeListOfDeleteDeltaFiles = updateStatusManager
-                    .getDeleteDeltaInvalidFilesList(block, true,
-                            allSegmentFiles, isInvalidFile);
-            for (CarbonFile invalidFile : completeListOfDeleteDeltaFiles) {
+          // get Invalid update  delta files.
+          CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager
+              .getUpdateDeltaFilesList(segment, false,
+                  CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles,
+                  isInvalidFile);
 
-              compareTimestampsAndDelete(invalidFile, forceDelete, false);
+          // now for each invalid delta file need to check the query execution time out
+          // and then delete.
+          for (CarbonFile invalidFile : invalidUpdateDeltaFiles) {
+            compareTimestampsAndDelete(invalidFile, forceDelete, false);
+          }
+          // do the same for the index files.
+          CarbonFile[] invalidIndexFiles = updateStatusManager
+              .getUpdateDeltaFilesList(segment, false,
+                  CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles,
+                  isInvalidFile);
+
+          // now for each invalid index file need to check the query execution time out
+          // and then delete.
+
+          for (CarbonFile invalidFile : invalidIndexFiles) {
+            if (compareTimestampsAndDelete(invalidFile, forceDelete, false)) {
+              updateSegmentFile = true;
+            }
+          }
+          // now handle all the delete delta files which needs to be deleted.
+          // there are 2 cases here .
+          // 1. if the block is marked as compacted then the corresponding delta files
+          //    can be deleted if query exec timeout is done.
+          // 2. if the block is in success state then also there can be delete
+          //    delta compaction happened and old files can be deleted.
+
+          SegmentUpdateDetails[] updateDetails = updateStatusManager.readLoadMetadata();
+          for (SegmentUpdateDetails block : updateDetails) {
+            CarbonFile[] completeListOfDeleteDeltaFiles;
+            CarbonFile[] invalidDeleteDeltaFiles;
+
+            if (!block.getSegmentName().equalsIgnoreCase(segment.getLoadName())) {
+              continue;
             }
 
-          } else {
+            // aborted scenario.
             invalidDeleteDeltaFiles = updateStatusManager
-                    .getDeleteDeltaInvalidFilesList(block, false,
-                            allSegmentFiles, isInvalidFile);
+                .getDeleteDeltaInvalidFilesList(block, false,
+                    allSegmentFiles, isAbortedFile);
             for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
+              boolean doForceDelete = true;
+              compareTimestampsAndDelete(invalidFile, doForceDelete, false);
+            }
 
-              compareTimestampsAndDelete(invalidFile, forceDelete, false);
+            // case 1
+            if (CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
+              completeListOfDeleteDeltaFiles = updateStatusManager
+                  .getDeleteDeltaInvalidFilesList(block, true,
+                      allSegmentFiles, isInvalidFile);
+              for (CarbonFile invalidFile : completeListOfDeleteDeltaFiles) {
+                compareTimestampsAndDelete(invalidFile, forceDelete, false);
+              }
+
+            } else {
+              invalidDeleteDeltaFiles = updateStatusManager
+                  .getDeleteDeltaInvalidFilesList(block, false,
+                      allSegmentFiles, isInvalidFile);
+              for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
+                compareTimestampsAndDelete(invalidFile, forceDelete, false);
+              }
             }
           }
-        }
-        if (updateSegmentFile) {
-          segmentFilesToBeUpdated.add(Segment.toSegment(segment.getLoadName(), null));
+          if (updateSegmentFile) {
+            segmentFilesToBeUpdated.add(Segment.toSegment(segment.getLoadName(), null));
+          }
         }
       }
     }
@@ -616,7 +618,7 @@ public class CarbonUpdateUtil {
   private static boolean deleteStaleCarbonDataFiles(LoadMetadataDetails segment,
       CarbonFile[] allSegmentFiles, SegmentUpdateStatusManager updateStatusManager) {
     CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager
-        .getUpdateDeltaFilesList(segment.getLoadName(), false,
+        .getUpdateDeltaFilesList(segment, false,
             CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles,
             true);
     // now for each invalid delta file need to check the query execution time out
@@ -626,7 +628,7 @@ public class CarbonUpdateUtil {
     }
     // do the same for the index files.
     CarbonFile[] invalidIndexFiles = updateStatusManager
-        .getUpdateDeltaFilesList(segment.getLoadName(), false,
+        .getUpdateDeltaFilesList(segment, false,
             CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles,
             true);
     // now for each invalid index file need to check the query execution time out
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index fddce90..77b9c52 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -823,8 +823,7 @@ public class SegmentStatusManager {
     }
   }
 
-  private static boolean isLoadDeletionRequired(String metaDataLocation) {
-    LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
+  private static boolean isLoadDeletionRequired(LoadMetadataDetails[] details) {
     if (details != null && details.length > 0) {
       for (LoadMetadataDetails oneRow : details) {
         if ((SegmentStatus.MARKED_FOR_DELETE == oneRow.getSegmentStatus()
@@ -904,34 +903,27 @@ public class SegmentStatusManager {
     }
   }
 
-  private static ReturnTuple isUpdationRequired(
-      boolean isForceDeletion,
-      CarbonTable carbonTable,
-      AbsoluteTableIdentifier absoluteTableIdentifier) {
-    LoadMetadataDetails[] details =
-        SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
+  private static ReturnTuple isUpdationRequired(boolean isForceDeletion, CarbonTable carbonTable,
+      AbsoluteTableIdentifier absoluteTableIdentifier, LoadMetadataDetails[] details) {
     // Delete marked loads
-    boolean isUpdationRequired =
-        DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
-            absoluteTableIdentifier,
-            isForceDeletion,
-            details,
-            carbonTable.getMetadataPath()
-        );
+    boolean isUpdationRequired = DeleteLoadFolders
+        .deleteLoadFoldersFromFileSystem(absoluteTableIdentifier, isForceDeletion, details,
+            carbonTable.getMetadataPath());
     return new ReturnTuple(details, isUpdationRequired);
   }
 
-  public static void deleteLoadsAndUpdateMetadata(
-      CarbonTable carbonTable,
-      boolean isForceDeletion,
+  public static void deleteLoadsAndUpdateMetadata(CarbonTable carbonTable, boolean isForceDeletion,
       List<PartitionSpec> partitionSpecs) throws IOException {
+    LoadMetadataDetails[] metadataDetails =
+        SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
     // delete the expired segment lock files
     CarbonLockUtil.deleteExpiredSegmentLockFiles(carbonTable);
-    if (isLoadDeletionRequired(carbonTable.getMetadataPath())) {
+    if (isLoadDeletionRequired(metadataDetails)) {
       AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
       boolean updationCompletionStatus = false;
       LoadMetadataDetails[] newAddedLoadHistoryList = null;
-      ReturnTuple tuple = isUpdationRequired(isForceDeletion, carbonTable, identifier);
+      ReturnTuple tuple =
+          isUpdationRequired(isForceDeletion, carbonTable, identifier, metadataDetails);
       if (tuple.isUpdateRequired) {
         ICarbonLock carbonTableStatusLock =
             CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.TABLE_STATUS_LOCK);
@@ -942,7 +934,10 @@ public class SegmentStatusManager {
           if (locked) {
             LOG.info("Table status lock has been successfully acquired.");
             // Again read status and check to verify updation required or not.
-            ReturnTuple tuple2 = isUpdationRequired(isForceDeletion, carbonTable, identifier);
+            LoadMetadataDetails[] details =
+                SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
+            ReturnTuple tuple2 =
+                isUpdationRequired(isForceDeletion, carbonTable, identifier, details);
             if (!tuple2.isUpdateRequired) {
               return;
             }
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index a02e903..eace1b7 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -442,29 +442,22 @@ public class SegmentUpdateStatusManager {
   /**
    * Returns all update delta files of specified Segment.
    *
-   * @param segmentId
+   * @param loadMetadataDetail metadatadetails of segment
    * @param validUpdateFiles if true then only the valid range files will be returned.
    * @return
    */
-  public CarbonFile[] getUpdateDeltaFilesList(String segmentId, final boolean validUpdateFiles,
-      final String fileExtension, final boolean excludeOriginalFact,
+  public CarbonFile[] getUpdateDeltaFilesList(LoadMetadataDetails loadMetadataDetail,
+      final boolean validUpdateFiles, final String fileExtension, final boolean excludeOriginalFact,
       CarbonFile[] allFilesOfSegment, boolean isAbortedFile) {
 
     String endTimeStamp = "";
     String startTimeStamp = "";
     long factTimeStamp = 0;
 
-    LoadMetadataDetails[] segmentDetails = SegmentStatusManager.readLoadMetadata(
-        CarbonTablePath.getMetadataPath(identifier.getTablePath()));
-
-    for (LoadMetadataDetails eachSeg : segmentDetails) {
-      if (eachSeg.getLoadName().equalsIgnoreCase(segmentId)) {
-        // if the segment is found then take the start and end time stamp.
-        startTimeStamp = eachSeg.getUpdateDeltaStartTimestamp();
-        endTimeStamp = eachSeg.getUpdateDeltaEndTimestamp();
-        factTimeStamp = eachSeg.getLoadStartTime();
-      }
-    }
+    // if the segment is found then take the start and end time stamp.
+    startTimeStamp = loadMetadataDetail.getUpdateDeltaStartTimestamp();
+    endTimeStamp = loadMetadataDetail.getUpdateDeltaEndTimestamp();
+    factTimeStamp = loadMetadataDetail.getLoadStartTime();
 
     // if start timestamp is empty then no update delta is found. so return empty list.
     if (startTimeStamp.isEmpty()) {
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index f5e429e..0544d22 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -217,7 +217,7 @@ object CarbonStore {
    */
   def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
       partitionSpecList: List[PartitionSpec]): Unit = {
-    if (carbonTable != null) {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
       val loadMetadataDetails = SegmentStatusManager
         .readLoadMetadata(carbonTable.getMetadataPath)
 
@@ -265,7 +265,6 @@ object CarbonStore {
         if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
           // delete the file
           FileFactory.deleteFile(filePath, FileFactory.getFileType(filePath))
-
         }
     }
   }