Posted to issues@carbondata.apache.org by GitBox <gi...@apache.org> on 2020/12/06 18:10:31 UTC

[GitHub] [carbondata] QiangCai opened a new pull request #4044: [CARBONDATA-4062] Refactor clean files feature

QiangCai opened a new pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044


    ### Why is this PR needed?
    To prevent accidental deletion of data, Carbon will introduce trash data management. It provides a buffer period during which an accidental delete operation can be rolled back.
   
   Trash data management is part of the Carbon data lifecycle management. Clean files, acting as the data trash manager, should cover the following two parts.
   part 1: manage metadata-indexed data trash.
     This data stays at the original location of the table and is indexed by metadata. Carbon manages it through the metadata index and should avoid using the listFile() interface.
   part 2: manage the ".Trash" folder.
      Currently the ".Trash" folder has no metadata index, and operations on it are based on timestamps and the listFile() interface. In the future, Carbon will index the ".Trash" folder to improve data trash management.
    
    ### What changes were proposed in this PR?
   Remove the data-cleaning logic from all features, keeping only the exception-handling part.
   Note: the following features still clean data:
   a) drop table/database/partition/index/mv
   b) insert/load overwrite table/partition
   Only the clean files function now works as the data trash manager (see the usage sketch below).
   It supports concurrent operation with other features (loading, compaction, update/delete, and so on).
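   As a rough illustration, here is a minimal, hypothetical sketch of driving the refactored command through Spark SQL; the table name is made up, and the 'force' option together with the carbon.clean.files.force.allowed guard follow this PR's description, so treat it as a sketch rather than authoritative syntax:
   ```java
   import org.apache.spark.sql.SparkSession;

   public class CleanFilesExample {
     public static void main(String[] args) {
       SparkSession spark = SparkSession.builder()
           .appName("clean-files-demo")
           .getOrCreate();
       // Default run: acts as the data trash manager. It cleans expired contents
       // of the .Trash folder, moves stale segments into .Trash, and removes
       // expired MFD/Compacted segments based on carbon.trash.retention.days.
       spark.sql("CLEAN FILES FOR TABLE mydb.my_table");
       // Force run: physically deletes data and empties the trash folder.
       // Per this PR it is rejected unless carbon.clean.files.force.allowed=true.
       spark.sql("CLEAN FILES FOR TABLE mydb.my_table OPTIONS('force'='true')");
       spark.stop();
     }
   }
   ```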
       
    ### Does this PR introduce any user interface change?
    - Yes. (please explain the change and update document)
   
    ### Is any new testcase added?
    - No
   
       
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537277508



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDelete        clean the MFD/Compacted segments immediately and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on retention time,
+   *                             it will clean immediately when force is true
+   */
+  def cleanGarbageData(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+    // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
+        " recovered. It is disabled by default, to enable clean files with force option," +
+        " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
+      throw new RuntimeException("Clean files with force operation not permitted by default")
+    }
+    var carbonCleanFilesLock: ICarbonLock = null
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // step 1: check and clean trash folder
+      checkAndCleanTrashFolder(carbonTable, isForceDelete)
+      // step 2: move stale segments which do not exist in metadata into .Trash
+      moveStaleSegmentsToTrash(carbonTable)
+      // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+      cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs)
+    } finally {
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = {
+    if (isForceDelete) {
+      // empty the trash folder
+      TrashUtil.emptyTrash(carbonTable.getTablePath)
+    } else {
+      // clear trash based on timestamp
+      TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+    }
+  }
+
+  /**
+   * move stale segments to the trash folder, excluding compaction segments
+   */
+  private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Unit = {
+    if (carbonTable.isHivePartitionTable) {
+      CleanFilesUtil.cleanStaleSegmentsForPartitionTable(carbonTable)
+    } else {
+      CleanFilesUtil.cleanStaleSegments(carbonTable)
+    }
+  }
+
+  private def cleanExpiredSegments(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecsOption: Option[Seq[PartitionSpec]]): Unit = {
+    val partitionSpecs = partitionSpecsOption.map(_.asJava).orNull
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable,
+      isForceDelete, partitionSpecs, cleanStaleInProgress, true)
+    if (carbonTable.isHivePartitionTable && partitionSpecsOption.isDefined) {
+      SegmentFileStore.cleanSegments(carbonTable, partitionSpecs, isForceDelete)
+    }
+  }
+
+  /**
+   * clean the stale compaction segment immediately after a compaction failure
+   */
+  def cleanStaleCompactionSegment(
+      carbonTable: CarbonTable,
+      mergedLoadName: String,
+      factTimestamp: Long,
+      partitionSpecs: Option[Seq[PartitionSpec]]): Unit = {
+    val metadataFolderPath = CarbonTablePath.getMetadataPath(carbonTable.getTablePath)
+    val details = SegmentStatusManager.readLoadMetadata(metadataFolderPath)
+    if (details == null || details.isEmpty) {
+      return
+    }
+    val loadDetail = details.find(detail => mergedLoadName.equals(detail.getLoadName))

Review comment:
       done







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739749809


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3318/
   





[GitHub] [carbondata] akashrn5 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537271258



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -577,38 +556,46 @@ object CarbonDataRDDFactory {
           LOGGER.info("Data load is successful for " +
                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         }
-
-        // code to handle Pre-Priming cache for loading
-
-        if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
-          DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
-            operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
-        }
-        try {
-          // compaction handling
-          if (carbonTable.isHivePartitionTable) {
-            carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-          }
-          val compactedSegments = new util.ArrayList[String]()
-          handleSegmentMerging(sqlContext,
-            carbonLoadModel
-              .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
-            carbonTable,
-            compactedSegments,
-            operationContext)
-          carbonLoadModel.setMergedSegmentIds(compactedSegments)
-          writtenSegment
-        } catch {
-          case e: Exception =>
-            LOGGER.error(
-              "Auto-Compaction has failed. Ignoring this exception because the" +
-              " load is passed.", e)
-            writtenSegment
-        }
+        isLoadingCommitted = true
+        writtenSegment
       }
     } finally {
       // Release the segment lock, once table status is finally updated
       segmentLock.unlock()
+      if (isLoadingCommitted) {
+        triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, operationContext)
+      }
+    }
+  }
+
+  private def triggerEventsAfterLoading(
+      sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      hadoopConf: Configuration,
+      operationContext: OperationContext): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    // code to handle Pre-Priming cache for loading
+    if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
+      DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),

Review comment:
       Yes, agree with @ajantha-bhat: if auto compaction succeeds, pre-prime the compacted segment; otherwise pre-prime just the loaded segment.







[GitHub] [carbondata] ajantha-bhat commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739671395


   @kunal642, @vikramahuja1001, @akashrn5: I think all four of us are reviewing this PR. Please post comments as soon as you find issues, rather than all at the end, so that we can avoid duplicate comments.





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739717393


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3316/
   





[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537259682



##########
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##########
@@ -192,20 +192,20 @@ public static void emptyTrash(String tablePath) {
   }
 
   /**
-   * This will tell whether the trash retention time has expired or not
-   *
-   * @param fileTimestamp
-   * @return
+   * whether trash data inside of .Trash folder is time out

Review comment:
       ```suggestion
     * check if the fileTimestamp has expired based on `carbon.trash.retention.days`
   ```







[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537275990



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDelete        clean the MFD/Compacted segments immediately and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on retention time,
+   *                             it will clean immediately when force is true
+   */
+  def cleanGarbageData(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+    // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
+        " recovered. It is disabled by default, to enable clean files with force option," +
+        " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
+      throw new RuntimeException("Clean files with force operation not permitted by default")
+    }
+    var carbonCleanFilesLock: ICarbonLock = null
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // step 1: check and clean trash folder
+      checkAndCleanTrashFolder(carbonTable, isForceDelete)
+      // step 2: move stale segments which do not exist in metadata into .Trash
+      moveStaleSegmentsToTrash(carbonTable)
+      // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+      cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs)
+    } finally {
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = {
+    if (isForceDelete) {
+      // empty the trash folder
+      TrashUtil.emptyTrash(carbonTable.getTablePath)
+    } else {
+      // clear trash based on timestamp
+      TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+    }
+  }
+
+  /**
+   * move stale segments to the trash folder, excluding compaction segments
+   */
+  private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Unit = {
+    if (carbonTable.isHivePartitionTable) {
+      CleanFilesUtil.cleanStaleSegmentsForPartitionTable(carbonTable)
+    } else {
+      CleanFilesUtil.cleanStaleSegments(carbonTable)
+    }
+  }
+
+  private def cleanExpiredSegments(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecsOption: Option[Seq[PartitionSpec]]): Unit = {
+    val partitionSpecs = partitionSpecsOption.map(_.asJava).orNull
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable,
+      isForceDelete, partitionSpecs, cleanStaleInProgress, true)
+    if (carbonTable.isHivePartitionTable && partitionSpecsOption.isDefined) {
+      SegmentFileStore.cleanSegments(carbonTable, partitionSpecs, isForceDelete)
+    }
+  }
+
+  /**
+   * clean the stale compaction segment immediately after a compaction failure
+   */
+  def cleanStaleCompactionSegment(
+      carbonTable: CarbonTable,
+      mergedLoadName: String,
+      factTimestamp: Long,
+      partitionSpecs: Option[Seq[PartitionSpec]]): Unit = {
+    val metadataFolderPath = CarbonTablePath.getMetadataPath(carbonTable.getTablePath)
+    val details = SegmentStatusManager.readLoadMetadata(metadataFolderPath)
+    if (details == null || details.isEmpty) {
+      return
+    }
+    val loadDetail = details.find(detail => mergedLoadName.equals(detail.getLoadName))
+    // only clean stale compaction segment
+    if (loadDetail.isEmpty) {
+      val segmentId = mergedLoadName.split(CarbonCommonConstants.UNDERSCORE)(1)
+      if (carbonTable.isHivePartitionTable) {
+        if (partitionSpecs.isDefined) {
+          partitionSpecs.get.foreach { partitionSpec =>
+            cleanStaleCompactionDataFiles(
+              partitionSpec.getLocation.toString, segmentId, factTimestamp)
+          }
+        }
+      } else {
+        val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
+        cleanStaleCompactionDataFiles(
+          segmentPath, segmentId, factTimestamp)
+      }
+    }
+  }
+
+  private def cleanStaleCompactionDataFiles(
+      folderPath: String,
+      segmentId: String,
+      factTimestamp: Long): Unit = {
+    if (FileFactory.isFileExist(folderPath)) {
+      val namePart = CarbonCommonConstants.HYPHEN + segmentId +
+        CarbonCommonConstants.HYPHEN + factTimestamp
+      val toBeDelete = FileFactory.getCarbonFile(folderPath).listFiles(new CarbonFileFilter() {
+        override def accept(file: CarbonFile): Boolean = {
+          file.getName.contains(namePart)
+        }
+      })
+      if (toBeDelete != null && toBeDelete.nonEmpty) {
+        try {
+          CarbonUtil.deleteFoldersAndFilesSilent(toBeDelete: _*)
+        } catch {
+          case e: Throwable =>
+            LOGGER.error("Exception in deleting the delta files." + e)

Review comment:
       Fixed the exception message.
   Cleaning data before compaction is a good suggestion, but it may mistakenly remove data files created by other jobs.
   
   







[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537281123



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -263,15 +249,7 @@ object CarbonDataRDDFactory {
             throw new Exception("Exception in compaction " + exception.getMessage)
           }
         } finally {
-          executor.shutdownNow()
-          try {
-            compactor.deletePartialLoadsInCompaction()

Review comment:
       Already added code to handle compaction exceptions instead of this function.
   This function lists the whole table to clean partial loads, while the new code focuses only on the current load.







[GitHub] [carbondata] akashrn5 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537270696



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##########
@@ -2123,29 +2123,35 @@ public int getMaxSIRepairLimit(String dbName, String tableName) {
    * folder will take place
    */
   private void validateTrashFolderRetentionTime() {
-    String propertyValue = carbonProperties.getProperty(CarbonCommonConstants
-        .CARBON_TRASH_RETENTION_DAYS, Integer.toString(CarbonCommonConstants
-        .CARBON_TRASH_RETENTION_DAYS_DEFAULT));
+    String propertyValue = carbonProperties.getProperty(

Review comment:
       @ajantha-bhat the `getTrashFolderRetentionTime` method implementation is just `Integer.parseInt(CarbonProperties.getProperty(...))`, so this is the same thing here, right? What exactly do you mean by "it's already stored" rather than validated?
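   To make the distinction concrete, here is a minimal, hypothetical sketch of validating the retention property once and falling back to the default; the helper name and the non-negative range check are assumptions, while the constant follows the quoted diff:
   ```java
   import org.apache.carbondata.core.constants.CarbonCommonConstants;

   public final class TrashRetentionValidation {
     // Sketch: parse carbon.trash.retention.days once, falling back to the
     // default when the value is missing, non-numeric, or negative.
     public static int validatedRetentionDays(String propertyValue) {
       int defaultDays = CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT;
       if (propertyValue == null) {
         return defaultDays;
       }
       try {
         int days = Integer.parseInt(propertyValue);
         return days < 0 ? defaultDays : days;
       } catch (NumberFormatException e) {
         return defaultDays;
       }
     }
   }
   ```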







[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
vikramahuja1001 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537235165



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDelete        clean the MFD/Compacted segments immediately and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on retention time,
+   *                             it will clean immediately when force is true
+   */
+  def cleanGarbageData(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+    // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
+        " recovered. It is disabled by default, to enable clean files with force option," +
+        " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
+      throw new RuntimeException("Clean files with force operation not permitted by default")
+    }
+    var carbonCleanFilesLock: ICarbonLock = null
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // step 1: check and clean trash folder
+      checkAndCleanTrashFolder(carbonTable, isForceDelete)
+      // step 2: move stale segments which do not exist in metadata into .Trash
+      moveStaleSegmentsToTrash(carbonTable)
+      // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+      cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs)
+    } finally {
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = {
+    if (isForceDelete) {
+      // empty the trash folder
+      TrashUtil.emptyTrash(carbonTable.getTablePath)
+    } else {
+      // clear trash based on timestamp
+      TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+    }
+  }
+
+  /**
+   * move stale segments to the trash folder, excluding compaction segments
+   */
+  private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Unit = {
+    if (carbonTable.isHivePartitionTable) {
+      CleanFilesUtil.cleanStaleSegmentsForPartitionTable(carbonTable)
+    } else {
+      CleanFilesUtil.cleanStaleSegments(carbonTable)
+    }
+  }
+
+  private def cleanExpiredSegments(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecsOption: Option[Seq[PartitionSpec]]): Unit = {
+    val partitionSpecs = partitionSpecsOption.map(_.asJava).orNull
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable,
+      isForceDelete, partitionSpecs, cleanStaleInProgress, true)
+    if (carbonTable.isHivePartitionTable && partitionSpecsOption.isDefined) {
+      SegmentFileStore.cleanSegments(carbonTable, partitionSpecs, isForceDelete)
+    }
+  }
+
+  /**
+   * clean the stale compaction segment immediately after a compaction failure
+   */
+  def cleanStaleCompactionSegment(
+      carbonTable: CarbonTable,
+      mergedLoadName: String,
+      factTimestamp: Long,
+      partitionSpecs: Option[Seq[PartitionSpec]]): Unit = {
+    val metadataFolderPath = CarbonTablePath.getMetadataPath(carbonTable.getTablePath)
+    val details = SegmentStatusManager.readLoadMetadata(metadataFolderPath)
+    if (details == null || details.isEmpty) {
+      return
+    }
+    val loadDetail = details.find(detail => mergedLoadName.equals(detail.getLoadName))
+    // only clean stale compaction segment
+    if (loadDetail.isEmpty) {
+      val segmentId = mergedLoadName.split(CarbonCommonConstants.UNDERSCORE)(1)
+      if (carbonTable.isHivePartitionTable) {
+        if (partitionSpecs.isDefined) {
+          partitionSpecs.get.foreach { partitionSpec =>
+            cleanStaleCompactionDataFiles(
+              partitionSpec.getLocation.toString, segmentId, factTimestamp)
+          }
+        }
+      } else {
+        val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
+        cleanStaleCompactionDataFiles(
+          segmentPath, segmentId, factTimestamp)
+      }
+    }
+  }
+
+  private def cleanStaleCompactionDataFiles(
+      folderPath: String,
+      segmentId: String,
+      factTimestamp: Long): Unit = {
+    if (FileFactory.isFileExist(folderPath)) {
+      val namePart = CarbonCommonConstants.HYPHEN + segmentId +
+        CarbonCommonConstants.HYPHEN + factTimestamp
+      val toBeDelete = FileFactory.getCarbonFile(folderPath).listFiles(new CarbonFileFilter() {
+        override def accept(file: CarbonFile): Boolean = {
+          file.getName.contains(namePart)
+        }
+      })
+      if (toBeDelete != null && toBeDelete.nonEmpty) {
+        try {
+          CarbonUtil.deleteFoldersAndFilesSilent(toBeDelete: _*)
+        } catch {
+          case e: Throwable =>
+            LOGGER.error("Exception in deleting the delta files." + e)

Review comment:
       Maybe instead of cleaning compacted segments after a compaction failure, we could try clearing them before the compaction itself. Say we are trying to compact to segment 0.1; there we can:
   ```1. If segment 0.1 already exists -> clean it -> if the cleaning fails -> fail the compaction command itself ```
   ```2. else -> proceed with compaction```
   
   In the current implementation, if cleaning the stale compacted segment fails and the user does not clean it up manually, there could be issues in the next compaction command. If we do it before compaction, we can just fail the compaction itself and ask the user to clean manually. What do you think? A rough sketch of this idea follows.
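   For illustration, a minimal, hypothetical Java sketch of the proposal using the existing FileFactory/CarbonTablePath/CarbonUtil utilities; the class and method names are made up, and this is not the PR's actual code:
   ```java
   import java.io.IOException;

   import org.apache.carbondata.core.datastore.impl.FileFactory;
   import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
   import org.apache.carbondata.core.util.CarbonUtil;
   import org.apache.carbondata.core.util.path.CarbonTablePath;

   public final class CompactionPreCheck {
     // Sketch: clean a stale target segment (e.g. 0.1) before compaction starts,
     // and abort the compaction if the cleanup itself fails.
     public static void prepareCompactionTarget(CarbonTable table, String mergedSegmentId)
         throws IOException {
       String segmentPath =
           CarbonTablePath.getSegmentPath(table.getTablePath(), mergedSegmentId);
       if (FileFactory.isFileExist(segmentPath)) {
         try {
           CarbonUtil.deleteFoldersAndFilesSilent(FileFactory.getCarbonFile(segmentPath));
         } catch (Exception e) {
           throw new RuntimeException("Stale segment " + mergedSegmentId
               + " could not be cleaned; aborting compaction. Please clean it manually.", e);
         }
       }
     }
   }
   ```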







[GitHub] [carbondata] QiangCai commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739642960


   retest this please





[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537273799



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -19,13 +19,13 @@ package org.apache.carbondata.api
 
 import java.io.{DataInputStream, FileNotFoundException, InputStreamReader}
 import java.time.{Duration, Instant}
-import java.util
 import java.util.{Collections, Comparator}
 
 import scala.collection.JavaConverters._
 import scala.util.control.Breaks.{break, breakable}
 
 import com.google.gson.Gson
+import java.util

Review comment:
       reverted







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739959057


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3334/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739781549


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3320/
   





[GitHub] [carbondata] kunal642 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
kunal642 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537236952



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -163,8 +164,13 @@ private static void getStaleSegmentFiles(CarbonTable carbonTable, List<String> s
     }
     Set<String> loadNameSet = Arrays.stream(details).map(LoadMetadataDetails::getLoadName)
         .collect(Collectors.toSet());
-    List<String> staleSegments = segmentFiles.stream().filter(segmentFile -> !loadNameSet.contains(
-        DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList());
+    // get all stale segment files, not include compaction segments
+    List<String> staleSegments = segmentFiles.stream()
+        .filter(segmentFile -> !DataFileUtil.getSegmentNoFromSegmentFile(segmentFile).contains(
+            CarbonCommonConstants.POINT))
+        .filter(segmentFile -> !loadNameSet.contains(

Review comment:
       Please combine both filters into one, for example as sketched below.
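   As a rough sketch, the two stream filters could be merged into a single predicate; this assumes the `segmentFiles` list and `loadNameSet` set from the surrounding code in the quoted diff, with unchanged behavior:
   ```java
   // Sketch only: one filter pass that skips compaction (x.y) segments and
   // segments already present in the table status.
   List<String> staleSegments = segmentFiles.stream()
       .filter(segmentFile -> {
         String segmentNo = DataFileUtil.getSegmentNoFromSegmentFile(segmentFile);
         return !segmentNo.contains(CarbonCommonConstants.POINT)
             && !loadNameSet.contains(segmentNo);
       })
       .collect(Collectors.toList());
   ```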







[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537282367



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDelete        clean the MFD/Compacted segments immediately and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on retention time,
+   *                             it will clean immediately when force is true
+   */
+  def cleanGarbageData(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+    // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
+        " recovered. It is disabled by default, to enable clean files with force option," +
+        " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
+      throw new RuntimeException("Clean files with force operation not permitted by default")
+    }
+    var carbonCleanFilesLock: ICarbonLock = null
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // step 1: check and clean trash folder
+      checkAndCleanTrashFolder(carbonTable, isForceDelete)
+      // step 2: move stale segments which do not exist in metadata into .Trash
+      moveStaleSegmentsToTrash(carbonTable)
+      // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+      cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs)

Review comment:
       ```suggestion
         checkAndCleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs)
   ```







[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537357883



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
##########
@@ -48,30 +50,59 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging {
     event match {
       case cleanFilesPostEvent: CleanFilesPostEvent =>
         LOGGER.info("Clean files post event listener called")
-        val carbonTable = cleanFilesPostEvent.carbonTable
-        val indexTables = CarbonIndexUtil
-          .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession)
-        val isForceDelete = cleanFilesPostEvent.ifForceDelete
-        val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress
-        indexTables.foreach { indexTable =>
-          val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
-            Seq.empty[Expression],
-            cleanFilesPostEvent.sparkSession,
-            indexTable)
-          SegmentStatusManager.deleteLoadsAndUpdateMetadata(
-              indexTable, isForceDelete, partitions.map(_.asJava).orNull, inProgressSegmentsClean,
-            true)
-          CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
-          cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
-        }
+        cleanFilesForIndex(

Review comment:
       OK, I will refactor this in a new PR.







[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537248838



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -163,8 +164,13 @@ private static void getStaleSegmentFiles(CarbonTable carbonTable, List<String> s
     }
     Set<String> loadNameSet = Arrays.stream(details).map(LoadMetadataDetails::getLoadName)
         .collect(Collectors.toSet());
-    List<String> staleSegments = segmentFiles.stream().filter(segmentFile -> !loadNameSet.contains(
-        DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList());
+    // get all stale segment files, not include compaction segments

Review comment:
       Please add a detailed comment about why we need to ignore stale compacted (x.y) segments.







[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537281516



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -577,38 +556,46 @@ object CarbonDataRDDFactory {
           LOGGER.info("Data load is successful for " +
                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         }
-
-        // code to handle Pre-Priming cache for loading
-
-        if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
-          DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
-            operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
-        }
-        try {
-          // compaction handling
-          if (carbonTable.isHivePartitionTable) {
-            carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-          }
-          val compactedSegments = new util.ArrayList[String]()
-          handleSegmentMerging(sqlContext,
-            carbonLoadModel
-              .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
-            carbonTable,
-            compactedSegments,
-            operationContext)
-          carbonLoadModel.setMergedSegmentIds(compactedSegments)
-          writtenSegment
-        } catch {
-          case e: Exception =>
-            LOGGER.error(
-              "Auto-Compaction has failed. Ignoring this exception because the" +
-              " load is passed.", e)
-            writtenSegment
-        }
+        isLoadingCommitted = true
+        writtenSegment
       }
     } finally {
       // Release the segment lock, once table status is finally updated
       segmentLock.unlock()
+      if (isLoadingCommitted) {
+        triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, operationContext)
+      }
+    }
+  }
+
+  private def triggerEventsAfterLoading(
+      sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      hadoopConf: Configuration,
+      operationContext: OperationContext): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    // code to handle Pre-Priming cache for loading
+    if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
+      DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),

Review comment:
       OK, we can do it in another PR; compaction may not compact this segment.
   Pre-priming is needed here as well.







[GitHub] [carbondata] ajantha-bhat commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739986550


   Merging this. Please handle the open issues raised (for example, JIRA CARBONDATA-4074) in another PR soon.





[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537260678



##########
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##########
@@ -192,20 +192,20 @@ public static void emptyTrash(String tablePath) {
   }
 
   /**
-   * This will tell whether the trash retention time has expired or not
-   *
-   * @param fileTimestamp
-   * @return
+   * whether trash data inside of .Trash folder is time out
+   */
+  private static boolean isTrashRetentionTimeoutExceeded(long fileTimestamp) {
+    int retentionDays = CarbonProperties.getInstance().getTrashFolderRetentionTime();
+    long retentionMilliSeconds = TimeUnit.DAYS.toMillis(1) * retentionDays;
+    return CarbonUpdateUtil.readCurrentTime() - fileTimestamp > retentionMilliSeconds;
+  }
+
+  /**
+   * whether trash data outside of .Trash folder is time out
    */
-  public static boolean isTrashRetentionTimeoutExceeded(long fileTimestamp) {
-    // record current time.
-    long currentTime = CarbonUpdateUtil.readCurrentTime();
-    long retentionMilliSeconds = (long)Integer.parseInt(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, Integer.toString(
-          CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT))) * TimeUnit.DAYS
-        .toMillis(1);
-    long difference = currentTime - fileTimestamp;
-    return difference > retentionMilliSeconds;
+  public static boolean isTrashDataTimeout(long fileTimestamp) {

Review comment:
       ```suggestion
     public static boolean isDataOutsideTrashIsExpired(long fileTimestamp) {
   ```







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739958168


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5095/
   





[GitHub] [carbondata] QiangCai commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739772818


   retest this please





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739664259


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3311/
   





[GitHub] [carbondata] ajantha-bhat commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739984807


   LGTM





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739745639


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3341/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739715250


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5074/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739689562


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3314/
   





[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537264650



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -263,15 +249,7 @@ object CarbonDataRDDFactory {
             throw new Exception("Exception in compaction " + exception.getMessage)
           }
         } finally {
-          executor.shutdownNow()
-          try {
-            compactor.deletePartialLoadsInCompaction()

Review comment:
      In this file, other than removing `deletePartialLoadsInCompaction`, the rest of the changes have nothing to do with the clean files PR. Can you raise a separate PR for them?







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739883277


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3331/
   





[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537277390



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDelete        clean the MFD/Compacted segments immediately and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on retention time,
+   *                             it will clean immediately when force is true
+   */
+  def cleanGarbageData(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+    // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
+        " recovered. It is disabled by default, to enable clean files with force option," +
+        " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
+      throw new RuntimeException("Clean files with force operation not permitted by default")
+    }
+    var carbonCleanFilesLock: ICarbonLock = null
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // step 1: check and clean trash folder
+      checkAndCleanTrashFolder(carbonTable, isForceDelete)
+      // step 2: move stale segments which are not exists in metadata into .Trash
+      moveStaleSegmentsToTrash(carbonTable)
+      // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+      cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs)
+    } finally {
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = {
+    if (isForceDelete) {
+      // empty the trash folder
+      TrashUtil.emptyTrash(carbonTable.getTablePath)
+    } else {
+      // clear trash based on timestamp
+      TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+    }
+  }
+
+  /**
+   * move stale segment to trash folder, but not include compaction segment
+   */
+  private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Unit = {
+    if (carbonTable.isHivePartitionTable) {
+      CleanFilesUtil.cleanStaleSegmentsForPartitionTable(carbonTable)
+    } else {
+      CleanFilesUtil.cleanStaleSegments(carbonTable)
+    }
+  }
+
+  private def cleanExpiredSegments(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecsOption: Option[Seq[PartitionSpec]]): Unit = {
+    val partitionSpecs = partitionSpecsOption.map(_.asJava).orNull
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable,
+      isForceDelete, partitionSpecs, cleanStaleInProgress, true)
+    if (carbonTable.isHivePartitionTable && partitionSpecsOption.isDefined) {
+      SegmentFileStore.cleanSegments(carbonTable, partitionSpecs, isForceDelete)
+    }
+  }
+
+  /**
+   * clean the stale compact segment immediately after compaction failure
+   */
+  def cleanStaleCompactionSegment(
+      carbonTable: CarbonTable,
+      mergedLoadName: String,
+      factTimestamp: Long,
+      partitionSpecs: Option[Seq[PartitionSpec]]): Unit = {
+    val metadataFolderPath = CarbonTablePath.getMetadataPath(carbonTable.getTablePath)
+    val details = SegmentStatusManager.readLoadMetadata(metadataFolderPath)
+    if (details == null || details.isEmpty) {
+      return
+    }
+    val loadDetail = details.find(detail => mergedLoadName.equals(detail.getLoadName))
+    // only clean stale compaction segment
+    if (loadDetail.isEmpty) {
+      val segmentId = mergedLoadName.split(CarbonCommonConstants.UNDERSCORE)(1)
+      if (carbonTable.isHivePartitionTable) {
+        if (partitionSpecs.isDefined) {
+          partitionSpecs.get.foreach { partitionSpec =>
+            cleanStaleCompactionDataFiles(
+              partitionSpec.getLocation.toString, segmentId, factTimestamp)
+          }
+        }
+      } else {
+        val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
+        cleanStaleCompactionDataFiles(
+          segmentPath, segmentId, factTimestamp)
+      }
+    }
+  }
+
+  private def cleanStaleCompactionDataFiles(
+      folderPath: String,
+      segmentId: String,
+      factTimestamp: Long): Unit = {
+    if (FileFactory.isFileExist(folderPath)) {
+      val namePart = CarbonCommonConstants.HYPHEN + segmentId +
+        CarbonCommonConstants.HYPHEN + factTimestamp
+      val toBeDelete = FileFactory.getCarbonFile(folderPath).listFiles(new CarbonFileFilter() {

Review comment:
       accepted
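   (For context: the three-step flow documented in the hunk above backs the user-facing clean files command, which with this PR takes the new `force` and `stale_inprogress` options. A hedged usage sketch; the table name is a placeholder:)

   ```sql
   -- the force path additionally requires the CARBON_CLEAN_FILES_FORCE_ALLOWED property to be enabled
   CLEAN FILES FOR TABLE mydb.mytable OPTIONS ('force'='true', 'stale_inprogress'='true')
   ```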







[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537305840



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -577,38 +556,46 @@ object CarbonDataRDDFactory {
           LOGGER.info("Data load is successful for " +
                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         }
-
-        // code to handle Pre-Priming cache for loading
-
-        if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
-          DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
-            operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
-        }
-        try {
-          // compaction handling
-          if (carbonTable.isHivePartitionTable) {
-            carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-          }
-          val compactedSegments = new util.ArrayList[String]()
-          handleSegmentMerging(sqlContext,
-            carbonLoadModel
-              .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
-            carbonTable,
-            compactedSegments,
-            operationContext)
-          carbonLoadModel.setMergedSegmentIds(compactedSegments)
-          writtenSegment
-        } catch {
-          case e: Exception =>
-            LOGGER.error(
-              "Auto-Compaction has failed. Ignoring this exception because the" +
-              " load is passed.", e)
-            writtenSegment
-        }
+        isLoadingCommitted = true
+        writtenSegment
       }
     } finally {
       // Release the segment lock, once table status is finally updated
       segmentLock.unlock()
+      if (isLoadingCommitted) {
+        triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, operationContext)
+      }
+    }
+  }
+
+  private def triggerEventsAfterLoading(
+      sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      hadoopConf: Configuration,
+      operationContext: OperationContext): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    // code to handle Pre-Priming cache for loading
+    if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
+      DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),

Review comment:
      OK, can you raise another PR to fix it?







[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537330003



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
##########
@@ -48,30 +50,59 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging {
     event match {
       case cleanFilesPostEvent: CleanFilesPostEvent =>
         LOGGER.info("Clean files post event listener called")
-        val carbonTable = cleanFilesPostEvent.carbonTable
-        val indexTables = CarbonIndexUtil
-          .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession)
-        val isForceDelete = cleanFilesPostEvent.ifForceDelete
-        val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress
-        indexTables.foreach { indexTable =>
-          val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
-            Seq.empty[Expression],
-            cleanFilesPostEvent.sparkSession,
-            indexTable)
-          SegmentStatusManager.deleteLoadsAndUpdateMetadata(
-              indexTable, isForceDelete, partitions.map(_.asJava).orNull, inProgressSegmentsClean,
-            true)
-          CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
-          cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
-        }
+        cleanFilesForIndex(

Review comment:
      Would it be better if we called cleanFilesCommand for each index table, like how MV is handled below?
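   (A rough sketch of that alternative; the exact CarbonCleanFilesCommand constructor used here is an assumption, not the confirmed API:)

   ```scala
   // Hypothetical: run the whole clean files command per index table,
   // mirroring how the MV case is handled further down in this listener.
   indexTables.foreach { indexTable =>
     CarbonCleanFilesCommand(
       Option(indexTable.getDatabaseName),
       Option(indexTable.getTableName),
       cleanFilesPostEvent.options
     ).run(cleanFilesPostEvent.sparkSession)
   }
   ```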







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739664837


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5092/
   





[GitHub] [carbondata] ydvpankaj99 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ydvpankaj99 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739687768


   retest this please





[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537277121



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##########
@@ -2123,29 +2123,35 @@ public int getMaxSIRepairLimit(String dbName, String tableName) {
    * folder will take place
    */
   private void validateTrashFolderRetentionTime() {
-    String propertyValue = carbonProperties.getProperty(CarbonCommonConstants
-        .CARBON_TRASH_RETENTION_DAYS, Integer.toString(CarbonCommonConstants
-        .CARBON_TRASH_RETENTION_DAYS_DEFAULT));
+    String propertyValue = carbonProperties.getProperty(

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -163,8 +164,13 @@ private static void getStaleSegmentFiles(CarbonTable carbonTable, List<String> s
     }
     Set<String> loadNameSet = Arrays.stream(details).map(LoadMetadataDetails::getLoadName)
         .collect(Collectors.toSet());
-    List<String> staleSegments = segmentFiles.stream().filter(segmentFile -> !loadNameSet.contains(
-        DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList());
+    // get all stale segment files, not include compaction segments

Review comment:
       done

##########
File path: docs/clean-files.md
##########
@@ -38,6 +38,9 @@ The above clean files command will clean Marked For Delete and Compacted segment
    ``` 
   Once the timestamp subdirectory is expired as per the configured expiration day value, that subdirectory is deleted from the trash folder in the subsequent clean files command.
 
+**NOTE**:
+  * In trash folder, the retention time is "carbon.trash.retention.days"
+  * Outside trash folder, the retention time is max value of two properties("carbon.trash.retention.days", "max.query.execution.time")

Review comment:
       done







[GitHub] [carbondata] akashrn5 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537484782



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
##########
@@ -48,30 +50,59 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging {
     event match {
       case cleanFilesPostEvent: CleanFilesPostEvent =>
         LOGGER.info("Clean files post event listener called")
-        val carbonTable = cleanFilesPostEvent.carbonTable
-        val indexTables = CarbonIndexUtil
-          .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession)
-        val isForceDelete = cleanFilesPostEvent.ifForceDelete
-        val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress
-        indexTables.foreach { indexTable =>
-          val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
-            Seq.empty[Expression],
-            cleanFilesPostEvent.sparkSession,
-            indexTable)
-          SegmentStatusManager.deleteLoadsAndUpdateMetadata(
-              indexTable, isForceDelete, partitions.map(_.asJava).orNull, inProgressSegmentsClean,
-            true)
-          CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
-          cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
-        }
+        cleanFilesForIndex(

Review comment:
      Please see if you can do this along with [CARBONDATA-4074](https://issues.apache.org/jira/browse/CARBONDATA-4074).







[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537289240



##########
File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
##########
@@ -482,176 +482,6 @@ public boolean accept(CarbonFile file) {
 
   }
 
-  /**
-   * Handling of the clean up of old carbondata files, index files , delete delta,
-   * update status files.
-   * @param table clean up will be handled on this table.
-   * @param forceDelete if true then max query execution timeout will not be considered.
-   */
-  public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) throws IOException {
-
-    SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier());
-
-    LoadMetadataDetails[] details =
-        SegmentStatusManager.readLoadMetadata(table.getMetadataPath());
-
-    SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(table);
-    SegmentUpdateDetails[] segmentUpdateDetails = updateStatusManager.getUpdateStatusDetails();
-    // hold all the segments updated so that wen can check the delta files in them, ne need to
-    // check the others.
-    Set<String> updatedSegments = new HashSet<>();
-    for (SegmentUpdateDetails updateDetails : segmentUpdateDetails) {
-      updatedSegments.add(updateDetails.getSegmentName());
-    }
-
-    String validUpdateStatusFile = "";
-
-    boolean isAbortedFile = true;
-
-    boolean isInvalidFile = false;
-
-    // take the update status file name from 0th segment.
-    validUpdateStatusFile = ssm.getUpdateStatusFileName(details);
-    // scan through each segment.
-    for (LoadMetadataDetails segment : details) {
-      // if this segment is valid then only we will go for delta file deletion.
-      // if the segment is mark for delete or compacted then any way it will get deleted.
-      if (segment.getSegmentStatus() == SegmentStatus.SUCCESS
-              || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
-        // when there is no update operations done on table, then no need to go ahead. So
-        // just check the update delta start timestamp and proceed if not empty
-        if (!segment.getUpdateDeltaStartTimestamp().isEmpty()
-                || updatedSegments.contains(segment.getLoadName())) {
-          // take the list of files from this segment.
-          String segmentPath = CarbonTablePath.getSegmentPath(
-              table.getAbsoluteTableIdentifier().getTablePath(), segment.getLoadName());
-          CarbonFile segDir =
-              FileFactory.getCarbonFile(segmentPath);
-          CarbonFile[] allSegmentFiles = segDir.listFiles();
-
-          // now handle all the delete delta files which needs to be deleted.
-          // there are 2 cases here .
-          // 1. if the block is marked as compacted then the corresponding delta files
-          //    can be deleted if query exec timeout is done.
-          // 2. if the block is in success state then also there can be delete
-          //    delta compaction happened and old files can be deleted.
-
-          SegmentUpdateDetails[] updateDetails = updateStatusManager.readLoadMetadata();
-          for (SegmentUpdateDetails block : updateDetails) {
-            CarbonFile[] completeListOfDeleteDeltaFiles;
-            CarbonFile[] invalidDeleteDeltaFiles;
-
-            if (!block.getSegmentName().equalsIgnoreCase(segment.getLoadName())) {
-              continue;
-            }
-
-            // aborted scenario.
-            invalidDeleteDeltaFiles = updateStatusManager
-                .getDeleteDeltaInvalidFilesList(block, false,
-                    allSegmentFiles, isAbortedFile);
-            for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
-              boolean doForceDelete = true;
-              compareTimestampsAndDelete(invalidFile, doForceDelete, false);
-            }
-
-            // case 1
-            if (CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
-              completeListOfDeleteDeltaFiles = updateStatusManager
-                  .getDeleteDeltaInvalidFilesList(block, true,
-                      allSegmentFiles, isInvalidFile);
-              for (CarbonFile invalidFile : completeListOfDeleteDeltaFiles) {
-                compareTimestampsAndDelete(invalidFile, forceDelete, false);
-              }
-
-            } else {
-              invalidDeleteDeltaFiles = updateStatusManager
-                  .getDeleteDeltaInvalidFilesList(block, false,
-                      allSegmentFiles, isInvalidFile);
-              for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
-                compareTimestampsAndDelete(invalidFile, forceDelete, false);
-              }
-            }
-          }
-        }
-        // handle cleanup of merge index files and data files after small files merge happened for
-        // SI table
-        cleanUpDataFilesAfterSmallFilesMergeForSI(table, segment);
-      }
-    }
-
-    // delete the update table status files which are old.
-    if (null != validUpdateStatusFile && !validUpdateStatusFile.isEmpty()) {
-
-      final String updateStatusTimestamp = validUpdateStatusFile
-          .substring(validUpdateStatusFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1);
-
-      String tablePath = table.getAbsoluteTableIdentifier().getTablePath();
-      CarbonFile metaFolder = FileFactory.getCarbonFile(
-          CarbonTablePath.getMetadataPath(tablePath));
-
-      CarbonFile[] invalidUpdateStatusFiles = metaFolder.listFiles(new CarbonFileFilter() {
-        @Override
-        public boolean accept(CarbonFile file) {
-          if (file.getName().startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)) {
-            // CHECK if this is valid or not.
-            // we only send invalid ones to delete.
-            return !file.getName().endsWith(updateStatusTimestamp);
-          }
-          return false;
-        }
-      });
-
-      for (CarbonFile invalidFile : invalidUpdateStatusFiles) {
-        compareTimestampsAndDelete(invalidFile, forceDelete, true);
-      }
-    }
-  }
-
-  /**
-   * this is the clean up added specifically for SI table, because after we merge the data files
-   * inside the secondary index table, we need to delete the stale carbondata files.
-   * refer org.apache.spark.sql.secondaryindex.rdd.CarbonSIRebuildRDD
-   */
-  private static void cleanUpDataFilesAfterSmallFilesMergeForSI(CarbonTable table,

Review comment:
      done
   [Should clean stale data in success segments](https://issues.apache.org/jira/browse/CARBONDATA-4074)
   
   Cleaning stale data in success segments includes the following parts:
   1. clean stale delete delta files (when force is true)
   2. clean stale small files for the index table
   3. clean stale data files left by loading/compaction







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739547241


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5085/
   





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
vikramahuja1001 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537232374



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDelete        clean the MFD/Compacted segments immediately and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on retention time,
+   *                             it will clean immediately when force is true
+   */
+  def cleanGarbageData(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+    // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
+        " recovered. It is disabled by default, to enable clean files with force option," +
+        " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
+      throw new RuntimeException("Clean files with force operation not permitted by default")
+    }
+    var carbonCleanFilesLock: ICarbonLock = null
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // step 1: check and clean trash folder
+      checkAndCleanTrashFolder(carbonTable, isForceDelete)
+      // step 2: move stale segments which are not exists in metadata into .Trash
+      moveStaleSegmentsToTrash(carbonTable)
+      // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+      cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs)
+    } finally {
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = {
+    if (isForceDelete) {
+      // empty the trash folder
+      TrashUtil.emptyTrash(carbonTable.getTablePath)
+    } else {
+      // clear trash based on timestamp
+      TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+    }
+  }
+
+  /**
+   * move stale segment to trash folder, but not include compaction segment
+   */
+  private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Unit = {
+    if (carbonTable.isHivePartitionTable) {
+      CleanFilesUtil.cleanStaleSegmentsForPartitionTable(carbonTable)
+    } else {
+      CleanFilesUtil.cleanStaleSegments(carbonTable)
+    }
+  }
+
+  private def cleanExpiredSegments(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecsOption: Option[Seq[PartitionSpec]]): Unit = {
+    val partitionSpecs = partitionSpecsOption.map(_.asJava).orNull
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable,
+      isForceDelete, partitionSpecs, cleanStaleInProgress, true)
+    if (carbonTable.isHivePartitionTable && partitionSpecsOption.isDefined) {
+      SegmentFileStore.cleanSegments(carbonTable, partitionSpecs, isForceDelete)
+    }
+  }
+
+  /**
+   * clean the stale compact segment immediately after compaction failure
+   */
+  def cleanStaleCompactionSegment(
+      carbonTable: CarbonTable,
+      mergedLoadName: String,
+      factTimestamp: Long,
+      partitionSpecs: Option[Seq[PartitionSpec]]): Unit = {
+    val metadataFolderPath = CarbonTablePath.getMetadataPath(carbonTable.getTablePath)
+    val details = SegmentStatusManager.readLoadMetadata(metadataFolderPath)
+    if (details == null || details.isEmpty) {
+      return
+    }
+    val loadDetail = details.find(detail => mergedLoadName.equals(detail.getLoadName))
+    // only clean stale compaction segment
+    if (loadDetail.isEmpty) {
+      val segmentId = mergedLoadName.split(CarbonCommonConstants.UNDERSCORE)(1)
+      if (carbonTable.isHivePartitionTable) {
+        if (partitionSpecs.isDefined) {
+          partitionSpecs.get.foreach { partitionSpec =>
+            cleanStaleCompactionDataFiles(
+              partitionSpec.getLocation.toString, segmentId, factTimestamp)
+          }
+        }
+      } else {
+        val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
+        cleanStaleCompactionDataFiles(
+          segmentPath, segmentId, factTimestamp)
+      }
+    }
+  }
+
+  private def cleanStaleCompactionDataFiles(
+      folderPath: String,
+      segmentId: String,
+      factTimestamp: Long): Unit = {
+    if (FileFactory.isFileExist(folderPath)) {
+      val namePart = CarbonCommonConstants.HYPHEN + segmentId +
+        CarbonCommonConstants.HYPHEN + factTimestamp
+      val toBeDelete = FileFactory.getCarbonFile(folderPath).listFiles(new CarbonFileFilter() {
+        override def accept(file: CarbonFile): Boolean = {
+          file.getName.contains(namePart)
+        }
+      })
+      if (toBeDelete != null && toBeDelete.nonEmpty) {
+        try {
+          CarbonUtil.deleteFoldersAndFilesSilent(toBeDelete: _*)
+        } catch {
+          case e: Throwable =>
+            LOGGER.error("Exception in deleting the delta files." + e)

Review comment:
      Delta files? Can we change this message and include the segment ID as well?
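   (For example, a hedged rewording using the segmentId and folderPath already in scope in this method:)

   ```scala
   // hypothetical message: these are stale compaction data files, not delete delta files
   LOGGER.error(s"Exception in deleting stale compaction data files of segment " +
     s"$segmentId under $folderPath", e)
   ```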







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739635141









[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739869023


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5088/
   





[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537277683



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
##########
@@ -48,30 +50,61 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging {
     event match {
       case cleanFilesPostEvent: CleanFilesPostEvent =>
         LOGGER.info("Clean files post event listener called")
-        val carbonTable = cleanFilesPostEvent.carbonTable
-        val indexTables = CarbonIndexUtil
-          .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession)
-        val isForceDelete = cleanFilesPostEvent.ifForceDelete
-        val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress
-        indexTables.foreach { indexTable =>
-          val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
-            Seq.empty[Expression],
-            cleanFilesPostEvent.sparkSession,
-            indexTable)
-          SegmentStatusManager.deleteLoadsAndUpdateMetadata(
-              indexTable, isForceDelete, partitions.map(_.asJava).orNull, inProgressSegmentsClean,
-            true)
-          CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
-          cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
-        }
+        cleanFilesForIndex(
+          cleanFilesPostEvent.sparkSession,
+          cleanFilesPostEvent.carbonTable,
+          cleanFilesPostEvent.options.getOrElse("force", "false").toBoolean,
+          cleanFilesPostEvent.options.getOrElse("stale_inprogress", "false").toBoolean)
+
+        cleanFilesForMv(
+          cleanFilesPostEvent.sparkSession,
+          cleanFilesPostEvent.carbonTable,
+          cleanFilesPostEvent.options)
+    }
+  }
+
+  private def cleanFilesForIndex(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean
+  ): Unit = {
+    val indexTables = CarbonIndexUtil
+      .getIndexCarbonTables(carbonTable, sparkSession)
+    indexTables.foreach { indexTable =>
+      val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
+        Seq.empty[Expression],
+        sparkSession,
+        indexTable)
+      SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+        indexTable, isForceDelete, partitions.map(_.asJava).orNull, cleanStaleInProgress,
+        true)
+      cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
+    }
+  }
+
+  private def cleanFilesForMv(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      options: Map[String, String]
+  ): Unit = {

Review comment:
       done







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739562913









[GitHub] [carbondata] akashrn5 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537252406



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
##########
@@ -48,30 +50,61 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging {
     event match {
       case cleanFilesPostEvent: CleanFilesPostEvent =>
         LOGGER.info("Clean files post event listener called")
-        val carbonTable = cleanFilesPostEvent.carbonTable
-        val indexTables = CarbonIndexUtil
-          .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession)
-        val isForceDelete = cleanFilesPostEvent.ifForceDelete
-        val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress
-        indexTables.foreach { indexTable =>
-          val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
-            Seq.empty[Expression],
-            cleanFilesPostEvent.sparkSession,
-            indexTable)
-          SegmentStatusManager.deleteLoadsAndUpdateMetadata(
-              indexTable, isForceDelete, partitions.map(_.asJava).orNull, inProgressSegmentsClean,
-            true)
-          CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
-          cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
-        }
+        cleanFilesForIndex(
+          cleanFilesPostEvent.sparkSession,
+          cleanFilesPostEvent.carbonTable,
+          cleanFilesPostEvent.options.getOrElse("force", "false").toBoolean,
+          cleanFilesPostEvent.options.getOrElse("stale_inprogress", "false").toBoolean)
+
+        cleanFilesForMv(
+          cleanFilesPostEvent.sparkSession,
+          cleanFilesPostEvent.carbonTable,
+          cleanFilesPostEvent.options)
+    }
+  }
+
+  private def cleanFilesForIndex(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean
+  ): Unit = {

Review comment:
      Move this line up to join the previous parameter line, as in the fragment below.
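   (i.e. following the parameter-list style used elsewhere in this file, the fragment would read:)

   ```scala
   private def cleanFilesForIndex(
       sparkSession: SparkSession,
       carbonTable: CarbonTable,
       isForceDelete: Boolean,
       cleanStaleInProgress: Boolean): Unit = {
   ```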







[GitHub] [carbondata] akashrn5 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537252530



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
##########
@@ -48,30 +50,61 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging {
     event match {
       case cleanFilesPostEvent: CleanFilesPostEvent =>
         LOGGER.info("Clean files post event listener called")
-        val carbonTable = cleanFilesPostEvent.carbonTable
-        val indexTables = CarbonIndexUtil
-          .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession)
-        val isForceDelete = cleanFilesPostEvent.ifForceDelete
-        val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress
-        indexTables.foreach { indexTable =>
-          val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
-            Seq.empty[Expression],
-            cleanFilesPostEvent.sparkSession,
-            indexTable)
-          SegmentStatusManager.deleteLoadsAndUpdateMetadata(
-              indexTable, isForceDelete, partitions.map(_.asJava).orNull, inProgressSegmentsClean,
-            true)
-          CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
-          cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
-        }
+        cleanFilesForIndex(
+          cleanFilesPostEvent.sparkSession,
+          cleanFilesPostEvent.carbonTable,
+          cleanFilesPostEvent.options.getOrElse("force", "false").toBoolean,
+          cleanFilesPostEvent.options.getOrElse("stale_inprogress", "false").toBoolean)
+
+        cleanFilesForMv(
+          cleanFilesPostEvent.sparkSession,
+          cleanFilesPostEvent.carbonTable,
+          cleanFilesPostEvent.options)
+    }
+  }
+
+  private def cleanFilesForIndex(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean
+  ): Unit = {
+    val indexTables = CarbonIndexUtil
+      .getIndexCarbonTables(carbonTable, sparkSession)
+    indexTables.foreach { indexTable =>
+      val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
+        Seq.empty[Expression],
+        sparkSession,
+        indexTable)
+      SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+        indexTable, isForceDelete, partitions.map(_.asJava).orNull, cleanStaleInProgress,
+        true)
+      cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
+    }
+  }
+
+  private def cleanFilesForMv(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      options: Map[String, String]
+  ): Unit = {

Review comment:
       same as above







[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
vikramahuja1001 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537230288



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -163,8 +164,13 @@ private static void getStaleSegmentFiles(CarbonTable carbonTable, List<String> s
     }
     Set<String> loadNameSet = Arrays.stream(details).map(LoadMetadataDetails::getLoadName)
         .collect(Collectors.toSet());
-    List<String> staleSegments = segmentFiles.stream().filter(segmentFile -> !loadNameSet.contains(
-        DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList());
+    // get all stale segment files, not include compaction segments

Review comment:
      So we don't want to send compacted segments to the trash folder?
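   (For context, a sketch of the exclusion the new code comment describes, assuming merged segment numbers contain a dot, e.g. "0.1", and that CarbonCommonConstants.POINT holds "."; this is not the exact implementation:)

   ```java
   // Sketch only: keep segment files with no metadata entry, but skip merged
   // (compaction) segments, whose segment numbers look like "0.1".
   List<String> staleSegments = segmentFiles.stream()
       .filter(segmentFile -> {
         String segmentNo = DataFileUtil.getSegmentNoFromSegmentFile(segmentFile);
         return !loadNameSet.contains(segmentNo)
             && !segmentNo.contains(CarbonCommonConstants.POINT);
       })
       .collect(Collectors.toList());
   ```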







[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537258139



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##########
@@ -2123,29 +2123,35 @@ public int getMaxSIRepairLimit(String dbName, String tableName) {
    * folder will take place
    */
   private void validateTrashFolderRetentionTime() {
-    String propertyValue = carbonProperties.getProperty(CarbonCommonConstants
-        .CARBON_TRASH_RETENTION_DAYS, Integer.toString(CarbonCommonConstants
-        .CARBON_TRASH_RETENTION_DAYS_DEFAULT));
+    String propertyValue = carbonProperties.getProperty(

Review comment:
      @akashrn5: getTrashFolderRetentionTime is used to access the already validated and stored value. It won't check for parsing errors because the value is validated when the property is first added. So the current changes look OK to me for this first-time validation of the added property.
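   (In other words, parsing happens once when the property is registered, roughly like this; the reset-to-default fallback and the helper name are illustrative assumptions, not the exact implementation:)

   ```java
   private void validateTrashFolderRetentionTime() {
     String propertyValue = carbonProperties.getProperty(
         CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS,
         Integer.toString(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT));
     try {
       // reject bad values up front so that later reads via getTrashFolderRetentionTime()
       // can parse the stored value without re-validating it
       if (Integer.parseInt(propertyValue) < 0) {
         resetTrashRetentionToDefault();
       }
     } catch (NumberFormatException e) {
       resetTrashRetentionToDefault();
     }
   }

   // hypothetical helper, named for illustration only
   private void resetTrashRetentionToDefault() {
     carbonProperties.setProperty(
         CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS,
         Integer.toString(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT));
   }
   ```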







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739634201















[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537278313



##########
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##########
@@ -192,20 +192,20 @@ public static void emptyTrash(String tablePath) {
   }
 
   /**
-   * This will tell whether the trash retention time has expired or not
-   *
-   * @param fileTimestamp
-   * @return
+   * whether trash data inside of .Trash folder is time out

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##########
@@ -192,20 +192,20 @@ public static void emptyTrash(String tablePath) {
   }
 
   /**
-   * This will tell whether the trash retention time has expired or not
-   *
-   * @param fileTimestamp
-   * @return
+   * whether trash data inside of .Trash folder is time out
+   */
+  private static boolean isTrashRetentionTimeoutExceeded(long fileTimestamp) {
+    int retentionDays = CarbonProperties.getInstance().getTrashFolderRetentionTime();
+    long retentionMilliSeconds = TimeUnit.DAYS.toMillis(1) * retentionDays;
+    return CarbonUpdateUtil.readCurrentTime() - fileTimestamp > retentionMilliSeconds;
+  }
+
+  /**
+   * whether trash data outside of .Trash folder is time out
    */
-  public static boolean isTrashRetentionTimeoutExceeded(long fileTimestamp) {
-    // record current time.
-    long currentTime = CarbonUpdateUtil.readCurrentTime();
-    long retentionMilliSeconds = (long)Integer.parseInt(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, Integer.toString(
-          CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT))) * TimeUnit.DAYS
-        .toMillis(1);
-    long difference = currentTime - fileTimestamp;
-    return difference > retentionMilliSeconds;
+  public static boolean isTrashDataTimeout(long fileTimestamp) {

Review comment:
       done







[GitHub] [carbondata] asfgit closed pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044


   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739780017


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5101/
   





[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537276094



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -163,8 +164,13 @@ private static void getStaleSegmentFiles(CarbonTable carbonTable, List<String> s
     }
     Set<String> loadNameSet = Arrays.stream(details).map(LoadMetadataDetails::getLoadName)
         .collect(Collectors.toSet());
-    List<String> staleSegments = segmentFiles.stream().filter(segmentFile -> !loadNameSet.contains(
-        DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList());
+    // get all stale segment files, not include compaction segments
+    List<String> staleSegments = segmentFiles.stream()
+        .filter(segmentFile -> !DataFileUtil.getSegmentNoFromSegmentFile(segmentFile).contains(
+            CarbonCommonConstants.POINT))
+        .filter(segmentFile -> !loadNameSet.contains(

Review comment:
       done







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739688814


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5095/
   





[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537248198



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -163,8 +164,13 @@ private static void getStaleSegmentFiles(CarbonTable carbonTable, List<String> s
     }
     Set<String> loadNameSet = Arrays.stream(details).map(LoadMetadataDetails::getLoadName)
         .collect(Collectors.toSet());
-    List<String> staleSegments = segmentFiles.stream().filter(segmentFile -> !loadNameSet.contains(
-        DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList());
+    // get all stale segment files, not include compaction segments
+    List<String> staleSegments = segmentFiles.stream()
+        .filter(segmentFile -> !DataFileUtil.getSegmentNoFromSegmentFile(segmentFile).contains(
+            CarbonCommonConstants.POINT))
+        .filter(segmentFile -> !loadNameSet.contains(

Review comment:
       Agree with @kunal642; also, call `DataFileUtil.getSegmentNoFromSegmentFile` only once per segment, instead of once in each filter. A sketch of the combined filter is below.
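   A minimal, self-contained sketch of the combined filter (written in Scala for brevity, although `CleanFilesUtil` itself is Java; `segmentNo` is a stand-in for `DataFileUtil.getSegmentNoFromSegmentFile`, and the sample file names are made up):
   ```scala
   // Stand-in: "2_102.segmentfile" -> "2", "0.1_101.segmentfile" -> "0.1"
   def segmentNo(segmentFile: String): String = segmentFile.split("_")(0)

   val loadNameSet = Set("0", "1")  // segment numbers present in table status
   val segmentFiles = List("0_100.segmentfile", "0.1_101.segmentfile", "2_102.segmentfile")
   // Extract the segment number once per file, then apply both checks:
   // skip compaction segments (x.y) and segments indexed by the metadata.
   val staleSegments = segmentFiles.filter { file =>
     val no = segmentNo(file)
     !no.contains(".") && !loadNameSet.contains(no)
   }
   println(staleSegments)  // List(2_102.segmentfile)
   ```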







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739547305


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3304/
   





[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537273944



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -163,8 +164,13 @@ private static void getStaleSegmentFiles(CarbonTable carbonTable, List<String> s
     }
     Set<String> loadNameSet = Arrays.stream(details).map(LoadMetadataDetails::getLoadName)
         .collect(Collectors.toSet());
-    List<String> staleSegments = segmentFiles.stream().filter(segmentFile -> !loadNameSet.contains(
-        DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList());
+    // get all stale segment files, not include compaction segments

Review comment:
       done







[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537420813



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -577,38 +556,46 @@ object CarbonDataRDDFactory {
           LOGGER.info("Data load is successful for " +
                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         }
-
-        // code to handle Pre-Priming cache for loading
-
-        if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
-          DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
-            operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
-        }
-        try {
-          // compaction handling
-          if (carbonTable.isHivePartitionTable) {
-            carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-          }
-          val compactedSegments = new util.ArrayList[String]()
-          handleSegmentMerging(sqlContext,
-            carbonLoadModel
-              .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
-            carbonTable,
-            compactedSegments,
-            operationContext)
-          carbonLoadModel.setMergedSegmentIds(compactedSegments)
-          writtenSegment
-        } catch {
-          case e: Exception =>
-            LOGGER.error(
-              "Auto-Compaction has failed. Ignoring this exception because the" +
-              " load is passed.", e)
-            writtenSegment
-        }
+        isLoadingCommitted = true
+        writtenSegment
       }
     } finally {
       // Release the segment lock, once table status is finally updated
       segmentLock.unlock()
+      if (isLoadingCommitted) {
+        triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, operationContext)
+      }
+    }
+  }
+
+  private def triggerEventsAfterLoading(
+      sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      hadoopConf: Configuration,
+      operationContext: OperationContext): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    // code to handle Pre-Priming cache for loading
+    if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
+      DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),

Review comment:
       ok







[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537259682



##########
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##########
@@ -192,20 +192,20 @@ public static void emptyTrash(String tablePath) {
   }
 
   /**
-   * This will tell whether the trash retention time has expired or not
-   *
-   * @param fileTimestamp
-   * @return
+   * whether trash data inside of .Trash folder is time out

Review comment:
       ```suggestion
     * check if the fileTimestamp has expired, based on `carbon.trash.retention.days`
   ```







[GitHub] [carbondata] akashrn5 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537245017



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDelete        clean the MFD/Compacted segments immediately and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on retention time,
+   *                             it will clean immediately when force is true
+   */
+  def cleanGarbageData(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+    // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
+        " recovered. It is disabled by default, to enable clean files with force option," +
+        " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
+      throw new RuntimeException("Clean files with force operation not permitted by default")
+    }
+    var carbonCleanFilesLock: ICarbonLock = null
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // step 1: check and clean trash folder
+      checkAndCleanTrashFolder(carbonTable, isForceDelete)
+      // step 2: move stale segments which are not exists in metadata into .Trash
+      moveStaleSegmentsToTrash(carbonTable)
+      // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+      cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs)
+    } finally {
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = {
+    if (isForceDelete) {
+      // empty the trash folder
+      TrashUtil.emptyTrash(carbonTable.getTablePath)
+    } else {
+      // clear trash based on timestamp
+      TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+    }
+  }
+
+  /**
+   * move stale segment to trash folder, but not include compaction segment
+   */
+  private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Unit = {
+    if (carbonTable.isHivePartitionTable) {
+      CleanFilesUtil.cleanStaleSegmentsForPartitionTable(carbonTable)
+    } else {
+      CleanFilesUtil.cleanStaleSegments(carbonTable)
+    }
+  }
+
+  private def cleanExpiredSegments(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecsOption: Option[Seq[PartitionSpec]]): Unit = {
+    val partitionSpecs = partitionSpecsOption.map(_.asJava).orNull
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable,
+      isForceDelete, partitionSpecs, cleanStaleInProgress, true)
+    if (carbonTable.isHivePartitionTable && partitionSpecsOption.isDefined) {
+      SegmentFileStore.cleanSegments(carbonTable, partitionSpecs, isForceDelete)
+    }
+  }
+
+  /**
+   * clean the stale compact segment immediately after compaction failure
+   */
+  def cleanStaleCompactionSegment(
+      carbonTable: CarbonTable,
+      mergedLoadName: String,
+      factTimestamp: Long,
+      partitionSpecs: Option[Seq[PartitionSpec]]): Unit = {
+    val metadataFolderPath = CarbonTablePath.getMetadataPath(carbonTable.getTablePath)
+    val details = SegmentStatusManager.readLoadMetadata(metadataFolderPath)
+    if (details == null || details.isEmpty) {
+      return
+    }
+    val loadDetail = details.find(detail => mergedLoadName.equals(detail.getLoadName))
+    // only clean stale compaction segment
+    if (loadDetail.isEmpty) {
+      val segmentId = mergedLoadName.split(CarbonCommonConstants.UNDERSCORE)(1)
+      if (carbonTable.isHivePartitionTable) {
+        if (partitionSpecs.isDefined) {
+          partitionSpecs.get.foreach { partitionSpec =>
+            cleanStaleCompactionDataFiles(
+              partitionSpec.getLocation.toString, segmentId, factTimestamp)
+          }
+        }
+      } else {
+        val segmentPath = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)
+        cleanStaleCompactionDataFiles(
+          segmentPath, segmentId, factTimestamp)
+      }
+    }
+  }
+
+  private def cleanStaleCompactionDataFiles(
+      folderPath: String,
+      segmentId: String,
+      factTimestamp: Long): Unit = {
+    if (FileFactory.isFileExist(folderPath)) {
+      val namePart = CarbonCommonConstants.HYPHEN + segmentId +
+        CarbonCommonConstants.HYPHEN + factTimestamp
+      val toBeDelete = FileFactory.getCarbonFile(folderPath).listFiles(new CarbonFileFilter() {

Review comment:
       ```suggestion
         val toBeDeleted = FileFactory.getCarbonFile(folderPath).listFiles(new CarbonFileFilter() {
   ```







[GitHub] [carbondata] akashrn5 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537271584



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -263,15 +249,7 @@ object CarbonDataRDDFactory {
             throw new Exception("Exception in compaction " + exception.getMessage)
           }
         } finally {
-          executor.shutdownNow()
-          try {
-            compactor.deletePartialLoadsInCompaction()

Review comment:
       Better to add a proper description to this PR and handle it here only, instead of handling it again in another PR; review will be easier and it avoids duplicate work. Should be fine, @ajantha-bhat?







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739745898


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5076/
   





[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537283710



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDelete        clean the MFD/Compacted segments immediately and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on retention time,
+   *                             it will clean immediately when force is true
+   */
+  def cleanGarbageData(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+    // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
+        " recovered. It is disabled by default, to enable clean files with force option," +
+        " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
+      throw new RuntimeException("Clean files with force operation not permitted by default")
+    }
+    var carbonCleanFilesLock: ICarbonLock = null
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // step 1: check and clean trash folder
+      checkAndCleanTrashFolder(carbonTable, isForceDelete)
+      // step 2: move stale segments which are not exists in metadata into .Trash
+      moveStaleSegmentsToTrash(carbonTable)
+      // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+      cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs)
+    } finally {
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = {
+    if (isForceDelete) {
+      // empty the trash folder
+      TrashUtil.emptyTrash(carbonTable.getTablePath)
+    } else {
+      // clear trash based on timestamp
+      TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+    }
+  }
+
+  /**
+   * move stale segment to trash folder, but not include compaction segment

Review comment:
       ```suggestion
      * move stale segment to trash folder, but not include stale compaction (x.y) segment
   ```







[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537275885



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -577,38 +556,46 @@ object CarbonDataRDDFactory {
           LOGGER.info("Data load is successful for " +
                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         }
-
-        // code to handle Pre-Priming cache for loading
-
-        if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
-          DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
-            operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
-        }
-        try {
-          // compaction handling
-          if (carbonTable.isHivePartitionTable) {
-            carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-          }
-          val compactedSegments = new util.ArrayList[String]()
-          handleSegmentMerging(sqlContext,
-            carbonLoadModel
-              .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
-            carbonTable,
-            compactedSegments,
-            operationContext)
-          carbonLoadModel.setMergedSegmentIds(compactedSegments)
-          writtenSegment
-        } catch {
-          case e: Exception =>
-            LOGGER.error(
-              "Auto-Compaction has failed. Ignoring this exception because the" +
-              " load is passed.", e)
-            writtenSegment
-        }
+        isLoadingCommitted = true
+        writtenSegment
       }
     } finally {
       // Release the segment lock, once table status is finally updated
       segmentLock.unlock()
+      if (isLoadingCommitted) {
+        triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, operationContext)
+      }
+    }
+  }
+
+  private def triggerEventsAfterLoading(
+      sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      hadoopConf: Configuration,
+      operationContext: OperationContext): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    // code to handle Pre-Priming cache for loading
+    if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
+      DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),

Review comment:
       If auto-compaction fails, the current load still passes, so we can trigger pre-priming.
   
   I suggested triggering it after auto-compaction for the success case: there is no need to pre-prime the current segment then, because it becomes MFD once auto-compaction merges it. That way we may save pre-priming one segment.







[GitHub] [carbondata] akashrn5 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537288377



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -577,38 +556,46 @@ object CarbonDataRDDFactory {
           LOGGER.info("Data load is successful for " +
                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         }
-
-        // code to handle Pre-Priming cache for loading
-
-        if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
-          DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
-            operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
-        }
-        try {
-          // compaction handling
-          if (carbonTable.isHivePartitionTable) {
-            carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-          }
-          val compactedSegments = new util.ArrayList[String]()
-          handleSegmentMerging(sqlContext,
-            carbonLoadModel
-              .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
-            carbonTable,
-            compactedSegments,
-            operationContext)
-          carbonLoadModel.setMergedSegmentIds(compactedSegments)
-          writtenSegment
-        } catch {
-          case e: Exception =>
-            LOGGER.error(
-              "Auto-Compaction has failed. Ignoring this exception because the" +
-              " load is passed.", e)
-            writtenSegment
-        }
+        isLoadingCommitted = true
+        writtenSegment
       }
     } finally {
       // Release the segment lock, once table status is finally updated
       segmentLock.unlock()
+      if (isLoadingCommitted) {
+        triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, operationContext)
+      }
+    }
+  }
+
+  private def triggerEventsAfterLoading(
+      sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      hadoopConf: Configuration,
+      operationContext: OperationContext): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    // code to handle Pre-Priming cache for loading
+    if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
+      DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),

Review comment:
       Calling it twice will increase the time taken; better to have logic that finds out whether the load was compacted or not and, based on that, sends the segments for pre-priming only once. A sketch is below.
   
   Also, in `DistributedRDDUtils.scala`, at line 376, a new `SegmentUpdateStatusManager` is created but never used; it simply reads the table status and update status files. Please check whether it can be removed. Just another input for optimization.
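   A rough sketch of the single pre-priming call suggested here (`getMergedSegmentIds` is an assumption mirroring the `setMergedSegmentIds` call visible in the diff, and `asScala` assumes the file's existing `JavaConverters` import; the other names come from the surrounding code):
   ```scala
   // Decide once which segments to pre-prime: if auto-compaction merged the new
   // load, prime the merged segments (the load itself is now MFD); otherwise
   // prime the freshly loaded segment.
   val mergedSegments = carbonLoadModel.getMergedSegmentIds
   val segmentsToPrime =
     if (mergedSegments != null && !mergedSegments.isEmpty) {
       mergedSegments.asScala.toList
     } else if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
       List(carbonLoadModel.getSegmentId)
     } else {
       List.empty[String]
     }
   if (segmentsToPrime.nonEmpty) {
     DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
       operationContext, hadoopConf, segmentsToPrime)
   }
   ```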







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739717520


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5097/
   





[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
vikramahuja1001 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537230866



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##########
@@ -19,13 +19,13 @@ package org.apache.carbondata.api
 
 import java.io.{DataInputStream, FileNotFoundException, InputStreamReader}
 import java.time.{Duration, Instant}
-import java.util
 import java.util.{Collections, Comparator}
 
 import scala.collection.JavaConverters._
 import scala.util.control.Breaks.{break, breakable}
 
 import com.google.gson.Gson
+import java.util

Review comment:
       Why this change? I think CI will fail because of this import reordering.







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739648471


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3334/
   





[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537281516



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -577,38 +556,46 @@ object CarbonDataRDDFactory {
           LOGGER.info("Data load is successful for " +
                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         }
-
-        // code to handle Pre-Priming cache for loading
-
-        if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
-          DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
-            operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
-        }
-        try {
-          // compaction handling
-          if (carbonTable.isHivePartitionTable) {
-            carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-          }
-          val compactedSegments = new util.ArrayList[String]()
-          handleSegmentMerging(sqlContext,
-            carbonLoadModel
-              .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
-            carbonTable,
-            compactedSegments,
-            operationContext)
-          carbonLoadModel.setMergedSegmentIds(compactedSegments)
-          writtenSegment
-        } catch {
-          case e: Exception =>
-            LOGGER.error(
-              "Auto-Compaction has failed. Ignoring this exception because the" +
-              " load is passed.", e)
-            writtenSegment
-        }
+        isLoadingCommitted = true
+        writtenSegment
       }
     } finally {
       // Release the segment lock, once table status is finally updated
       segmentLock.unlock()
+      if (isLoadingCommitted) {
+        triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, operationContext)
+      }
+    }
+  }
+
+  private def triggerEventsAfterLoading(
+      sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      hadoopConf: Configuration,
+      operationContext: OperationContext): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    // code to handle Pre-Priming cache for loading
+    if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
+      DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),

Review comment:
       OK, we can do it in another PR. It is still needed here as well.







[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537278197



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
##########
@@ -48,30 +50,61 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging {
     event match {
       case cleanFilesPostEvent: CleanFilesPostEvent =>
         LOGGER.info("Clean files post event listener called")
-        val carbonTable = cleanFilesPostEvent.carbonTable
-        val indexTables = CarbonIndexUtil
-          .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession)
-        val isForceDelete = cleanFilesPostEvent.ifForceDelete
-        val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress
-        indexTables.foreach { indexTable =>
-          val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
-            Seq.empty[Expression],
-            cleanFilesPostEvent.sparkSession,
-            indexTable)
-          SegmentStatusManager.deleteLoadsAndUpdateMetadata(
-              indexTable, isForceDelete, partitions.map(_.asJava).orNull, inProgressSegmentsClean,
-            true)
-          CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
-          cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
-        }
+        cleanFilesForIndex(
+          cleanFilesPostEvent.sparkSession,
+          cleanFilesPostEvent.carbonTable,
+          cleanFilesPostEvent.options.getOrElse("force", "false").toBoolean,
+          cleanFilesPostEvent.options.getOrElse("stale_inprogress", "false").toBoolean)
+
+        cleanFilesForMv(
+          cleanFilesPostEvent.sparkSession,
+          cleanFilesPostEvent.carbonTable,
+          cleanFilesPostEvent.options)
+    }
+  }
+
+  private def cleanFilesForIndex(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean
+  ): Unit = {
+    val indexTables = CarbonIndexUtil
+      .getIndexCarbonTables(carbonTable, sparkSession)
+    indexTables.foreach { indexTable =>
+      val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
+        Seq.empty[Expression],
+        sparkSession,
+        indexTable)
+      SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+        indexTable, isForceDelete, partitions.map(_.asJava).orNull, cleanStaleInProgress,
+        true)
+      cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
+    }
+  }
+
+  private def cleanFilesForMv(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      options: Map[String, String]
+  ): Unit = {
+    val viewSchemas = MVManagerInSpark.get(sparkSession).getSchemasOnTable(carbonTable)
+    if (!viewSchemas.isEmpty) {
+      viewSchemas.asScala.map { schema =>

Review comment:
       Fixed. The following command was using the wrong table; changed it to use this variable: `schema`. A sketch of the corrected loop is below.
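   A hedged sketch of the corrected loop (the `CarbonCleanFilesCommand` constructor shape and the `schema.getIdentifier` accessor are assumptions; the point is only that each MV table is resolved from its own `schema`, not from the parent `carbonTable`):
   ```scala
   viewSchemas.asScala.foreach { schema =>
     // Resolve the MV table from the schema being iterated, not the parent table.
     val viewIdentifier = schema.getIdentifier
     CarbonCleanFilesCommand(
       Option(viewIdentifier.getDatabaseName),
       viewIdentifier.getTableName,
       options,
       isInternalCleanCall = true
     ).run(sparkSession)
   }
   ```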







[GitHub] [carbondata] QiangCai commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739891819


   retest this please





[GitHub] [carbondata] akashrn5 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537252858



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
##########
@@ -48,30 +50,61 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging {
     event match {
       case cleanFilesPostEvent: CleanFilesPostEvent =>
         LOGGER.info("Clean files post event listener called")
-        val carbonTable = cleanFilesPostEvent.carbonTable
-        val indexTables = CarbonIndexUtil
-          .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession)
-        val isForceDelete = cleanFilesPostEvent.ifForceDelete
-        val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress
-        indexTables.foreach { indexTable =>
-          val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
-            Seq.empty[Expression],
-            cleanFilesPostEvent.sparkSession,
-            indexTable)
-          SegmentStatusManager.deleteLoadsAndUpdateMetadata(
-              indexTable, isForceDelete, partitions.map(_.asJava).orNull, inProgressSegmentsClean,
-            true)
-          CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
-          cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
-        }
+        cleanFilesForIndex(
+          cleanFilesPostEvent.sparkSession,
+          cleanFilesPostEvent.carbonTable,
+          cleanFilesPostEvent.options.getOrElse("force", "false").toBoolean,
+          cleanFilesPostEvent.options.getOrElse("stale_inprogress", "false").toBoolean)
+
+        cleanFilesForMv(
+          cleanFilesPostEvent.sparkSession,
+          cleanFilesPostEvent.carbonTable,
+          cleanFilesPostEvent.options)
+    }
+  }
+
+  private def cleanFilesForIndex(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean
+  ): Unit = {
+    val indexTables = CarbonIndexUtil
+      .getIndexCarbonTables(carbonTable, sparkSession)
+    indexTables.foreach { indexTable =>
+      val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
+        Seq.empty[Expression],
+        sparkSession,
+        indexTable)
+      SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+        indexTable, isForceDelete, partitions.map(_.asJava).orNull, cleanStaleInProgress,
+        true)
+      cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
+    }
+  }
+
+  private def cleanFilesForMv(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      options: Map[String, String]
+  ): Unit = {
+    val viewSchemas = MVManagerInSpark.get(sparkSession).getSchemasOnTable(carbonTable)
+    if (!viewSchemas.isEmpty) {
+      viewSchemas.asScala.map { schema =>

Review comment:
       Replace `schema` with a placeholder (`_`), as it is not used.







[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537302169



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDelete        clean the MFD/Compacted segments immediately and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on retention time,
+   *                             it will clean immediately when force is true
+   */
+  def cleanGarbageData(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+    // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
+        " recovered. It is disabled by default, to enable clean files with force option," +
+        " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
+      throw new RuntimeException("Clean files with force operation not permitted by default")
+    }
+    var carbonCleanFilesLock: ICarbonLock = null
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // step 1: check and clean trash folder
+      checkAndCleanTrashFolder(carbonTable, isForceDelete)
+      // step 2: move stale segments which are not exists in metadata into .Trash
+      moveStaleSegmentsToTrash(carbonTable)

Review comment:
       For the moveStaleSegmentsToTrash method, it needs to call readLoadMetadata after listFiles. An optimal solution is for the moveStaleSegmentsToTrash method to return a LoadMetadata array; a sketch follows.
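   A sketch of that alternative (the body reuses the calls already shown in this PR; the changed return type is the hypothetical part):
   ```scala
   // Hypothetical signature change: moveStaleSegmentsToTrash hands back the load
   // metadata so the later cleanExpiredSegments step can reuse it. In a full
   // change, CleanFilesUtil would return the details it already read while
   // listing files, rather than re-reading them here.
   private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Array[LoadMetadataDetails] = {
     if (carbonTable.isHivePartitionTable) {
       CleanFilesUtil.cleanStaleSegmentsForPartitionTable(carbonTable)
     } else {
       CleanFilesUtil.cleanStaleSegments(carbonTable)
     }
     SegmentStatusManager.readLoadMetadata(
       CarbonTablePath.getMetadataPath(carbonTable.getTablePath))
   }
   ```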







[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
vikramahuja1001 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537245787



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDelete        clean the MFD/Compacted segments immediately and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on retention time,
+   *                             it will clean immediately when force is true
+   */
+  def cleanGarbageData(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+    // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
+        " recovered. It is disabled by default, to enable clean files with force option," +
+        " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
+      throw new RuntimeException("Clean files with force operation not permitted by default")
+    }
+    var carbonCleanFilesLock: ICarbonLock = null
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // step 1: check and clean trash folder
+      checkAndCleanTrashFolder(carbonTable, isForceDelete)
+      // step 2: move stale segments which are not exists in metadata into .Trash
+      moveStaleSegmentsToTrash(carbonTable)
+      // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+      cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs)
+    } finally {
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = {
+    if (isForceDelete) {
+      // empty the trash folder
+      TrashUtil.emptyTrash(carbonTable.getTablePath)
+    } else {
+      // clear trash based on timestamp
+      TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+    }
+  }
+
+  /**
+   * move stale segment to trash folder, but not include compaction segment
+   */
+  private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Unit = {
+    if (carbonTable.isHivePartitionTable) {
+      CleanFilesUtil.cleanStaleSegmentsForPartitionTable(carbonTable)
+    } else {
+      CleanFilesUtil.cleanStaleSegments(carbonTable)
+    }
+  }
+
+  private def cleanExpiredSegments(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecsOption: Option[Seq[PartitionSpec]]): Unit = {
+    val partitionSpecs = partitionSpecsOption.map(_.asJava).orNull
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable,
+      isForceDelete, partitionSpecs, cleanStaleInProgress, true)
+    if (carbonTable.isHivePartitionTable && partitionSpecsOption.isDefined) {
+      SegmentFileStore.cleanSegments(carbonTable, partitionSpecs, isForceDelete)
+    }
+  }
+
+  /**
+   * clean the stale compact segment immediately after compaction failure
+   */
+  def cleanStaleCompactionSegment(
+      carbonTable: CarbonTable,
+      mergedLoadName: String,
+      factTimestamp: Long,
+      partitionSpecs: Option[Seq[PartitionSpec]]): Unit = {
+    val metadataFolderPath = CarbonTablePath.getMetadataPath(carbonTable.getTablePath)
+    val details = SegmentStatusManager.readLoadMetadata(metadataFolderPath)
+    if (details == null || details.isEmpty) {
+      return
+    }
+    val loadDetail = details.find(detail => mergedLoadName.equals(detail.getLoadName))

Review comment:
       mergedLoadName will be Segment_(segment number), e.g. Segment_0.1, so loadDetail will always be null, because "Segment_0.1" will never equal detail.loadName.
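
    A possible fix, sketched here under the assumption that `CarbonCommonConstants.LOAD_FOLDER` holds the `Segment_` prefix:

    ```scala
    // hypothetical fix: strip the "Segment_" prefix before comparing, because
    // detail.getLoadName holds only the segment id (e.g. "0.1")
    val mergedSegmentId = mergedLoadName.substring(CarbonCommonConstants.LOAD_FOLDER.length)
    val loadDetail = details.find(detail => mergedSegmentId.equals(detail.getLoadName))
    ```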




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537277613



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
##########
@@ -48,30 +50,61 @@ class CleanFilesPostEventListener extends OperationEventListener with Logging {
     event match {
       case cleanFilesPostEvent: CleanFilesPostEvent =>
         LOGGER.info("Clean files post event listener called")
-        val carbonTable = cleanFilesPostEvent.carbonTable
-        val indexTables = CarbonIndexUtil
-          .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession)
-        val isForceDelete = cleanFilesPostEvent.ifForceDelete
-        val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress
-        indexTables.foreach { indexTable =>
-          val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
-            Seq.empty[Expression],
-            cleanFilesPostEvent.sparkSession,
-            indexTable)
-          SegmentStatusManager.deleteLoadsAndUpdateMetadata(
-              indexTable, isForceDelete, partitions.map(_.asJava).orNull, inProgressSegmentsClean,
-            true)
-          CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
-          cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
-        }
+        cleanFilesForIndex(
+          cleanFilesPostEvent.sparkSession,
+          cleanFilesPostEvent.carbonTable,
+          cleanFilesPostEvent.options.getOrElse("force", "false").toBoolean,
+          cleanFilesPostEvent.options.getOrElse("stale_inprogress", "false").toBoolean)
+
+        cleanFilesForMv(
+          cleanFilesPostEvent.sparkSession,
+          cleanFilesPostEvent.carbonTable,
+          cleanFilesPostEvent.options)
+    }
+  }
+
+  private def cleanFilesForIndex(
+      sparkSession: SparkSession,
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean
+  ): Unit = {

Review comment:
       done




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537266114



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -577,38 +556,46 @@ object CarbonDataRDDFactory {
           LOGGER.info("Data load is successful for " +
                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         }
-
-        // code to handle Pre-Priming cache for loading
-
-        if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
-          DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
-            operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
-        }
-        try {
-          // compaction handling
-          if (carbonTable.isHivePartitionTable) {
-            carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-          }
-          val compactedSegments = new util.ArrayList[String]()
-          handleSegmentMerging(sqlContext,
-            carbonLoadModel
-              .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
-            carbonTable,
-            compactedSegments,
-            operationContext)
-          carbonLoadModel.setMergedSegmentIds(compactedSegments)
-          writtenSegment
-        } catch {
-          case e: Exception =>
-            LOGGER.error(
-              "Auto-Compaction has failed. Ignoring this exception because the" +
-              " load is passed.", e)
-            writtenSegment
-        }
+        isLoadingCommitted = true
+        writtenSegment
       }
     } finally {
       // Release the segment lock, once table status is finally updated
       segmentLock.unlock()
+      if (isLoadingCommitted) {
+        triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, operationContext)
+      }
+    }
+  }
+
+  private def triggerEventsAfterLoading(
+      sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      hadoopConf: Configuration,
+      operationContext: OperationContext): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    // code to handle Pre-Priming cache for loading
+    if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
+      DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),

Review comment:
       Is it better to trigger pre-priming after the auto-compaction code? @akashrn5
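
    Roughly like this (a sketch only, reusing the calls from the removed block in the diff above):

    ```scala
    // hypothetical reordering: run auto-compaction first, then pre-prime the
    // cache, so we do not warm the cache for segments compaction may replace
    val compactedSegments = new util.ArrayList[String]()
    handleSegmentMerging(sqlContext,
      carbonLoadModel.getCopyWithPartition(
        carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
      carbonTable, compactedSegments, operationContext)
    carbonLoadModel.setMergedSegmentIds(compactedSegments)
    if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
      DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
        operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
    }
    ```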




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739687261


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3336/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739821005


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3327/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739780939


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5077/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] QiangCai commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739903966


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739890535


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3332/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] QiangCai commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739889707


   retest this please


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537293803



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDelete        clean the MFD/Compacted segments immediately and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on retention time,
+   *                             it will clean immediately when force is true
+   */
+  def cleanGarbageData(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+    // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
+        " recovered. It is disabled by default, to enable clean files with force option," +
+        " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
+      throw new RuntimeException("Clean files with force operation not permitted by default")
+    }
+    var carbonCleanFilesLock: ICarbonLock = null
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // step 1: check and clean trash folder
+      checkAndCleanTrashFolder(carbonTable, isForceDelete)
+      // step 2: move stale segments which are not exists in metadata into .Trash
+      moveStaleSegmentsToTrash(carbonTable)

Review comment:
       I saw that both moveStaleSegmentsToTrash and cleanExpiredSegments call `SegmentStatusManager.readLoadMetadata`. Is it possible to combine the two methods, so that we read the table status once and then move a segment to trash if it is stale, or delete it if it is expired?
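
    Something along these lines (a sketch; the overloads taking `details` are assumptions, not existing APIs):

    ```scala
    // hypothetical single-read flow: read the table status once, pass it down
    val details = SegmentStatusManager.readLoadMetadata(
      CarbonTablePath.getMetadataPath(carbonTable.getTablePath))
    moveStaleSegmentsToTrash(carbonTable, details)
    cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs, details)
    ```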




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] akashrn5 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537230858



##########
File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
##########
@@ -482,176 +482,6 @@ public boolean accept(CarbonFile file) {
 
   }
 
-  /**
-   * Handling of the clean up of old carbondata files, index files , delete delta,
-   * update status files.
-   * @param table clean up will be handled on this table.
-   * @param forceDelete if true then max query execution timeout will not be considered.
-   */
-  public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) throws IOException {
-
-    SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier());
-
-    LoadMetadataDetails[] details =
-        SegmentStatusManager.readLoadMetadata(table.getMetadataPath());
-
-    SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(table);
-    SegmentUpdateDetails[] segmentUpdateDetails = updateStatusManager.getUpdateStatusDetails();
-    // hold all the segments updated so that wen can check the delta files in them, ne need to
-    // check the others.
-    Set<String> updatedSegments = new HashSet<>();
-    for (SegmentUpdateDetails updateDetails : segmentUpdateDetails) {
-      updatedSegments.add(updateDetails.getSegmentName());
-    }
-
-    String validUpdateStatusFile = "";
-
-    boolean isAbortedFile = true;
-
-    boolean isInvalidFile = false;
-
-    // take the update status file name from 0th segment.
-    validUpdateStatusFile = ssm.getUpdateStatusFileName(details);
-    // scan through each segment.
-    for (LoadMetadataDetails segment : details) {
-      // if this segment is valid then only we will go for delta file deletion.
-      // if the segment is mark for delete or compacted then any way it will get deleted.
-      if (segment.getSegmentStatus() == SegmentStatus.SUCCESS
-              || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
-        // when there is no update operations done on table, then no need to go ahead. So
-        // just check the update delta start timestamp and proceed if not empty
-        if (!segment.getUpdateDeltaStartTimestamp().isEmpty()
-                || updatedSegments.contains(segment.getLoadName())) {
-          // take the list of files from this segment.
-          String segmentPath = CarbonTablePath.getSegmentPath(
-              table.getAbsoluteTableIdentifier().getTablePath(), segment.getLoadName());
-          CarbonFile segDir =
-              FileFactory.getCarbonFile(segmentPath);
-          CarbonFile[] allSegmentFiles = segDir.listFiles();
-
-          // now handle all the delete delta files which needs to be deleted.
-          // there are 2 cases here .
-          // 1. if the block is marked as compacted then the corresponding delta files
-          //    can be deleted if query exec timeout is done.
-          // 2. if the block is in success state then also there can be delete
-          //    delta compaction happened and old files can be deleted.
-
-          SegmentUpdateDetails[] updateDetails = updateStatusManager.readLoadMetadata();
-          for (SegmentUpdateDetails block : updateDetails) {
-            CarbonFile[] completeListOfDeleteDeltaFiles;
-            CarbonFile[] invalidDeleteDeltaFiles;
-
-            if (!block.getSegmentName().equalsIgnoreCase(segment.getLoadName())) {
-              continue;
-            }
-
-            // aborted scenario.
-            invalidDeleteDeltaFiles = updateStatusManager
-                .getDeleteDeltaInvalidFilesList(block, false,
-                    allSegmentFiles, isAbortedFile);
-            for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
-              boolean doForceDelete = true;
-              compareTimestampsAndDelete(invalidFile, doForceDelete, false);
-            }
-
-            // case 1
-            if (CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
-              completeListOfDeleteDeltaFiles = updateStatusManager
-                  .getDeleteDeltaInvalidFilesList(block, true,
-                      allSegmentFiles, isInvalidFile);
-              for (CarbonFile invalidFile : completeListOfDeleteDeltaFiles) {
-                compareTimestampsAndDelete(invalidFile, forceDelete, false);
-              }
-
-            } else {
-              invalidDeleteDeltaFiles = updateStatusManager
-                  .getDeleteDeltaInvalidFilesList(block, false,
-                      allSegmentFiles, isInvalidFile);
-              for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
-                compareTimestampsAndDelete(invalidFile, forceDelete, false);
-              }
-            }
-          }
-        }
-        // handle cleanup of merge index files and data files after small files merge happened for
-        // SI table
-        cleanUpDataFilesAfterSmallFilesMergeForSI(table, segment);
-      }
-    }
-
-    // delete the update table status files which are old.
-    if (null != validUpdateStatusFile && !validUpdateStatusFile.isEmpty()) {
-
-      final String updateStatusTimestamp = validUpdateStatusFile
-          .substring(validUpdateStatusFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1);
-
-      String tablePath = table.getAbsoluteTableIdentifier().getTablePath();
-      CarbonFile metaFolder = FileFactory.getCarbonFile(
-          CarbonTablePath.getMetadataPath(tablePath));
-
-      CarbonFile[] invalidUpdateStatusFiles = metaFolder.listFiles(new CarbonFileFilter() {
-        @Override
-        public boolean accept(CarbonFile file) {
-          if (file.getName().startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)) {
-            // CHECK if this is valid or not.
-            // we only send invalid ones to delete.
-            return !file.getName().endsWith(updateStatusTimestamp);
-          }
-          return false;
-        }
-      });
-
-      for (CarbonFile invalidFile : invalidUpdateStatusFiles) {
-        compareTimestampsAndDelete(invalidFile, forceDelete, true);
-      }
-    }
-  }
-
-  /**
-   * this is the clean up added specifically for SI table, because after we merge the data files
-   * inside the secondary index table, we need to delete the stale carbondata files.
-   * refer org.apache.spark.sql.secondaryindex.rdd.CarbonSIRebuildRDD
-   */
-  private static void cleanUpDataFilesAfterSmallFilesMergeForSI(CarbonTable table,

Review comment:
       Can you please create a JIRA for the points discussed offline, so we can track these parts?

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##########
@@ -2123,29 +2123,35 @@ public int getMaxSIRepairLimit(String dbName, String tableName) {
    * folder will take place
    */
   private void validateTrashFolderRetentionTime() {
-    String propertyValue = carbonProperties.getProperty(CarbonCommonConstants
-        .CARBON_TRASH_RETENTION_DAYS, Integer.toString(CarbonCommonConstants
-        .CARBON_TRASH_RETENTION_DAYS_DEFAULT));
+    String propertyValue = carbonProperties.getProperty(

Review comment:
       Can you use `getTrashFolderRetentionTime` here as well? That way you avoid parsing the integer again.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##########
@@ -163,8 +164,13 @@ private static void getStaleSegmentFiles(CarbonTable carbonTable, List<String> s
     }
     Set<String> loadNameSet = Arrays.stream(details).map(LoadMetadataDetails::getLoadName)
         .collect(Collectors.toSet());
-    List<String> staleSegments = segmentFiles.stream().filter(segmentFile -> !loadNameSet.contains(
-        DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList());
+    // get all stale segment files, not include compaction segments

Review comment:
       Can you please add two more lines to the comment explaining why we need to exclude them, so that a developer does not remove this by mistake later? For example: `During compaction we do not make an entry in the table status, so if clean files is triggered in parallel, the compaction segment would be considered stale and moved to trash`.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/events/package.scala
##########
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata
+
+package object events {
+  def withEvents(preEvent: Event, postEvent: Event)(func: => Unit): Unit = {

Review comment:
       Can you raise a JIRA and refactor all the events later? For now this is handled only for clean files.
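
    For reference, a minimal sketch of the helper's body and a call site (the body and the event constructors shown are assumptions based on the clean files flow in this PR, not the actual code):

    ```scala
    // hypothetical body: fire the pre event, run the action, fire the post event
    def withEvents(preEvent: Event, postEvent: Event)(func: => Unit): Unit = {
      val operationContext = new OperationContext
      OperationListenerBus.getInstance.fireEvent(preEvent, operationContext)
      func
      OperationListenerBus.getInstance.fireEvent(postEvent, operationContext)
    }

    // hypothetical usage around clean files:
    withEvents(CleanFilesPreEvent(carbonTable, sparkSession),
      CleanFilesPostEvent(carbonTable, sparkSession, options)) {
      DataTrashManager.cleanGarbageData(carbonTable, isForceDelete, cleanStaleInProgress)
    }
    ```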

##########
File path: docs/clean-files.md
##########
@@ -38,6 +38,9 @@ The above clean files command will clean Marked For Delete and Compacted segment
    ``` 
   Once the timestamp subdirectory is expired as per the configured expiration day value, that subdirectory is deleted from the trash folder in the subsequent clean files command.
 
+**NOTE**:
+  * In trash folder, the retention time is "carbon.trash.retention.days"
+  * Outside trash folder, the retention time is max value of two properties("carbon.trash.retention.days", "max.query.execution.time")

Review comment:
       ```suggestion
     * Outside trash folder(Segment Directories in store path), the retention time is Max("carbon.trash.retention.days", "max.query.execution.time")
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
vikramahuja1001 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537245787



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##########
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDelete        clean the MFD/Compacted segments immediately and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on retention time,
+   *                             it will clean immediately when force is true
+   */
+  def cleanGarbageData(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+    // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
+        " recovered. It is disabled by default, to enable clean files with force option," +
+        " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
+      throw new RuntimeException("Clean files with force operation not permitted by default")
+    }
+    var carbonCleanFilesLock: ICarbonLock = null
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // step 1: check and clean trash folder
+      checkAndCleanTrashFolder(carbonTable, isForceDelete)
+      // step 2: move stale segments which are not exists in metadata into .Trash
+      moveStaleSegmentsToTrash(carbonTable)
+      // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+      cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs)
+    } finally {
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = {
+    if (isForceDelete) {
+      // empty the trash folder
+      TrashUtil.emptyTrash(carbonTable.getTablePath)
+    } else {
+      // clear trash based on timestamp
+      TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+    }
+  }
+
+  /**
+   * move stale segment to trash folder, but not include compaction segment
+   */
+  private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Unit = {
+    if (carbonTable.isHivePartitionTable) {
+      CleanFilesUtil.cleanStaleSegmentsForPartitionTable(carbonTable)
+    } else {
+      CleanFilesUtil.cleanStaleSegments(carbonTable)
+    }
+  }
+
+  private def cleanExpiredSegments(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecsOption: Option[Seq[PartitionSpec]]): Unit = {
+    val partitionSpecs = partitionSpecsOption.map(_.asJava).orNull
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable,
+      isForceDelete, partitionSpecs, cleanStaleInProgress, true)
+    if (carbonTable.isHivePartitionTable && partitionSpecsOption.isDefined) {
+      SegmentFileStore.cleanSegments(carbonTable, partitionSpecs, isForceDelete)
+    }
+  }
+
+  /**
+   * clean the stale compact segment immediately after compaction failure
+   */
+  def cleanStaleCompactionSegment(
+      carbonTable: CarbonTable,
+      mergedLoadName: String,
+      factTimestamp: Long,
+      partitionSpecs: Option[Seq[PartitionSpec]]): Unit = {
+    val metadataFolderPath = CarbonTablePath.getMetadataPath(carbonTable.getTablePath)
+    val details = SegmentStatusManager.readLoadMetadata(metadataFolderPath)
+    if (details == null || details.isEmpty) {
+      return
+    }
+    val loadDetail = details.find(detail => mergedLoadName.equals(detail.getLoadName))

Review comment:
       mergedLoadName will be Segment_(segment number), e.g. Segment_0.1, so loadDetail will always be empty, because "Segment_0.1" will never equal detail.loadName.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537276344



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -263,15 +249,7 @@ object CarbonDataRDDFactory {
             throw new Exception("Exception in compaction " + exception.getMessage)
           }
         } finally {
-          executor.shutdownNow()
-          try {
-            compactor.deletePartialLoadsInCompaction()

Review comment:
       OK, I see that many of these changes were made to support `DataTrashManager.cleanStaleCompactionSegment`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739649949


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5071/
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] akashrn5 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
akashrn5 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739984283


   LGTM


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537305840



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##########
@@ -577,38 +556,46 @@ object CarbonDataRDDFactory {
           LOGGER.info("Data load is successful for " +
                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         }
-
-        // code to handle Pre-Priming cache for loading
-
-        if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
-          DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
-            operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
-        }
-        try {
-          // compaction handling
-          if (carbonTable.isHivePartitionTable) {
-            carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-          }
-          val compactedSegments = new util.ArrayList[String]()
-          handleSegmentMerging(sqlContext,
-            carbonLoadModel
-              .getCopyWithPartition(carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter),
-            carbonTable,
-            compactedSegments,
-            operationContext)
-          carbonLoadModel.setMergedSegmentIds(compactedSegments)
-          writtenSegment
-        } catch {
-          case e: Exception =>
-            LOGGER.error(
-              "Auto-Compaction has failed. Ignoring this exception because the" +
-              " load is passed.", e)
-            writtenSegment
-        }
+        isLoadingCommitted = true
+        writtenSegment
       }
     } finally {
       // Release the segment lock, once table status is finally updated
       segmentLock.unlock()
+      if (isLoadingCommitted) {
+        triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, operationContext)
+      }
+    }
+  }
+
+  private def triggerEventsAfterLoading(
+      sqlContext: SQLContext,
+      carbonLoadModel: CarbonLoadModel,
+      hadoopConf: Configuration,
+      operationContext: OperationContext): Unit = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    // code to handle Pre-Priming cache for loading
+    if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
+      DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),

Review comment:
       OK, can you raise another PR to fix it?
   This PR focuses only on clean files.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537293803



##########
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDelete        clean the MFD/Compacted segments immediately and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on retention time,
+   *                             it will clean immediately when force is true
+   */
+  def cleanGarbageData(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+    // if isForceDelete = true need to throw exception if CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
+        " recovered. It is disabled by default, to enable clean files with force option," +
+        " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
+      throw new RuntimeException("Clean files with force operation not permitted by default")
+    }
+    var carbonCleanFilesLock: ICarbonLock = null
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // step 1: check and clean trash folder
+      checkAndCleanTrashFolder(carbonTable, isForceDelete)
+      // step 2: move stale segments which are not exists in metadata into .Trash
+      moveStaleSegmentsToTrash(carbonTable)

Review comment:
       Between moveStaleSegmentsToTrash and cleanExpiredSegments, we call `SegmentStatusManager.readLoadMetadata` multiple times. Is it possible to combine the two methods, so that we read the table status once and then move a segment to trash if it is stale, or delete it if it is expired?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org