Posted to issues@carbondata.apache.org by GitBox <gi...@apache.org> on 2020/10/12 13:48:08 UTC

[GitHub] [carbondata] akashrn5 commented on a change in pull request #3917: [CARBONDATA-3978] Clean Files Refactor and support for trash folder in carbondata

akashrn5 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r503218710



##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1136,16 +1169,29 @@ public static void deleteSegment(String tablePath, Segment segment,
    * If partition specs are null, then directly delete parent directory in locationMap.
    */
   private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs,
-      Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, String tablePath) {
+      Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, String tablePath,
+                                              String segmentNo, SegmentStatus segmentStatus)
+          throws IOException {
     for (String indexOrMergeFile : indexOrMergeFiles) {
       if (null != partitionSpecs) {
         Path location = new Path(indexOrMergeFile);
         boolean exists = pathExistsInPartitionSpec(partitionSpecs, location);
         if (!exists) {
+          // move to trash
+          TrashUtil.copyDataToTrashFolder(tablePath, location.toString(), CarbonCommonConstants

Review comment:
       ```suggestion
             TrashUtil.copyDataToTrashFolder(tablePath, indexOrMergeFileLocation.toString(), CarbonCommonConstants
   ```

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1136,16 +1169,29 @@ public static void deleteSegment(String tablePath, Segment segment,
    * If partition specs are null, then directly delete parent directory in locationMap.
    */
   private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs,
-      Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, String tablePath) {
+      Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, String tablePath,
+                                              String segmentNo, SegmentStatus segmentStatus)
+          throws IOException {
     for (String indexOrMergeFile : indexOrMergeFiles) {
       if (null != partitionSpecs) {
         Path location = new Path(indexOrMergeFile);
         boolean exists = pathExistsInPartitionSpec(partitionSpecs, location);
         if (!exists) {
+          // move to trash
+          TrashUtil.copyDataToTrashFolder(tablePath, location.toString(), CarbonCommonConstants
+                  .LOAD_FOLDER + segmentNo + CarbonCommonConstants.FILE_SEPARATOR +
+                  location.toString().substring(tablePath.length() + 1,
+                          location.toString().length()).split("/")[0]);

Review comment:
      use the file separator constant (`CarbonCommonConstants.FILE_SEPARATOR`) instead of the hard-coded `"/"`
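      a minimal sketch of what that change looks like (local variable names are hypothetical; it assumes `CarbonCommonConstants.FILE_SEPARATOR` is `"/"` and is safe to use as a `split` regex):
   ```java
   String relativePath = location.toString().substring(tablePath.length() + 1);
   // the first path component under the table path is the directory name to keep
   String dirName = relativePath.split(CarbonCommonConstants.FILE_SEPARATOR)[0];
   TrashUtil.copyDataToTrashFolder(tablePath, location.toString(),
       CarbonCommonConstants.LOAD_FOLDER + segmentNo
           + CarbonCommonConstants.FILE_SEPARATOR + dirName);
   ```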

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1106,23 +1107,55 @@ public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitio
    */
   public static void deleteSegment(String tablePath, Segment segment,
       List<PartitionSpec> partitionSpecs,
-      SegmentUpdateStatusManager updateStatusManager) throws Exception {
+      SegmentUpdateStatusManager updateStatusManager, String tableName, String DatabaseName,
+                                   SegmentStatus segmentStatus, Boolean isPartitionTable)
+          throws Exception {
     SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
-    List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
-        FileFactory.getConfiguration());
+    List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS,
+            true, FileFactory.getConfiguration());
     Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
     for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+      // If the file to be deleted is a carbondata file, copy that file to the trash folder.

Review comment:
      this comment and the code below it don't match, so please write a proper comment that describes what the code actually does
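      for instance, something along these lines would match the code (wording is only a suggestion): `// for stale INSERT_IN_PROGRESS segments, back up the segment files to the trash folder before deleting them`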

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1106,23 +1107,55 @@ public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitio
    */
   public static void deleteSegment(String tablePath, Segment segment,
       List<PartitionSpec> partitionSpecs,
-      SegmentUpdateStatusManager updateStatusManager) throws Exception {
+      SegmentUpdateStatusManager updateStatusManager, String tableName, String DatabaseName,
+                                   SegmentStatus segmentStatus, Boolean isPartitionTable)
+          throws Exception {
     SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
-    List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
-        FileFactory.getConfiguration());
+    List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS,
+            true, FileFactory.getConfiguration());
     Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
     for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+      // If the file to be deleted is a carbondata file, copy that file to the trash folder.
+      if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS) {
+        if (!isPartitionTable) {
+          TrashUtil.copyDataToTrashFolder(tablePath, entry.getKey(), CarbonCommonConstants
+                  .LOAD_FOLDER + segment.getSegmentNo());
+        } else {
+          TrashUtil.copyDataToTrashFolder(tablePath, entry.getKey(), CarbonCommonConstants
+                  .LOAD_FOLDER + segment.getSegmentNo() + CarbonCommonConstants.FILE_SEPARATOR +
+                  entry.getKey().substring(tablePath.length() + 1, entry.getKey().length()));

Review comment:
       ```suggestion
                     entry.getKey().substring(tablePath.length() + 1));
   ```
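      (in Java, `substring(beginIndex)` already runs to the end of the string, so passing `entry.getKey().length()` as an explicit end index is redundant.)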

##########
File path: core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
##########
@@ -138,8 +143,17 @@ public boolean accept(CarbonFile file) {
               if (filesToBeDeleted.length == 0) {
                 status = true;
               } else {
-
                 for (CarbonFile eachFile : filesToBeDeleted) {
+                  // If the file to be deleted is a carbondata file, copy that file to the trash
+                  // folder.
+                  if (eachFile.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT) &&

Review comment:
      how are you handling the index file? Actually, we need to take care of this at the segment level rather than at each carbon file level, right? @kunal642 please leave your input on this

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import org.apache.log4j.Logger;
+
+public final class TrashUtil {
+
+  /**
+   * Attribute for Carbon LOGGER
+   */

Review comment:
       remove this comment

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean is <true> and cleans garbage segments
+   * (MARKED_FOR_DELETE state) if forceTableClean is <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+                  dbName: String,
+                  tableName: String,
+                  tablePath: String,
+                  carbonTable: CarbonTable,
+                  forceTableClean: Boolean,
+                  currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+                  truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+                                         partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+                                       timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach {
+      carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          // delete the file
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in a stale state will be marked as deleted
+   * when the driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation +
+                CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance
+                    .getCarbonTable(tableUniqueName)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
+                } catch {
+                  case _: Exception =>
+                    LOGGER.warn(s"Error while cleaning table " +
+                      s"${tableUniqueName}")
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        // the database location does not exist yet; just log it
+        LOGGER.error(s)
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folders of all carbon tables
+   * in all databases
+   */
+  def deleteDataFromTrashFolderInAllTables(sparkSession: SparkSession): Unit = {
+    try {
+      val databases = sparkSession.sessionState.catalog.listDatabases()
+      databases.foreach(dbName => {
+        val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
+        if (FileFactory.isFileExist(databaseLocation)) {
+          val file = FileFactory.getCarbonFile(databaseLocation)
+          if (file.isDirectory) {
+            val tableFolders = file.listFiles()
+            tableFolders.foreach { tableFolder =>
+              if (tableFolder.isDirectory) {
+                val tablePath = databaseLocation +
+                  CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+                TrashUtil.deleteAllDataFromTrashFolder(tablePath)
+              }
+            }
+          }
+        }
+      })
+    } catch {
+      case e: Throwable =>
+        // catch all exceptions to avoid failure
+        LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+          .error("Failed to clear trash folder of all tables", e)
+    }
+  }
+
+
+  def cleanFilesDryRunOp(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    // dry run for clean files command
+    // Clean files will remove compacted, Marked_for_delete, Insert in progress(stale) segments.
+
+    val tableStatusLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(carbonTable.getDatabaseName)
+    carbonLoadModel.setTableName(carbonTable.getTableName)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    try {
+      if (tableStatusLock.lockWithRetries()) {
+        val tableStatusFilePath = CarbonTablePath
+          .getTableStatusFilePath(carbonTable.getTablePath)
+        carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+          .readTableStatusFile(tableStatusFilePath).toList.asJava)
+      } else {
+        throw new ConcurrentOperationException(carbonTable.getDatabaseName,
+          carbonTable.getTableName, "table status read", "clean files command dry run")
+      }
+    } finally {
+      tableStatusLock.unlock()
+    }
+    val finalSegments = new java.util.ArrayList[Row]()
+
+    carbonLoadModel.getLoadMetadataDetails.asScala.toList.map(loaddetails =>
+      if (loaddetails.getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE
+        || loaddetails.getSegmentStatus == SegmentStatus.COMPACTED) {
+        finalSegments.add(Row(carbonTable.getTableName, loaddetails.getLoadName,
+          loaddetails.getSegmentStatus.toString, "DELETE"))
+      } else if (loaddetails.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS) {
+        // try getting segment lock, if the lock is available, then it is a stale segment
+        // and can be moved to trash
+        if (CarbonUtil.tryGettingSegmentLock(loaddetails, carbonTable.getAbsoluteTableIdentifier)) {
+          finalSegments.add(Row(carbonTable.getTableName, loaddetails.getLoadName,
+            loaddetails.getSegmentStatus.toString, "MOVE TO TRASH"))
+        }
+      })
+    finalSegments.asScala ++= dryRunStaleSegments(carbonTable, sparkSession)
+    finalSegments.asScala
+  }
+
+  def cleanFilesDryRun(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    var res = cleanFilesDryRunOp(carbonTable, sparkSession)
+    // check for index tables
+    val indexTables = CarbonIndexUtil
+      .getIndexCarbonTables(carbonTable, sparkSession)
+    indexTables.foreach {
+      indexTable =>
+        res ++= cleanFilesDryRunOp(indexTable, sparkSession)
+    }
+    res
+  }
+
+  def dryRunStaleSegments(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    // stale segments are those segments whose entry is not present in the table status file, but
+    // those segments are in Fact folder and metadata folder
+
+    val metaDataLocation = carbonTable.getMetadataPath
+    val details = SegmentStatusManager.readLoadMetadata(metaDataLocation)
+    val finalSegments = new java.util.ArrayList[Row]()
+    val partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath)
+    if (FileFactory.isFileExist(partitionPath)) {
+      val allSegments = FileFactory.getCarbonFile(partitionPath).listFiles
+      // there is no segment
+      if (allSegments == null || allSegments.length == 0) Seq.empty

Review comment:
       replace with `isEmpty`
   

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean is <true> and cleans garbage segments
+   * (MARKED_FOR_DELETE state) if forceTableClean is <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+                  dbName: String,
+                  tableName: String,
+                  tablePath: String,
+                  carbonTable: CarbonTable,
+                  forceTableClean: Boolean,
+                  currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+                  truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+                                         partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+                                       timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach {
+      carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          // delete the file
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in a stale state will be marked as deleted
+   * when the driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation +
+                CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance
+                    .getCarbonTable(tableUniqueName)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
+                } catch {
+                  case _: Exception =>
+                    LOGGER.warn(s"Error while cleaning table " +
+                      s"${tableUniqueName}")
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        // the database location does not exist yet; just log it
+        LOGGER.error(s)
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folders of all carbon tables
+   * in all databases
+   */
+  def deleteDataFromTrashFolderInAllTables(sparkSession: SparkSession): Unit = {
+    try {
+      val databases = sparkSession.sessionState.catalog.listDatabases()
+      databases.foreach(dbName => {
+        val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
+        if (FileFactory.isFileExist(databaseLocation)) {
+          val file = FileFactory.getCarbonFile(databaseLocation)
+          if (file.isDirectory) {
+            val tableFolders = file.listFiles()
+            tableFolders.foreach { tableFolder =>
+              if (tableFolder.isDirectory) {
+                val tablePath = databaseLocation +
+                  CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+                TrashUtil.deleteAllDataFromTrashFolder(tablePath)
+              }
+            }
+          }
+        }
+      })
+    } catch {
+      case e: Throwable =>
+        // catch all exceptions to avoid failure
+        LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+          .error("Failed to clear trash folder of all tables", e)
+    }
+  }
+
+
+  def cleanFilesDryRunOp(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    // dry run for clean files command
+    // Clean files will remove compacted, Marked_for_delete, Insert in progress(stale) segments.
+
+    val tableStatusLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(carbonTable.getDatabaseName)
+    carbonLoadModel.setTableName(carbonTable.getTableName)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    try {
+      if (tableStatusLock.lockWithRetries()) {
+        val tableStatusFilePath = CarbonTablePath
+          .getTableStatusFilePath(carbonTable.getTablePath)
+        carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+          .readTableStatusFile(tableStatusFilePath).toList.asJava)
+      } else {
+        throw new ConcurrentOperationException(carbonTable.getDatabaseName,
+          carbonTable.getTableName, "table status read", "clean files command dry run")
+      }
+    } finally {
+      tableStatusLock.unlock()
+    }
+    val finalSegments = new java.util.ArrayList[Row]()
+
+    carbonLoadModel.getLoadMetadataDetails.asScala.toList.map(loaddetails =>
+      if (loaddetails.getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE
+        || loaddetails.getSegmentStatus == SegmentStatus.COMPACTED) {
+        finalSegments.add(Row(carbonTable.getTableName, loaddetails.getLoadName,
+          loaddetails.getSegmentStatus.toString, "DELETE"))
+      } else if (loaddetails.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS) {
+        // try getting segment lock, if the lock is available, then it is a stale segment
+        // and can be moved to trash
+        if (CarbonUtil.tryGettingSegmentLock(loaddetails, carbonTable.getAbsoluteTableIdentifier)) {
+          finalSegments.add(Row(carbonTable.getTableName, loaddetails.getLoadName,
+            loaddetails.getSegmentStatus.toString, "MOVE TO TRASH"))
+        }
+      })
+    finalSegments.asScala ++= dryRunStaleSegments(carbonTable, sparkSession)
+    finalSegments.asScala
+  }
+
+  def cleanFilesDryRun(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    var res = cleanFilesDryRunOp(carbonTable, sparkSession)
+    // check for index tables
+    val indexTables = CarbonIndexUtil
+      .getIndexCarbonTables(carbonTable, sparkSession)
+    indexTables.foreach {
+      indexTable =>
+        res ++= cleanFilesDryRunOp(indexTable, sparkSession)
+    }
+    res
+  }
+
+  def dryRunStaleSegments(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    // stale segments are those segments whose entry is not present in the table status file, but
+    // those segments are in Fact folder and metadata folder
+
+    val metaDataLocation = carbonTable.getMetadataPath
+    val details = SegmentStatusManager.readLoadMetadata(metaDataLocation)
+    val finalSegments = new java.util.ArrayList[Row]()
+    val partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath)
+    if (FileFactory.isFileExist(partitionPath)) {
+      val allSegments = FileFactory.getCarbonFile(partitionPath).listFiles
+      // there is no segment
+      if (allSegments == null || allSegments.length == 0) Seq.empty
+      // there is no segment or failed to read tablestatus file.
+      if (details == null || details.length == 0) Seq.empty

Review comment:
       same as above

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1136,16 +1169,29 @@ public static void deleteSegment(String tablePath, Segment segment,
    * If partition specs are null, then directly delete parent directory in locationMap.
    */
   private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs,
-      Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, String tablePath) {
+      Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, String tablePath,
+                                              String segmentNo, SegmentStatus segmentStatus)
+          throws IOException {
     for (String indexOrMergeFile : indexOrMergeFiles) {
       if (null != partitionSpecs) {
         Path location = new Path(indexOrMergeFile);
         boolean exists = pathExistsInPartitionSpec(partitionSpecs, location);
         if (!exists) {
+          // move to trash
+          TrashUtil.copyDataToTrashFolder(tablePath, location.toString(), CarbonCommonConstants
+                  .LOAD_FOLDER + segmentNo + CarbonCommonConstants.FILE_SEPARATOR +
+                  location.toString().substring(tablePath.length() + 1,
+                          location.toString().length()).split("/")[0]);

Review comment:
       ```suggestion
                            ).split("/")[0]);
   ```

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import org.apache.log4j.Logger;
+
+public final class TrashUtil {
+
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  private TrashUtil() {
+
+  }
+
+  public static void copyDataToTrashFolder(String carbonTablePath, String pathOfFileToCopy,
+                                           String suffixToAdd) throws IOException {
+    String trashFolderPath = carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR +
+            CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME + CarbonCommonConstants.FILE_SEPARATOR
+            + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        if (!FileFactory.isFileExist(trashFolderPath)) {

Review comment:
      this complete if block is not required, because when you call `FileUtils.copyFileToDirectory`, it will create the destination folder if it doesn't exist, so this if block does unnecessary checks and IO
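      for reference, a minimal self-contained sketch of the simplified copy (class and method names are hypothetical; `FileUtils.copyFileToDirectory` creates the destination directory, including parents, when it does not exist):
   ```java
   import java.io.File;
   import java.io.IOException;

   import org.apache.commons.io.FileUtils;

   public final class TrashCopySketch {
     // copies the file straight into the trash folder; commons-io creates the
     // destination directory if it is missing, so no existence check or mkdirs
     public static void copyToTrash(String pathOfFileToCopy, String trashFolderPath)
         throws IOException {
       FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new File(trashFolderPath));
     }
   }
   ```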

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1136,16 +1169,29 @@ public static void deleteSegment(String tablePath, Segment segment,
    * If partition specs are null, then directly delete parent directory in locationMap.
    */
   private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs,
-      Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, String tablePath) {
+      Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, String tablePath,
+                                              String segmentNo, SegmentStatus segmentStatus)
+          throws IOException {
     for (String indexOrMergeFile : indexOrMergeFiles) {
       if (null != partitionSpecs) {
         Path location = new Path(indexOrMergeFile);
         boolean exists = pathExistsInPartitionSpec(partitionSpecs, location);
         if (!exists) {
+          // move to trash
+          TrashUtil.copyDataToTrashFolder(tablePath, location.toString(), CarbonCommonConstants
+                  .LOAD_FOLDER + segmentNo + CarbonCommonConstants.FILE_SEPARATOR +
+                  location.toString().substring(tablePath.length() + 1,
+                          location.toString().length()).split("/")[0]);
+
           FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString()));
         }
       } else {
+        // move to trash
         Path location = new Path(indexOrMergeFile);
+        if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS) {

Review comment:
      I think this check is unnecessary here, I mean checking for `SegmentStatus.INSERT_IN_PROGRESS`: if you see the caller of this function, it sends the `indexOrMergeFiles` based on the segment status `MARKED_FOR_DELETE`, so checking for insert in progress doesn't make any sense. Changes to this function are not required, please check once.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1106,23 +1107,55 @@ public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitio
    */
   public static void deleteSegment(String tablePath, Segment segment,
       List<PartitionSpec> partitionSpecs,
-      SegmentUpdateStatusManager updateStatusManager) throws Exception {
+      SegmentUpdateStatusManager updateStatusManager, String tableName, String DatabaseName,
+                                   SegmentStatus segmentStatus, Boolean isPartitionTable)
+          throws Exception {
     SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
-    List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
-        FileFactory.getConfiguration());
+    List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS,

Review comment:
      revert this change; avoid changing old code unless it's required, else it will be confusing to review.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1106,23 +1107,55 @@ public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitio
    */
   public static void deleteSegment(String tablePath, Segment segment,
       List<PartitionSpec> partitionSpecs,
-      SegmentUpdateStatusManager updateStatusManager) throws Exception {
+      SegmentUpdateStatusManager updateStatusManager, String tableName, String DatabaseName,

Review comment:
      do not pass tableName and dbName, you can get them from `updateStatusManager` itself.

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1057,7 +1058,7 @@ public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitio
               partitionSpecs,
               fileStore.getIndexFilesMap(),
               indexOrMergeFiles,
-              table.getTablePath());
+              table.getTablePath(), segment.getLoadName(), segment.getSegmentStatus());

Review comment:
      can just send the `segment` object; the method already has many parameters.
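      a sketch of the reduced signature (hypothetical; `segmentNo` and `segmentStatus` are then derived from the `segment` object via the same `getLoadName()` and `getSegmentStatus()` getters the caller uses today):
   ```java
   private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs,
       Map<String, List<String>> locationMap, List<String> indexOrMergeFiles,
       String tablePath, Segment segment) throws IOException {
     String segmentNo = segment.getLoadName();
     SegmentStatus segmentStatus = segment.getSegmentStatus();
     // ... rest of the existing deletion logic unchanged ...
   }
   ```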

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1106,23 +1107,55 @@ public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitio
    */
   public static void deleteSegment(String tablePath, Segment segment,
       List<PartitionSpec> partitionSpecs,
-      SegmentUpdateStatusManager updateStatusManager) throws Exception {
+      SegmentUpdateStatusManager updateStatusManager, String tableName, String DatabaseName,
+                                   SegmentStatus segmentStatus, Boolean isPartitionTable)
+          throws Exception {
     SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
-    List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
-        FileFactory.getConfiguration());
+    List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS,
+            true, FileFactory.getConfiguration());
     Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
     for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+      // If the file to be deleted is a carbondata file, copy that file to the trash folder.
+      if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS) {
+        if (!isPartitionTable) {
+          TrashUtil.copyDataToTrashFolder(tablePath, entry.getKey(), CarbonCommonConstants
+                  .LOAD_FOLDER + segment.getSegmentNo());
+        } else {
+          TrashUtil.copyDataToTrashFolder(tablePath, entry.getKey(), CarbonCommonConstants
+                  .LOAD_FOLDER + segment.getSegmentNo() + CarbonCommonConstants.FILE_SEPARATOR +
+                  entry.getKey().substring(tablePath.length() + 1, entry.getKey().length()));
+        }
+      }
       FileFactory.deleteFile(entry.getKey());
       for (String file : entry.getValue()) {
         String[] deltaFilePaths =
             updateStatusManager.getDeleteDeltaFilePath(file, segment.getSegmentNo());
         for (String deltaFilePath : deltaFilePaths) {
+          // If the file to be deleted is a carbondata file, copy that file to the trash folder.
+          if (segmentStatus == SegmentStatus
+                  .INSERT_IN_PROGRESS) {
+            TrashUtil.copyDataToTrashFolder(tablePath, deltaFilePath, deltaFilePath
+                    .substring(tablePath.length() + 1, deltaFilePath.length()));
+          }
           FileFactory.deleteFile(deltaFilePath);
         }
+        // If the file to be deleted is a carbondata file, copy that file to the trash folder.
+        if (file.endsWith(CarbonCommonConstants.FACT_FILE_EXT) && segmentStatus ==
+                SegmentStatus.INSERT_IN_PROGRESS) {

Review comment:
      this `if` block won't be hit, right? Because here `file` will always be an index file, as you can see from line 1114

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1106,23 +1107,55 @@ public static void cleanSegments(CarbonTable table, List<PartitionSpec> partitio
    */
   public static void deleteSegment(String tablePath, Segment segment,
       List<PartitionSpec> partitionSpecs,
-      SegmentUpdateStatusManager updateStatusManager) throws Exception {
+      SegmentUpdateStatusManager updateStatusManager, String tableName, String DatabaseName,
+                                   SegmentStatus segmentStatus, Boolean isPartitionTable)
+          throws Exception {
     SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
-    List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
-        FileFactory.getConfiguration());
+    List<String> indexOrMergeFiles = fileStore.readIndexFiles(SegmentStatus.SUCCESS,
+            true, FileFactory.getConfiguration());
     Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
     for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+      // If the file to be deleted is a carbondata file, copy that file to the trash folder.
+      if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS) {
+        if (!isPartitionTable) {
+          TrashUtil.copyDataToTrashFolder(tablePath, entry.getKey(), CarbonCommonConstants
+                  .LOAD_FOLDER + segment.getSegmentNo());
+        } else {
+          TrashUtil.copyDataToTrashFolder(tablePath, entry.getKey(), CarbonCommonConstants
+                  .LOAD_FOLDER + segment.getSegmentNo() + CarbonCommonConstants.FILE_SEPARATOR +
+                  entry.getKey().substring(tablePath.length() + 1, entry.getKey().length()));
+        }
+      }
       FileFactory.deleteFile(entry.getKey());
       for (String file : entry.getValue()) {
         String[] deltaFilePaths =
             updateStatusManager.getDeleteDeltaFilePath(file, segment.getSegmentNo());
         for (String deltaFilePath : deltaFilePaths) {
+          // If the file to be deleted is a carbondata file, copy that file to the trash folder.

Review comment:
      change the comment to describe the corresponding functionality

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -3440,4 +3442,25 @@ public static void agingTempFolderForIndexServer(long agingTime)throws
       });
     }
   }
+
+  public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad,
+                                              AbsoluteTableIdentifier absoluteTableIdentifier) {

Review comment:
      format the complete code; it's not following the checkstyle rules
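      for example, with the small fixed continuation indent used elsewhere in this file (assuming the checkstyle complaint is about the parameter-aligned wrapping):
   ```java
   public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad,
       AbsoluteTableIdentifier absoluteTableIdentifier) {
     // method body unchanged
   }
   ```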

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean is <true> and cleans garbage segments
+   * (MARKED_FOR_DELETE state) if forceTableClean is <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+                  dbName: String,
+                  tableName: String,
+                  tablePath: String,
+                  carbonTable: CarbonTable,
+                  forceTableClean: Boolean,
+                  currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+                  truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+                                         partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * delete the files in the given list whose name matches the given load start timestamp
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+                                       timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach {
+      carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          // delete the file
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in stale state will be marked as deleted
+   * when driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation +
+                CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance
+                    .getCarbonTable(tableUniqueName)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
+                } catch {
+                  case _: Exception =>
+                    LOGGER.warn(s"Error while cleaning table " +
+                      s"${tableUniqueName}")
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        // Create folders and files.
+        LOGGER.error(s)
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folders of all carbon tables
+   * in all databases
+   */
+  def deleteDataFromTrashFolderInAllTables(sparkSession: SparkSession): Unit = {
+    try {
+      val databases = sparkSession.sessionState.catalog.listDatabases()
+      databases.foreach(dbName => {
+        val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
+        if (FileFactory.isFileExist(databaseLocation)) {
+          val file = FileFactory.getCarbonFile(databaseLocation)
+          if (file.isDirectory) {
+            val tableFolders = file.listFiles()
+            tableFolders.foreach { tableFolder =>
+              if (tableFolder.isDirectory) {
+                val tablePath = databaseLocation +
+                  CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+                TrashUtil.deleteAllDataFromTrashFolder(tablePath)
+              }
+            }
+          }
+        }
+      })
+    } catch {
+      case e: Throwable =>
+        // catch all exceptions to avoid failure
+        LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+          .error("Failed to clear trash folder of all tables", e)
+    }
+  }
+
+
+  def cleanFilesDryRunOp(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    // dry run for clean files command
+    // Clean files will remove COMPACTED, MARKED_FOR_DELETE and stale INSERT_IN_PROGRESS segments.
+
+    val tableStatusLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(carbonTable.getDatabaseName)
+    carbonLoadModel.setTableName(carbonTable.getTableName)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    try {
+      if (tableStatusLock.lockWithRetries()) {
+        val tableStatusFilePath = CarbonTablePath
+          .getTableStatusFilePath(carbonTable.getTablePath)
+        carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+          .readTableStatusFile(tableStatusFilePath).toList.asJava)
+      } else {
+        throw new ConcurrentOperationException(carbonTable.getDatabaseName,
+          carbonTable.getTableName, "table status read", "clean files command dry run")
+      }
+    } finally {
+      tableStatusLock.unlock()
+    }
+    val finalSegments = new java.util.ArrayList[Row]()
+
+    carbonLoadModel.getLoadMetadataDetails.asScala.toList.map(loaddetails =>
+      if (loaddetails.getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE
+        || loaddetails.getSegmentStatus == SegmentStatus.COMPACTED) {
+        finalSegments.add(Row(carbonTable.getTableName, loaddetails.getLoadName,
+          loaddetails.getSegmentStatus.toString, "DELETE"))
+      } else if (loaddetails.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS) {
+        // try getting segment lock, if the lock is available, then it is a stale segment
+        // and can be moved to trash
+        if (CarbonUtil.tryGettingSegmentLock(loaddetails, carbonTable.getAbsoluteTableIdentifier)) {
+          finalSegments.add(Row(carbonTable.getTableName, loaddetails.getLoadName,
+            loaddetails.getSegmentStatus.toString, "MOVE TO TRASH"))
+        }
+      })
+    finalSegments.asScala ++= dryRunStaleSegments(carbonTable, sparkSession)
+    finalSegments.asScala
+  }
+
+  def cleanFilesDryRun(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    var res = cleanFilesDryRunOp(carbonTable, sparkSession)
+    // check for index tables
+    val indexTables = CarbonIndexUtil
+      .getIndexCarbonTables(carbonTable, sparkSession)
+    indexTables.foreach {
+      indexTable =>

Review comment:
       move this line up to the previous one, i.e. `indexTables.foreach { indexTable =>`; just reformat the code.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -3440,4 +3442,25 @@ public static void agingTempFolderForIndexServer(long agingTime)throws
       });
     }
   }
+
+  public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad,
+                                              AbsoluteTableIdentifier absoluteTableIdentifier) {
+    ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+            CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
+    boolean canBeDeleted = false;

Review comment:
       no need to initialize `canBeDeleted` to false here; it is assigned on every path below.
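       For example, the flag can simply take the lock result directly (an untested sketch, not the exact patch):
   ```java
   // sketch: no separate initialization; the lock call already yields the boolean
   boolean canBeDeleted = segmentLock.lockWithRetries(1, 5);
   ```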

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -3440,4 +3442,25 @@ public static void agingTempFolderForIndexServer(long agingTime)throws
       });
     }
   }
+
+  public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad,
+                                              AbsoluteTableIdentifier absoluteTableIdentifier) {
+    ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+            CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
+    boolean canBeDeleted = false;
+    try {
+      if (segmentLock.lockWithRetries(1, 5)) {
+        LOGGER.info("Info: Acquired segment lock on segment:" + oneLoad.getLoadName() + "It " +
+                "can be deleted as load is not going on");

Review comment:
       ```suggestion
                   "can be deleted as load is not in progress");
   ```

##########
File path: core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1136,16 +1169,29 @@ public static void deleteSegment(String tablePath, Segment segment,
    * If partition specs are null, then directly delete parent directory in locationMap.
    */
   private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs,
-      Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, String tablePath) {
+      Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, String tablePath,
+                                              String segmentNo, SegmentStatus segmentStatus)

Review comment:
       please reformat the method signature; the continuation lines should use a consistent indent.
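       For example, a conventional continuation indent would look like this (formatting sketch only; same parameters as the diff above):
   ```java
   private static void deletePhysicalPartition(List<PartitionSpec> partitionSpecs,
       Map<String, List<String>> locationMap, List<String> indexOrMergeFiles,
       String tablePath, String segmentNo, SegmentStatus segmentStatus)
       throws IOException {
   ```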

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import org.apache.log4j.Logger;
+
+public final class TrashUtil {
+
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  private TrashUtil() {

Review comment:
       remove this

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -3440,4 +3442,25 @@ public static void agingTempFolderForIndexServer(long agingTime)throws
       });
     }
   }
+
+  public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad,
+                                              AbsoluteTableIdentifier absoluteTableIdentifier) {
+    ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+            CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
+    boolean canBeDeleted = false;
+    try {
+      if (segmentLock.lockWithRetries(1, 5)) {
+        LOGGER.info("Info: Acquired segment lock on segment:" + oneLoad.getLoadName() + "It " +
+                "can be deleted as load is not going on");
+        canBeDeleted = true;
+      } else {
+        LOGGER.info("Info: Load in progress for segment" + oneLoad.getLoadName());
+        canBeDeleted = false;
+      }
+    } finally {
+      segmentLock.unlock();
+      LOGGER.info("Info: Segment lock on segment:" + oneLoad.getLoadName() + " is released");

Review comment:
       `segmentLock.unlock()` returns a boolean; add this info log only when `unlock` returns true, and otherwise add an error log saying the segment lock for that segment number could not be released, something like that.
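       For example (a sketch of the suggested handling, assuming `unlock()` reports whether the release succeeded):
   ```java
   // sketch: info log only on a successful release, error log otherwise
   if (segmentLock.unlock()) {
     LOGGER.info("Segment lock on segment: " + oneLoad.getLoadName() + " is released");
   } else {
     LOGGER.error("Unable to release segment lock on segment: " + oneLoad.getLoadName());
   }
   ```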

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+  /**
+   * delete the files in the given list whose name matches the given load start timestamp
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],

Review comment:
       reformat the code

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,114 @@
+  public static void copyDataToTrashFolder(String carbonTablePath, String pathOfFileToCopy,
+                                           String suffixToAdd) throws IOException {
+    String trashFolderPath = carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR +
+            CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME + CarbonCommonConstants.FILE_SEPARATOR
+            + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        if (!FileFactory.isFileExist(trashFolderPath)) {
+          LOGGER.info("Creating Trash folder at:" + trashFolderPath);
+          FileFactory.createDirectoryAndSetPermission(trashFolderPath,
+                  new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+        }
+        FileUtils.copyFileToDirectory(new File(pathOfFileToCopy),
+                new File(trashFolderPath));
+      }
+    } catch (IOException e) {
+      LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash folder");
+    }
+  }
+
+  public static void copyDataRecursivelyToTrashFolder(CarbonFile path, String carbonTablePath,
+                                                      String segmentNo) throws IOException {
+    if (!path.isDirectory()) {
+      // copy data to trash
+      copyDataToTrashFolder(carbonTablePath, path.getAbsolutePath(), segmentNo);
+      return;
+    }
+    CarbonFile[] files = path.listFiles();
+    for (int i = 0; i < files.length; i++) {
+      copyDataRecursivelyToTrashFolder(files[i], carbonTablePath, segmentNo);
+    }
+    return;
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folder of a carbon table.
+   */
+  public static void deleteAllDataFromTrashFolder(String carbonTablePath)
+          throws IOException {
+    String pathOfTrashFolder = carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR +
+            CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME;
+    // if the trash folder exists delete the contents of the trash folder, if it does not exists
+    // create a trash folder
+    if (FileFactory.isFileExist(pathOfTrashFolder)) {
+      List<CarbonFile> carbonFileList = FileFactory.getFolderList(pathOfTrashFolder);
+      for (CarbonFile carbonFile : carbonFileList) {
+        deleteDataFromTrashFolderByFile(carbonFile);
+      }
+    } else {
+      // Create the new index server temp directory if it does not exist

Review comment:
       this else block is not required; please refer to the comment above on the `copyDataToTrashFolder` method.
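       For example, the method body could reduce to this (untested sketch based on the diff above):
   ```java
   // sketch: if the trash folder does not exist yet, there is simply nothing to delete
   if (FileFactory.isFileExist(pathOfTrashFolder)) {
     List<CarbonFile> carbonFileList = FileFactory.getFolderList(pathOfTrashFolder);
     for (CarbonFile carbonFile : carbonFileList) {
       deleteDataFromTrashFolderByFile(carbonFile);
     }
   }
   ```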

##########
File path: core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
##########
@@ -138,8 +143,17 @@ public boolean accept(CarbonFile file) {
               if (filesToBeDeleted.length == 0) {
                 status = true;
               } else {
-
                 for (CarbonFile eachFile : filesToBeDeleted) {
+                  // If the file to be deleted is a carbondata file, copy that file to the trash
+                  // folder.
+                  if (eachFile.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT) &&
+                          oneLoad.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS) {
+                    TrashUtil.copyDataToTrashFolder(carbonTable.getTablePath(),
+                            eachFile.getAbsolutePath(), eachFile.getAbsolutePath()
+                                    .substring(carbonTable.getTablePath().length() + 1,
+                                            eachFile.getAbsolutePath().length()));

Review comment:
       ```suggestion
                                               ));
   ```
   
   no need to add the length argument; the single-argument `substring` already extends to the end of the string.
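       For example (untested sketch; the single-argument `substring` runs to the end of the string):
   ```java
   TrashUtil.copyDataToTrashFolder(carbonTable.getTablePath(),
       eachFile.getAbsolutePath(),
       eachFile.getAbsolutePath().substring(carbonTable.getTablePath().length() + 1));
   ```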

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,114 @@
+  public static void copyDataRecursivelyToTrashFolder(CarbonFile path, String carbonTablePath,
+                                                      String segmentNo) throws IOException {
+    if (!path.isDirectory()) {
+      // copy data to trash
+      copyDataToTrashFolder(carbonTablePath, path.getAbsolutePath(), segmentNo);
+      return;
+    }
+    CarbonFile[] files = path.listFiles();

Review comment:
       do not list files here; it can be very slow, especially on cloud storage. If the path is a directory, please use the `copyDirectoryToDirectory` API instead, and then you can remove the `copyDataRecursivelyToTrashFolder` method itself.
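       For example (untested sketch; `trashFolderPath` here stands in for the destination computed the same way as in `copyDataToTrashFolder`):
   ```java
   // sketch: copy a directory in one commons-io call instead of listing and recursing
   if (path.isDirectory()) {
     FileUtils.copyDirectoryToDirectory(new File(path.getAbsolutePath()),
         new File(trashFolderPath));
   } else {
     copyDataToTrashFolder(carbonTablePath, path.getAbsolutePath(), segmentNo);
   }
   ```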

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -3440,4 +3442,25 @@ public static void agingTempFolderForIndexServer(long agingTime)throws
       });
     }
   }
+
+  public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad,
+                                              AbsoluteTableIdentifier absoluteTableIdentifier) {
+    ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+            CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
+    boolean canBeDeleted = false;
+    try {
+      if (segmentLock.lockWithRetries(1, 5)) {

Review comment:
       do not hardcode the retry count as 5; if you want the default, please use the constants that are already defined.
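       For example (illustrative only; the exact constant names in `CarbonCommonConstants` may differ):
   ```java
   // sketch, assuming default retry/timeout constants are defined rather than the literals 1 and 5
   if (segmentLock.lockWithRetries(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT,
       CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT)) {
   ```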

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+                                       timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach {
+      carbonFile =>

Review comment:
       move this line up to the previous one; just reformat the code.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation +
+                CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName

Review comment:
       reformat the code here, and please check the formatting of all the methods.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+  def cleanFilesDryRunOp(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    // dry run for clean files command
+    // Clean files will remove COMPACTED, MARKED_FOR_DELETE and stale INSERT_IN_PROGRESS segments.
+

Review comment:
       Remove this line.

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -3440,4 +3442,25 @@ public static void agingTempFolderForIndexServer(long agingTime)throws
       });
     }
   }
+
+  public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad,
+                                              AbsoluteTableIdentifier absoluteTableIdentifier) {
+    ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+            CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + LockUsage.LOCK);
+    boolean canBeDeleted = false;
+    try {
+      if (segmentLock.lockWithRetries(1, 5)) {
+        LOGGER.info("Info: Acquired segment lock on segment:" + oneLoad.getLoadName() + "It " +

Review comment:
       ```suggestion
           LOGGER.info("Info: Acquired segment lock on segment: " + oneLoad.getLoadName() + ". It " +
   ```

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableCLean <true> and lean garbage segment
+   * (MARKED_FOR_DELETE state) if forceTableCLean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+                  dbName: String,
+                  tableName: String,
+                  tablePath: String,
+                  carbonTable: CarbonTable,
+                  forceTableClean: Boolean,
+                  currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+                  truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+                                         partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+                                       timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach {
+      carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          // delete the file

Review comment:
       Remove this comment; it is not necessary.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableCLean <true> and lean garbage segment
+   * (MARKED_FOR_DELETE state) if forceTableCLean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+                  dbName: String,
+                  tableName: String,
+                  tablePath: String,
+                  carbonTable: CarbonTable,
+                  forceTableClean: Boolean,
+                  currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+                  truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+                                         partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+                                       timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach {
+      carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          // delete the file
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in stale state will be marked as deleted
+   * when driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation +
+                CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance
+                    .getCarbonTable(tableUniqueName)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
+                } catch {
+                  case _: Exception =>
+                    LOGGER.warn(s"Error while cleaning table " +
+                      s"${tableUniqueName}")
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        // Create folders and files.

Review comment:
       Why is this comment here?

##########
File path: core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import org.apache.log4j.Logger;
+
+public final class TrashUtil {
+
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  private TrashUtil() {
+
+  }
+
+  public static void copyDataToTrashFolder(String carbonTablePath, String pathOfFileToCopy,
+                                           String suffixToAdd) throws IOException {
+    String trashFolderPath = carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR +
+            CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME + CarbonCommonConstants.FILE_SEPARATOR
+            + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        if (!FileFactory.isFileExist(trashFolderPath)) {
+          LOGGER.info("Creating Trash folder at:" + trashFolderPath);
+          FileFactory.createDirectoryAndSetPermission(trashFolderPath,
+                  new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+        }
+        FileUtils.copyFileToDirectory(new File(pathOfFileToCopy),
+                new File(trashFolderPath));
+      }
+    } catch (IOException e) {
+      LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash folder");
+    }
+  }
+
+  public static void copyDataRecursivelyToTrashFolder(CarbonFile path, String carbonTablePath,
+                                                      String segmentNo) throws IOException {
+    if (!path.isDirectory()) {
+      // copy data to trash
+      copyDataToTrashFolder(carbonTablePath, path.getAbsolutePath(), segmentNo);
+      return;
+    }
+    CarbonFile[] files = path.listFiles();
+    for (int i = 0; i < files.length; i++) {
+      copyDataRecursivelyToTrashFolder(files[i], carbonTablePath, segmentNo);
+    }
+    return;
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folder of a carbon table.
+   */
+  public static void deleteAllDataFromTrashFolder(String carbonTablePath)
+          throws IOException {
+    String pathOfTrashFolder = carbonTablePath + CarbonCommonConstants.FILE_SEPARATOR +
+            CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME;
+    // if the trash folder exists delete the contents of the trash folder, if it does not exists
+    // create a trash folder
+    if (FileFactory.isFileExist(pathOfTrashFolder)) {
+      List<CarbonFile> carbonFileList = FileFactory.getFolderList(pathOfTrashFolder);
+      for (CarbonFile carbonFile : carbonFileList) {
+        deleteDataFromTrashFolderByFile(carbonFile);
+      }
+    } else {
+      // Create the new index server temp directory if it does not exist
+      LOGGER.info("Creating Trash folder at:" + pathOfTrashFolder);
+      FileFactory.createDirectoryAndSetPermission(pathOfTrashFolder,
+              new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+    }
+  }
+
+  public static void deleteDataFromTrashFolderByFile(CarbonFile carbonFile) {
+    try {
+      FileFactory.deleteAllCarbonFilesOfDir(carbonFile);
+      LOGGER.info("delete file from trash+ " + carbonFile.getPath());
+    } catch (Exception e) {

Review comment:
       If you check, `deleteAllCarbonFilesOfDir` throws `CarbonFileException`, so catch that instead of the generic `Exception`; please verify again. Also, consider folding this into the `deleteAllDataFromTrashFolder` method itself.
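
       A rough illustration of that suggestion (hedged sketch only, written in Scala to match the other examples in this thread even though the file under review is Java; the exact package of `CarbonFileException` is not shown in the diff, so the import is elided):

   ```scala
   // catch the specific exception deleteAllCarbonFilesOfDir actually throws,
   // rather than a blanket Exception, inside deleteAllDataFromTrashFolder itself
   try {
     FileFactory.deleteAllCarbonFilesOfDir(carbonFile)
     LOGGER.info("Deleted file from trash: " + carbonFile.getPath)
   } catch {
     case e: CarbonFileException => // exception type named by the reviewer
       LOGGER.error("Unable to delete " + carbonFile.getPath + " from trash", e)
   }
   ```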

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableCLean <true> and lean garbage segment

Review comment:
       ```suggestion
      * The method deletes all data if forceTableCLean <true> and clean garbage segment
   ```

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableCLean <true> and lean garbage segment
+   * (MARKED_FOR_DELETE state) if forceTableCLean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+                  dbName: String,
+                  tableName: String,
+                  tablePath: String,
+                  carbonTable: CarbonTable,
+                  forceTableClean: Boolean,
+                  currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+                  truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+                                         partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+                                       timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach {
+      carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          // delete the file
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in stale state will be marked as deleted
+   * when driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation +
+                CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance
+                    .getCarbonTable(tableUniqueName)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
+                } catch {
+                  case _: Exception =>
+                    LOGGER.warn(s"Error while cleaning table " +
+                      s"${tableUniqueName}")

Review comment:
       Move this onto the line above, and remove the unnecessary braces around `tableUniqueName`; see the sketch below.
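
       A minimal sketch of the combined line (the same code as in the diff, just joined, with plain interpolation):

   ```scala
   // one line is enough; $tableUniqueName needs no braces
   LOGGER.warn(s"Error while cleaning table $tableUniqueName")
   ```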

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableCLean <true> and lean garbage segment
+   * (MARKED_FOR_DELETE state) if forceTableCLean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+                  dbName: String,
+                  tableName: String,
+                  tablePath: String,
+                  carbonTable: CarbonTable,
+                  forceTableClean: Boolean,
+                  currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+                  truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+                                         partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+                                       timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach {
+      carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          // delete the file
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in stale state will be marked as deleted
+   * when driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation +
+                CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance
+                    .getCarbonTable(tableUniqueName)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
+                } catch {
+                  case _: Exception =>
+                    LOGGER.warn(s"Error while cleaning table " +
+                      s"${tableUniqueName}")
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        // Create folders and files.
+        LOGGER.error(s)
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folders of all carbon tables
+   * in all databases
+   */
+  def deleteDataFromTrashFolderInAllTables(sparkSession: SparkSession): Unit = {
+    try {
+      val databases = sparkSession.sessionState.catalog.listDatabases()
+      databases.foreach(dbName => {
+        val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
+        if (FileFactory.isFileExist(databaseLocation)) {
+          val file = FileFactory.getCarbonFile(databaseLocation)
+          if (file.isDirectory) {
+            val tableFolders = file.listFiles()
+            tableFolders.foreach { tableFolder =>
+              if (tableFolder.isDirectory) {
+                val tablePath = databaseLocation +
+                  CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+                TrashUtil.deleteAllDataFromTrashFolder(tablePath)
+              }
+            }
+          }
+        }
+      })
+    } catch {
+      case e: Throwable =>
+        // catch all exceptions to avoid failure
+        LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+          .error("Failed to clear trash folder of all tables", e)
+    }
+  }
+
+
+  def cleanFilesDryRunOp(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    // dry run for clean files command
+    // Clean files will remove compacted, Marked_for_delete, Insert in progress(stale) segments.
+
+    val tableStatusLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(carbonTable.getDatabaseName)
+    carbonLoadModel.setTableName(carbonTable.getTableName)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    try {
+      if (tableStatusLock.lockWithRetries()) {
+        val tableStatusFilePath = CarbonTablePath
+          .getTableStatusFilePath(carbonTable.getTablePath)
+        carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+          .readTableStatusFile(tableStatusFilePath).toList.asJava)
+      } else {
+        throw new ConcurrentOperationException(carbonTable.getDatabaseName,
+          carbonTable.getTableName, "table status read", "clean files command dry run")
+      }
+    } finally {
+      tableStatusLock.unlock()
+    }
+    val finalSegments = new java.util.ArrayList[Row]()

Review comment:
       Create a Scala mutable collection here instead of a `java.util.ArrayList`, so there is no need to convert at the end.
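
       A minimal sketch of that idea, reusing the variable name from the diff (the row values are illustrative assumptions, not the dry run's actual output):

   ```scala
   import scala.collection.mutable.ListBuffer
   import org.apache.spark.sql.Row

   // build the result in a Scala buffer, so no JavaConverters round-trip
   // is needed when the method returns Seq[Row]
   val finalSegments = ListBuffer[Row]()
   finalSegments += Row("0.1", "Compacted") // one Row per segment the dry run would clean
   finalSegments.toList
   ```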

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableCLean <true> and lean garbage segment
+   * (MARKED_FOR_DELETE state) if forceTableCLean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+                  dbName: String,
+                  tableName: String,
+                  tablePath: String,
+                  carbonTable: CarbonTable,
+                  forceTableClean: Boolean,
+                  currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+                  truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+                                         partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+                                       timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach {
+      carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          // delete the file
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in stale state will be marked as deleted
+   * when driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation +
+                CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance
+                    .getCarbonTable(tableUniqueName)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
+                } catch {
+                  case _: Exception =>
+                    LOGGER.warn(s"Error while cleaning table " +
+                      s"${tableUniqueName}")
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        // Create folders and files.
+        LOGGER.error(s)
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folders of all carbon tables
+   * in all databases
+   */
+  def deleteDataFromTrashFolderInAllTables(sparkSession: SparkSession): Unit = {
+    try {
+      val databases = sparkSession.sessionState.catalog.listDatabases()
+      databases.foreach(dbName => {
+        val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
+        if (FileFactory.isFileExist(databaseLocation)) {
+          val file = FileFactory.getCarbonFile(databaseLocation)
+          if (file.isDirectory) {
+            val tableFolders = file.listFiles()
+            tableFolders.foreach { tableFolder =>
+              if (tableFolder.isDirectory) {
+                val tablePath = databaseLocation +
+                  CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+                TrashUtil.deleteAllDataFromTrashFolder(tablePath)
+              }
+            }
+          }
+        }
+      })
+    } catch {
+      case e: Throwable =>
+        // catch all exceptions to avoid failure
+        LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+          .error("Failed to clear trash folder of all tables", e)
+    }
+  }
+
+
+  def cleanFilesDryRunOp(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    // dry run for clean files command
+    // Clean files will remove compacted, Marked_for_delete, Insert in progress(stale) segments.
+
+    val tableStatusLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(carbonTable.getDatabaseName)

Review comment:
       Why do you need the load model here?
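       For context, one way this dry run could drop the CarbonLoadModel entirely is to read the table status file into a local value under the same lock; a minimal sketch, assuming the model is only used here as a holder for the LoadMetadataDetails:
   ```scala
   // Read the table status under the lock, without building a CarbonLoadModel.
   val tableStatusLock = CarbonLockFactory
     .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
   val loadMetadataDetails: Array[LoadMetadataDetails] = try {
     if (tableStatusLock.lockWithRetries()) {
       SegmentStatusManager.readTableStatusFile(
         CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
     } else {
       throw new ConcurrentOperationException(carbonTable.getDatabaseName,
         carbonTable.getTableName, "table status read", "clean files command dry run")
     }
   } finally {
     tableStatusLock.unlock()
   }
   ```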

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableCLean <true> and lean garbage segment
+   * (MARKED_FOR_DELETE state) if forceTableCLean <false>

Review comment:
       ```suggestion
      * (MARKED_FOR_DELETE state) if forceTableClean <false>
   ```

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableCLean <true> and lean garbage segment
+   * (MARKED_FOR_DELETE state) if forceTableCLean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+                  dbName: String,
+                  tableName: String,
+                  tablePath: String,
+                  carbonTable: CarbonTable,
+                  forceTableClean: Boolean,
+                  currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+                  truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+                                         partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+                                       timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach {
+      carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          // delete the file
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in stale state will be marked as deleted
+   * when driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation +
+                CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance
+                    .getCarbonTable(tableUniqueName)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
+                } catch {
+                  case _: Exception =>
+                    LOGGER.warn(s"Error while cleaning table " +
+                      s"${tableUniqueName}")
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        // Create folders and files.
+        LOGGER.error(s)
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folders of all carbon tables
+   * in all databases
+   */
+  def deleteDataFromTrashFolderInAllTables(sparkSession: SparkSession): Unit = {
+    try {
+      val databases = sparkSession.sessionState.catalog.listDatabases()
+      databases.foreach(dbName => {
+        val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
+        if (FileFactory.isFileExist(databaseLocation)) {
+          val file = FileFactory.getCarbonFile(databaseLocation)
+          if (file.isDirectory) {
+            val tableFolders = file.listFiles()
+            tableFolders.foreach { tableFolder =>
+              if (tableFolder.isDirectory) {
+                val tablePath = databaseLocation +
+                  CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+                TrashUtil.deleteAllDataFromTrashFolder(tablePath)
+              }
+            }
+          }
+        }
+      })
+    } catch {
+      case e: Throwable =>
+        // catch all exceptions to avoid failure
+        LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+          .error("Failed to clear trash folder of all tables", e)
+    }
+  }
+
+
+  def cleanFilesDryRunOp(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    // dry run for clean files command
+    // Clean files will remove compacted, Marked_for_delete, Insert in progress(stale) segments.
+
+    val tableStatusLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(carbonTable.getDatabaseName)
+    carbonLoadModel.setTableName(carbonTable.getTableName)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    try {
+      if (tableStatusLock.lockWithRetries()) {
+        val tableStatusFilePath = CarbonTablePath
+          .getTableStatusFilePath(carbonTable.getTablePath)
+        carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+          .readTableStatusFile(tableStatusFilePath).toList.asJava)
+      } else {
+        throw new ConcurrentOperationException(carbonTable.getDatabaseName,
+          carbonTable.getTableName, "table status read", "clean files command dry run")
+      }
+    } finally {
+      tableStatusLock.unlock()
+    }
+    val finalSegments = new java.util.ArrayList[Row]()
+
+    carbonLoadModel.getLoadMetadataDetails.asScala.toList.map(loaddetails =>

Review comment:
       ```suggestion
       carbonLoadModel.getLoadMetadataDetails.asScala.toList.map(loadDetail =>
   ```

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -48,13 +54,32 @@ import org.apache.carbondata.view.MVManagerInSpark
 case class CarbonCleanFilesCommand(
     databaseNameOp: Option[String],
     tableName: Option[String],
+    options: Option[List[(String, String)]],
     forceTableClean: Boolean = false,
     isInternalCleanCall: Boolean = false,
     truncateTable: Boolean = false)
   extends AtomicRunnableCommand {
 
   var carbonTable: CarbonTable = _
   var cleanFileCommands: List[CarbonCleanFilesCommand] = List.empty
+  val optionsMap = options.getOrElse(List.empty[(String, String)]).toMap
+  var isDryRun: Boolean = false
+  if (optionsMap.contains("isdryrun") ) {
+    isDryRun = Boolean.parseBoolean(optionsMap.get("isdryrun").get.toString)

Review comment:
       ```suggestion
       isDryRun = Boolean.parseBoolean(optionsMap("isdryrun").toString)
   ```
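       A side note on the suggestion: in plain Scala, `Boolean` resolves to the `scala.Boolean` companion, which has no `parseBoolean`, so unless `java.lang.Boolean` is imported explicitly this probably wants the `StringOps` conversion instead; a sketch:
   ```scala
   // assumes the option value is exactly "true" or "false"
   isDryRun = optionsMap("isdryrun").toBoolean
   ```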

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
##########
@@ -90,7 +90,7 @@ case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
     // Delete stale segment folders that are not in table status but are physically present in
     // the Fact folder
     LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
-    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
+    // TableProcessingOperations.deletePartialLoadDataIfExist(table, false)

Review comment:
       Same as above.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -48,13 +54,32 @@ import org.apache.carbondata.view.MVManagerInSpark
 case class CarbonCleanFilesCommand(
     databaseNameOp: Option[String],
     tableName: Option[String],
+    options: Option[List[(String, String)]],
     forceTableClean: Boolean = false,
     isInternalCleanCall: Boolean = false,
     truncateTable: Boolean = false)
   extends AtomicRunnableCommand {
 
   var carbonTable: CarbonTable = _
   var cleanFileCommands: List[CarbonCleanFilesCommand] = List.empty
+  val optionsMap = options.getOrElse(List.empty[(String, String)]).toMap
+  var isDryRun: Boolean = false
+  if (optionsMap.contains("isdryrun") ) {

Review comment:
       Handle case sensitivity for the option key.
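       A minimal sketch of case-insensitive handling, normalizing the keys once while building the map (the option name comes from the diff; the rest is illustrative):
   ```scala
   val optionsMap: Map[String, String] = options
     .getOrElse(List.empty[(String, String)])
     .map { case (key, value) => (key.toLowerCase, value) }
     .toMap
   // matches the option however the user cased "isDryRun"
   var isDryRun: Boolean =
     optionsMap.get("isdryrun").exists(_.trim.equalsIgnoreCase("true"))
   ```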

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
##########
@@ -108,7 +108,7 @@ case class CarbonLoadDataCommand(databaseNameOp: Option[String],
     // Delete stale segment folders that are not in table status but are physically present in
     // the Fact folder
     LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
-    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
+    // TableProcessingOperations.deletePartialLoadDataIfExist(table, false)

Review comment:
       Same here: if the clean-up during load is intentionally being skipped, please remove the call instead of commenting it out.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -70,50 +95,104 @@ case class CarbonCleanFilesCommand(
           val relationIdentifier = schema.getIdentifier
           CarbonCleanFilesCommand(
             Some(relationIdentifier.getDatabaseName),
-            Some(relationIdentifier.getTableName),
+            Some(relationIdentifier.getTableName), options,

Review comment:
       Move `options` to the next line, so there is one argument per line.
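       That is, reusing the arguments from the diff:
   ```scala
   CarbonCleanFilesCommand(
     Some(relationIdentifier.getDatabaseName),
     Some(relationIdentifier.getTableName),
     options,
     isInternalCleanCall = true)
   ```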

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
##########
@@ -187,7 +187,7 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
     // Delete stale segment folders that are not in table status but are physically present in
     // the Fact folder
     LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
-    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
+    // TableProcessingOperations.deletePartialLoadDataIfExist(table, false)

Review comment:
       Why is this commented out? If the call is no longer needed, remove it rather than leaving it commented.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
##########
@@ -70,50 +95,104 @@ case class CarbonCleanFilesCommand(
           val relationIdentifier = schema.getIdentifier
           CarbonCleanFilesCommand(
             Some(relationIdentifier.getDatabaseName),
-            Some(relationIdentifier.getTableName),
+            Some(relationIdentifier.getTableName), options,
             isInternalCleanCall = true)
       }.toList
       commands.foreach(_.processMetadata(sparkSession))
       cleanFileCommands = cleanFileCommands ++ commands
     }
-    Seq.empty
+      Seq.empty

Review comment:
       Revert this indentation change.

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableCLean <true> and lean garbage segment
+   * (MARKED_FOR_DELETE state) if forceTableCLean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+                  dbName: String,
+                  tableName: String,
+                  tablePath: String,
+                  carbonTable: CarbonTable,
+                  forceTableClean: Boolean,
+                  currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+                  truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+                                         partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+                                       timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach {
+      carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          // delete the file
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in stale state will be marked as deleted
+   * when driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation +
+                CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance
+                    .getCarbonTable(tableUniqueName)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
+                } catch {
+                  case _: Exception =>
+                    LOGGER.warn(s"Error while cleaning table " +
+                      s"${tableUniqueName}")
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        // Create folders and files.
+        LOGGER.error(s)
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folders of all carbon tables
+   * in all databases
+   */
+  def deleteDataFromTrashFolderInAllTables(sparkSession: SparkSession): Unit = {
+    try {
+      val databases = sparkSession.sessionState.catalog.listDatabases()
+      databases.foreach(dbName => {
+        val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
+        if (FileFactory.isFileExist(databaseLocation)) {
+          val file = FileFactory.getCarbonFile(databaseLocation)
+          if (file.isDirectory) {
+            val tableFolders = file.listFiles()
+            tableFolders.foreach { tableFolder =>
+              if (tableFolder.isDirectory) {
+                val tablePath = databaseLocation +
+                  CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+                TrashUtil.deleteAllDataFromTrashFolder(tablePath)
+              }
+            }
+          }
+        }
+      })
+    } catch {
+      case e: Throwable =>
+        // catch all exceptions to avoid failure
+        LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+          .error("Failed to clear trash folder of all tables", e)
+    }
+  }
+
+
+  def cleanFilesDryRunOp(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    // dry run for clean files command
+    // Clean files will remove compacted, Marked_for_delete, Insert in progress(stale) segments.
+
+    val tableStatusLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(carbonTable.getDatabaseName)
+    carbonLoadModel.setTableName(carbonTable.getTableName)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    try {
+      if (tableStatusLock.lockWithRetries()) {
+        val tableStatusFilePath = CarbonTablePath
+          .getTableStatusFilePath(carbonTable.getTablePath)
+        carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+          .readTableStatusFile(tableStatusFilePath).toList.asJava)
+      } else {
+        throw new ConcurrentOperationException(carbonTable.getDatabaseName,
+          carbonTable.getTableName, "table status read", "clean files command dry run")
+      }
+    } finally {
+      tableStatusLock.unlock()
+    }
+    val finalSegments = new java.util.ArrayList[Row]()
+
+    carbonLoadModel.getLoadMetadataDetails.asScala.toList.map(loaddetails =>
+      if (loaddetails.getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE
+        || loaddetails.getSegmentStatus == SegmentStatus.COMPACTED) {
+        finalSegments.add(Row(carbonTable.getTableName, loaddetails.getLoadName,
+          loaddetails.getSegmentStatus.toString, "DELETE"))
+      } else if (loaddetails.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS) {
+        // try getting segment lock, if the lock is available, then it is a stale segment
+        // and can be moved to trash
+        if (CarbonUtil.tryGettingSegmentLock(loaddetails, carbonTable.getAbsoluteTableIdentifier)) {
+          finalSegments.add(Row(carbonTable.getTableName, loaddetails.getLoadName,
+            loaddetails.getSegmentStatus.toString, "MOVE TO TRASH"))
+        }
+      })
+    finalSegments.asScala ++= dryRunStaleSegments(carbonTable, sparkSession)
+    finalSegments.asScala
+  }
+
+  def cleanFilesDryRun(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {

Review comment:
       Please give a method description.
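       A possible description, inferred from what the method does in this diff (wording is only a suggestion):
   ```scala
   /**
    * Dry run of the clean files command: collects, for the given table and its
    * index tables, the segments that a real clean files run would delete or
    * move to the trash folder, without modifying any data.
    *
    * @param carbonTable  the carbon table being dry run
    * @param sparkSession active spark session, used to resolve the index tables
    * @return one Row per affected segment with the proposed action
    */
   def cleanFilesDryRun(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
   ```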

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableCLean <true> and lean garbage segment
+   * (MARKED_FOR_DELETE state) if forceTableCLean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of force clean
+   * @param forceTableClean        : <true> for force clean it will delete all data
+   *                               <false> it will clean garbage segment (MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+                  dbName: String,
+                  tableName: String,
+                  tablePath: String,
+                  carbonTable: CarbonTable,
+                  forceTableClean: Boolean,
+                  currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+                  truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files " +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+                                         partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if (metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if (!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+                                       timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach {
+      carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, timestamp)) {
+          // delete the file
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in stale state will be marked as deleted
+   * when driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation +
+                CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance
+                    .getCarbonTable(tableUniqueName)
+                  SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
+                } catch {
+                  case _: Exception =>
+                    LOGGER.warn(s"Error while cleaning table " +
+                      s"${tableUniqueName}")
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        // Create folders and files.
+        LOGGER.error(s)
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folders of all carbon tables
+   * in all databases
+   */
+  def deleteDataFromTrashFolderInAllTables(sparkSession: SparkSession): Unit = {
+    try {
+      val databases = sparkSession.sessionState.catalog.listDatabases()
+      databases.foreach(dbName => {
+        val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
+        if (FileFactory.isFileExist(databaseLocation)) {
+          val file = FileFactory.getCarbonFile(databaseLocation)
+          if (file.isDirectory) {
+            val tableFolders = file.listFiles()
+            tableFolders.foreach { tableFolder =>
+              if (tableFolder.isDirectory) {
+                val tablePath = databaseLocation +
+                  CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+                TrashUtil.deleteAllDataFromTrashFolder(tablePath)
+              }
+            }
+          }
+        }
+      })
+    } catch {
+      case e: Throwable =>
+        // catch all exceptions to avoid failure
+        LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+          .error("Failed to clear trash folder of all tables", e)
+    }
+  }
+
+
+  def cleanFilesDryRunOp(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    // dry run for clean files command
+    // Clean files will remove compacted, Marked_for_delete, Insert in progress(stale) segments.
+
+    val tableStatusLock = CarbonLockFactory
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(carbonTable.getDatabaseName)
+    carbonLoadModel.setTableName(carbonTable.getTableName)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    try {
+      if (tableStatusLock.lockWithRetries()) {
+        val tableStatusFilePath = CarbonTablePath
+          .getTableStatusFilePath(carbonTable.getTablePath)
+        carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+          .readTableStatusFile(tableStatusFilePath).toList.asJava)
+      } else {
+        throw new ConcurrentOperationException(carbonTable.getDatabaseName,
+          carbonTable.getTableName, "table status read", "clean files command dry run")
+      }
+    } finally {
+      tableStatusLock.unlock()
+    }
+    val finalSegments = new java.util.ArrayList[Row]()
+
+    carbonLoadModel.getLoadMetadataDetails.asScala.toList.map(loaddetails =>
+      if (loaddetails.getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE
+        || loaddetails.getSegmentStatus == SegmentStatus.COMPACTED) {
+        finalSegments.add(Row(carbonTable.getTableName, loaddetails.getLoadName,
+          loaddetails.getSegmentStatus.toString, "DELETE"))
+      } else if (loaddetails.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS) {
+        // try getting segment lock, if the lock is available, then it is a stale segment
+        // and can be moved to trash
+        if (CarbonUtil.tryGettingSegmentLock(loaddetails, carbonTable.getAbsoluteTableIdentifier)) {
+          finalSegments.add(Row(carbonTable.getTableName, loaddetails.getLoadName,
+            loaddetails.getSegmentStatus.toString, "MOVE TO TRASH"))
+        }
+      })
+    finalSegments.asScala ++= dryRunStaleSegments(carbonTable, sparkSession)
+    finalSegments.asScala
+  }
+
+  def cleanFilesDryRun(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    var res = cleanFilesDryRunOp(carbonTable, sparkSession)
+    // check for index tables
+    val indexTables = CarbonIndexUtil
+      .getIndexCarbonTables(carbonTable, sparkSession)
+    indexTables.foreach {
+      indexTable =>
+        res ++= cleanFilesDryRunOp(indexTable, sparkSession)
+    }
+  res
+  }
+
+  def dryRunStaleSegments(carbonTable: CarbonTable, sparkSession: SparkSession): Seq[Row] = {
+    // stale segments are those segments whose entry is not present in the table status file, but
+    // those segments are in Fact folder and metadata folder
+
+    val metaDataLocation = carbonTable.getMetadataPath
+    val details = SegmentStatusManager.readLoadMetadata(metaDataLocation)
+    val finalSegments = new java.util.ArrayList[Row]()
+    val partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath)
+    if (FileFactory.isFileExist(partitionPath)) {
+      val allSegments = FileFactory.getCarbonFile(partitionPath).listFiles
+      // there is no segment
+      if (allSegments == null || allSegments.length == 0) Seq.empty
+      // there is no segment or failed to read tablestatus file.
+      if (details == null || details.length == 0) Seq.empty
+      val staleSegments = TableProcessingOperations.getStaleSegments(details, allSegments, false)
+      staleSegments.asScala.foreach(
+        x => finalSegments.add(Row(carbonTable.getTableName, x._2,

Review comment:
       Move the variable above and rename `x` to `staleSegment` for readability.
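       Something along these lines (the trailing Row fields are cut off in the quoted diff, so the status and action shown here are assumptions based on `cleanFilesDryRunOp`):
   ```scala
   staleSegments.asScala.foreach { staleSegment =>
     // staleSegment is a (CarbonFile, String) entry; ._2 is the segment id
     finalSegments.add(Row(carbonTable.getTableName, staleSegment._2,
       SegmentStatus.INSERT_IN_PROGRESS.toString, "MOVE TO TRASH")) // assumed fields
   }
   ```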

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonTruncateCommand.scala
##########
@@ -45,9 +45,11 @@ case class CarbonTruncateCommand(child: TruncateTableCommand) extends DataComman
       throw new MalformedCarbonCommandException(
         "Unsupported truncate table with specified partition")
     }
+    val optionList = List.empty[(String, String)]

Review comment:
       Remove this local variable; it can be inlined (see the next comment).

##########
File path: processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
##########
@@ -152,6 +123,41 @@ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
     }
   }
 
+  public static HashMap<CarbonFile, String> getStaleSegments(LoadMetadataDetails[] details,

Review comment:
       Reformat this code, and please follow the project's formatting conventions in all the changes.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonTruncateCommand.scala
##########
@@ -45,9 +45,11 @@ case class CarbonTruncateCommand(child: TruncateTableCommand) extends DataComman
       throw new MalformedCarbonCommandException(
         "Unsupported truncate table with specified partition")
     }
+    val optionList = List.empty[(String, String)]
+
     CarbonCleanFilesCommand(
       databaseNameOp = Option(dbName),
-      tableName = Option(tableName),
+      tableName = Option(tableName), Option(optionList),

Review comment:
       replace with `Option(List.empty)`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org