Posted to commits@carbondata.apache.org by qi...@apache.org on 2020/11/03 09:02:53 UTC

[carbondata] branch master updated: [CARBONDATA-4044] Fix dirty data in index file while IUD with stale data in segment folder

This is an automated email from the ASF dual-hosted git repository.

qiangcai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new fd2cc2a  [CARBONDATA-4044] Fix dirty data in index file while IUD with stale data in segment folder
fd2cc2a is described below

commit fd2cc2a79dc62cd78b3ea7ac363858472ad78ac1
Author: haomarch <ma...@126.com>
AuthorDate: Tue Oct 13 23:10:58 2020 +0800

    [CARBONDATA-4044] Fix dirty data in index file while IUD with stale data in segment folder
    
    Why is this PR needed?
    XX.mergecarbonindex and XX.segment record the index file list of a segment. Currently, we generate xx.mergeindexfile and xx.segment by picking up all index files (both carbonindex and mergecarbonindex) in the segment folder, which leads to dirty data when stale data exists in the segment folder.
    For example, suppose there is a stale index file in the segment_0 folder, "0_1603763776.carbonindex".
    While loading, a new carbonindex "0_16037752342.carbonindex" is written. When merging carbonindex files, we expect to merge only 0_16037752342.carbonindex, but if we pick up every carbonindex file in the segment folder, both "0_1603763776.carbonindex" and 0_16037752342.carbonindex will be merged and recorded into the segment file.
    The update flow has the same problem.
    
    What changes were proposed in this PR?
    1. IUD: merge index files based on the UUID (load start timestamp); see the sketch after this list.
    2. IUD: write the segment file based on the UUID (timestamp).
    3. Update: update now generates a new segment to avoid rewriting the segment file, and rolls back the new segment when the update fails.
    4. Remove horizontal compaction processing of update delta files (both carbondata and carbonindex files).
    5. Clean up dead code now that update writes into a new segment.
    6. Fix an update issue: wrong update result after drop partition.
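    
    As a minimal, hedged sketch of the UUID-based filtering idea (not the exact project
    code: the real change lives in SegmentIndexFileStore.getCarbonIndexFiles and uses
    CarbonFile/CarbonFileFilter; the helper name, class name and segment path below are
    hypothetical and plain java.io.File is used for brevity):
    
        // Sketch only: list a segment folder's index files belonging to one load,
        // identified by its UUID (load start timestamp), so stale files left by
        // earlier or aborted loads are never merged or recorded.
        import java.io.File;
        
        public final class IndexFileFilterSketch {
          static File[] listIndexFilesForLoad(File segmentFolder, String uuid) {
            File[] matched = segmentFolder.listFiles(file ->
                file.getName().contains(uuid)                      // only this load's timestamp
                    && (file.getName().endsWith(".carbonindex")
                        || file.getName().endsWith(".carbonindexmerge"))
                    && file.length() > 0);                         // skip empty stale files
            return matched == null ? new File[0] : matched;
          }
        
          public static void main(String[] args) {
            // With the stale 0_1603763776.carbonindex still present in the segment folder,
            // filtering by the new load's timestamp returns only 0_16037752342.carbonindex.
            File segment = new File("/store/db/tbl/Fact/Part0/Segment_0");  // hypothetical path
            for (File f : listIndexFilesForLoad(segment, "16037752342")) {
              System.out.println(f.getName());
            }
          }
        }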
    
    Does this PR introduce any user interface change?
    Yes
    
    Is any new testcase added?
    Yes
    
    This closes #3999
---
 .../core/constants/CarbonCommonConstants.java      |  18 --
 .../blockletindex/SegmentIndexFileStore.java       |  44 ++-
 .../carbondata/core/metadata/SegmentFileStore.java |  56 ++--
 .../carbondata/core/mutate/CarbonUpdateUtil.java   | 148 ++--------
 .../core/segmentmeta/SegmentMetaDataInfoStats.java |  29 +-
 .../statusmanager/SegmentUpdateStatusManager.java  | 213 --------------
 .../carbondata/core/util/CarbonProperties.java     |  30 --
 .../apache/carbondata/core/util/CarbonUtil.java    |   4 +-
 .../core/writer/CarbonIndexFileMergeWriter.java    |  27 +-
 docs/configuration-parameters.md                   |   3 +-
 .../hadoop/api/CarbonOutputCommitter.java          |   5 +-
 .../cluster/sdv/generated/AlterTableTestCase.scala |   3 -
 .../spark/rdd/CarbonDataRDDFactory.scala           | 317 +--------------------
 .../carbondata/spark/rdd/CarbonIUDMergerRDD.scala  | 122 --------
 .../carbondata/spark/rdd/CarbonMergerRDD.scala     |  35 +--
 .../spark/rdd/CarbonTableCompactor.scala           |  66 ++---
 .../org/apache/carbondata/view/MVRefresher.scala   |   7 +-
 .../command/carbonTableSchemaCommon.scala          |   2 +-
 .../CarbonAlterTableCompactionCommand.scala        |  48 +---
 .../management/CarbonInsertIntoCommand.scala       |  12 +-
 .../management/CarbonInsertIntoWithDf.scala        |  13 +-
 .../command/management/CommonLoadUtils.scala       |  40 +--
 .../mutation/CarbonProjectForDeleteCommand.scala   |  14 +-
 .../mutation/CarbonProjectForUpdateCommand.scala   |  50 ++--
 .../command/mutation/DeleteExecution.scala         |   1 +
 .../command/mutation/HorizontalCompaction.scala    |  69 +----
 .../mutation/merge/CarbonMergeDataSetCommand.scala |  31 +-
 .../secondaryindex/util/SecondaryIndexUtil.scala   |   1 -
 .../testsuite/dataload/TestLoadDataGeneral.scala   |   6 +-
 .../TestLoadDataWithStaleDataInSegmentFolder.scala | 234 +++++++++++++++
 .../allqueries/TestPruneUsingSegmentMinMax.scala   |   2 +-
 .../TestAlterTableSortColumnsProperty.scala        |   6 +-
 .../testsuite/iud/DeleteCarbonTableTestCase.scala  |  49 ++++
 .../iud/HorizontalCompactionTestCase.scala         |   4 +-
 .../testsuite/iud/UpdateCarbonTableTestCase.scala  | 139 ++++++++-
 .../spark/testsuite/merge/MergeTestCase.scala      |  11 +
 .../rewrite/MVIncrementalLoadingTestcase.scala     |   2 +
 .../processing/merger/AbstractResultProcessor.java |  17 +-
 .../processing/merger/CarbonDataMergerUtil.java    | 270 +-----------------
 .../processing/util/CarbonLoaderUtil.java          |  19 +-
 40 files changed, 685 insertions(+), 1482 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index beaed60..14ad059 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -809,19 +809,6 @@ public final class CarbonCommonConstants {
   public static final int NUMBER_OF_SEGMENT_COMPACTED_PERTIME_UPPER_LIMIT = 10000;
 
   /**
-   * Number of Update Delta files which is the Threshold for IUD compaction.
-   * Only accepted Range is 0 - 10000. Outside this range system will pick default value.
-   */
-  @CarbonProperty
-  public static final String UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION =
-      "carbon.horizontal.update.compaction.threshold";
-
-  /**
-   * Default count of segments which act as a threshold for IUD compaction merge.
-   */
-  public static final String DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION = "1";
-
-  /**
    * Number of Delete Delta files which is the Threshold for IUD compaction.
    * Only accepted Range is 0 - 10000. Outside this range system will pick default value.
    */
@@ -1776,11 +1763,6 @@ public final class CarbonCommonConstants {
   public static final String DELETE_DELTA_FILE_EXT = ".deletedelta";
 
   /**
-   * UPDATE_DELTA_FILE_EXT
-   */
-  public static final String UPDATE_DELTA_FILE_EXT = FACT_FILE_EXT;
-
-  /**
    * MERGERD_EXTENSION
    */
   public static final String MERGERD_EXTENSION = ".merge";
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 39b6048..a82ba8e 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -98,11 +98,12 @@ public class SegmentIndexFileStore {
   /**
    * Read all index files and keep the cache in it.
    *
-   * @param segmentPath
+   * @param segmentPath the getAbsolutePath of segment folder
+   * @param uuid the loadstarttime of segment
    * @throws IOException
    */
-  public void readAllIIndexOfSegment(String segmentPath) throws IOException {
-    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath, configuration);
+  public void readAllIIndexOfSegment(String segmentPath, String uuid) throws IOException {
+    CarbonFile[] carbonIndexFiles = getCarbonIndexFiles(segmentPath, configuration, uuid);
     for (CarbonFile carbonIndexFile : carbonIndexFiles) {
       if (carbonIndexFile.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
         readMergeFile(carbonIndexFile.getCanonicalPath());
@@ -115,6 +116,16 @@ public class SegmentIndexFileStore {
   /**
    * Read all index files and keep the cache in it.
    *
+   * @param segmentPath
+   * @throws IOException
+   */
+  public void readAllIIndexOfSegment(String segmentPath) throws IOException {
+    readAllIIndexOfSegment(segmentPath, null);
+  }
+
+  /**
+   * Read all index files and keep the cache in it.
+   *
    * @param segmentFile
    * @throws IOException
    */
@@ -167,9 +178,9 @@ public class SegmentIndexFileStore {
    * @param segmentPath
    * @throws IOException
    */
-  public void readAllIndexAndFillBlockletInfo(String segmentPath) throws IOException {
+  public void readAllIndexAndFillBlockletInfo(String segmentPath, String uuid) throws IOException {
     CarbonFile[] carbonIndexFiles =
-        getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
+        getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration(), uuid);
     for (CarbonFile carbonIndexFile : carbonIndexFiles) {
       if (carbonIndexFile.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
         readMergeFile(carbonIndexFile.getCanonicalPath());
@@ -363,8 +374,19 @@ public class SegmentIndexFileStore {
    * @param segmentPath
    * @return
    */
-  public static CarbonFile[] getCarbonIndexFiles(String segmentPath, Configuration configuration) {
+  public static CarbonFile[] getCarbonIndexFiles(String segmentPath,
+      Configuration configuration, String uuid) {
     CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath, configuration);
+    if (null != uuid) {
+      return carbonFile.listFiles(new CarbonFileFilter() {
+        @Override
+        public boolean accept(CarbonFile file) {
+          return file.getName().contains(uuid)
+              && (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
+              .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0;
+        }
+      });
+    }
     return carbonFile.listFiles(new CarbonFileFilter() {
       @Override
       public boolean accept(CarbonFile file) {
@@ -377,6 +399,16 @@ public class SegmentIndexFileStore {
   /**
    * List all the index files of the segment.
    *
+   * @param segmentPath
+   * @return
+   */
+  public static CarbonFile[] getCarbonIndexFiles(String segmentPath, Configuration configuration) {
+    return getCarbonIndexFiles(segmentPath, configuration, null);
+  }
+
+  /**
+   * List all the index files of the segment.
+   *
    * @param carbonFiles
    * @return
    */
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 612308b..8fc1da8 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -232,43 +232,19 @@ public class SegmentFileStore {
     return segmentId + "_" + UUID;
   }
 
-  /**
-   * Write segment file to the metadata folder of the table
-   *
-   * @param carbonTable CarbonTable
-   * @param segmentId segment id
-   * @param UUID      a UUID string used to construct the segment file name
-   * @param segmentMetaDataInfo list of block level min and max values for segment
-   * @return segment file name
-   */
-  public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID,
+  public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String uuid,
       SegmentMetaDataInfo segmentMetaDataInfo) throws IOException {
-    return writeSegmentFile(carbonTable, segmentId, UUID, null, segmentMetaDataInfo);
+    return writeSegmentFile(carbonTable, segmentId, uuid, null, segmentMetaDataInfo);
   }
 
-  public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID)
+  public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String uuid)
       throws IOException {
-    return writeSegmentFile(carbonTable, segmentId, UUID, null, null);
-  }
-
-  /**
-   * Write segment file to the metadata folder of the table
-   *
-   * @param carbonTable CarbonTable
-   * @param segmentId segment id
-   * @param UUID      a UUID string used to construct the segment file name
-   * @param segPath segment path
-   * @param segmentMetaDataInfo segment metadata info
-   * @return segment file name
-   */
-  public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID,
-      String segPath, SegmentMetaDataInfo segmentMetaDataInfo) throws IOException {
-    return writeSegmentFile(carbonTable, segmentId, UUID, null, segPath, segmentMetaDataInfo);
+    return writeSegmentFile(carbonTable, segmentId, uuid, null, null);
   }
 
-  public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID,
+  public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String uuid,
       String segPath) throws IOException {
-    return writeSegmentFile(carbonTable, segmentId, UUID, null, segPath, null);
+    return writeSegmentFile(carbonTable, segmentId, uuid, segPath, null);
   }
 
   /**
@@ -393,13 +369,12 @@ public class SegmentFileStore {
    *
    * @param carbonTable
    * @param segmentId
-   * @param UUID
-   * @param currentLoadTimeStamp
+   * @param uuid
    * @return
    * @throws IOException
    */
-  public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String UUID,
-      final String currentLoadTimeStamp, String absSegPath, SegmentMetaDataInfo segmentMetaDataInfo)
+  public static String writeSegmentFile(CarbonTable carbonTable, String segmentId, String uuid,
+        String absSegPath, SegmentMetaDataInfo segmentMetaDataInfo)
       throws IOException {
     String tablePath = carbonTable.getTablePath();
     boolean supportFlatFolder = carbonTable.isSupportFlatFolder();
@@ -411,13 +386,13 @@ public class SegmentFileStore {
     CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() {
       @Override
       public boolean accept(CarbonFile file) {
-        if (null != currentLoadTimeStamp) {
-          return file.getName().contains(currentLoadTimeStamp) && (
+        if (null != uuid) {
+          return file.getName().contains(uuid) && (
               file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
                   .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT));
         }
-        return (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
-            .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT));
+        return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
+            .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT);
       }
     });
     if (indexFiles != null && indexFiles.length > 0) {
@@ -450,7 +425,7 @@ public class SegmentFileStore {
       if (!carbonFile.exists()) {
         carbonFile.mkdirs();
       }
-      String segmentFileName = genSegmentFileName(segmentId, UUID) + CarbonTablePath.SEGMENT_EXT;
+      String segmentFileName = genSegmentFileName(segmentId, uuid) + CarbonTablePath.SEGMENT_EXT;
       // write segment info to new file.
       writeSegmentFile(segmentFile, segmentFileFolder + File.separator + segmentFileName);
 
@@ -1019,7 +994,8 @@ public class SegmentFileStore {
       Set<Segment> segmentSet = new HashSet<>(
           new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier())
               .getValidAndInvalidSegments(carbonTable.isMV()).getValidSegments());
-      CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, uniqueId, true,
+      CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, uniqueId,
+          true, false,
           Segment.toSegmentList(toBeDeleteSegments, null),
           Segment.toSegmentList(toBeUpdatedSegments, null), uuid);
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 5dce0f6..eb022a1 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -238,9 +238,10 @@ public class CarbonUpdateUtil {
    */
   public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList,
       CarbonTable table, String updatedTimeStamp, boolean isTimestampUpdateRequired,
-      List<Segment> segmentsToBeDeleted) {
+      boolean isUpdateStatusFileUpdateRequired, List<Segment> segmentsToBeDeleted) {
     return updateTableMetadataStatus(updatedSegmentsList, table, updatedTimeStamp,
-        isTimestampUpdateRequired, segmentsToBeDeleted, new ArrayList<Segment>(), "");
+        isTimestampUpdateRequired, isUpdateStatusFileUpdateRequired,
+        segmentsToBeDeleted, new ArrayList<Segment>(), "");
   }
 
   /**
@@ -254,7 +255,8 @@ public class CarbonUpdateUtil {
    */
   public static boolean updateTableMetadataStatus(Set<Segment> updatedSegmentsList,
       CarbonTable table, String updatedTimeStamp, boolean isTimestampUpdateRequired,
-      List<Segment> segmentsToBeDeleted, List<Segment> segmentFilesTobeUpdated, String uuid) {
+      boolean isUpdateStatusFileUpdateRequired, List<Segment> segmentsToBeDeleted,
+      List<Segment> segmentFilesTobeUpdated, String uuid) {
 
     boolean status = false;
     String metaDataFilepath = table.getMetadataPath();
@@ -268,22 +270,21 @@ public class CarbonUpdateUtil {
     try {
       lockStatus = carbonLock.lockWithRetries();
       if (lockStatus) {
-        LOGGER.info(
-                "Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName()
-                        + " for table status update");
+        LOGGER.info("Acquired lock for table" + table.getDatabaseName() + "." + table.getTableName()
+             + " for table status update");
 
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-                SegmentStatusManager.readLoadMetadata(metaDataFilepath);
+            SegmentStatusManager.readLoadMetadata(metaDataFilepath);
 
         for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
+          // we are storing the link between the 2 status files in the segment 0 only.
+          if (isUpdateStatusFileUpdateRequired &&
+              loadMetadata.getLoadName().equalsIgnoreCase("0")) {
+            loadMetadata.setUpdateStatusFileName(
+                CarbonUpdateUtil.getUpdateStatusFileName(updatedTimeStamp));
+          }
 
           if (isTimestampUpdateRequired) {
-            // we are storing the link between the 2 status files in the segment 0 only.
-            if (loadMetadata.getLoadName().equalsIgnoreCase("0")) {
-              loadMetadata.setUpdateStatusFileName(
-                      CarbonUpdateUtil.getUpdateStatusFileName(updatedTimeStamp));
-            }
-
             // if the segments is in the list of marked for delete then update the status.
             if (segmentsToBeDeleted.contains(new Segment(loadMetadata.getLoadName()))) {
               loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
@@ -373,9 +374,7 @@ public class CarbonUpdateUtil {
         @Override
         public boolean accept(CarbonFile file) {
           String fileName = file.getName();
-          return (fileName.endsWith(timeStamp + CarbonCommonConstants.UPDATE_DELTA_FILE_EXT)
-                  || fileName.endsWith(timeStamp + CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
-                  || fileName.endsWith(timeStamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT));
+          return fileName.endsWith(timeStamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT);
         }
       });
       // deleting the files of a segment.
@@ -511,8 +510,6 @@ public class CarbonUpdateUtil {
 
     boolean isInvalidFile = false;
 
-    List<Segment> segmentFilesToBeUpdated = new ArrayList<>();
-
     // take the update status file name from 0th segment.
     validUpdateStatusFile = ssm.getUpdateStatusFileName(details);
     // scan through each segment.
@@ -532,38 +529,6 @@ public class CarbonUpdateUtil {
               FileFactory.getCarbonFile(segmentPath);
           CarbonFile[] allSegmentFiles = segDir.listFiles();
 
-          // scan through the segment and find the carbon data files and index files.
-          boolean updateSegmentFile = false;
-          // deleting of the aborted file scenario.
-          if (deleteStaleCarbonDataFiles(segment, allSegmentFiles, updateStatusManager)) {
-            updateSegmentFile = true;
-          }
-
-          // get Invalid update  delta files.
-          CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager
-              .getUpdateDeltaFilesList(segment, false,
-                  CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles,
-                  isInvalidFile);
-
-          // now for each invalid delta file need to check the query execution time out
-          // and then delete.
-          for (CarbonFile invalidFile : invalidUpdateDeltaFiles) {
-            compareTimestampsAndDelete(invalidFile, forceDelete, false);
-          }
-          // do the same for the index files.
-          CarbonFile[] invalidIndexFiles = updateStatusManager
-              .getUpdateDeltaFilesList(segment, false,
-                  CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles,
-                  isInvalidFile);
-
-          // now for each invalid index file need to check the query execution time out
-          // and then delete.
-
-          for (CarbonFile invalidFile : invalidIndexFiles) {
-            if (compareTimestampsAndDelete(invalidFile, forceDelete, false)) {
-              updateSegmentFile = true;
-            }
-          }
           // now handle all the delete delta files which needs to be deleted.
           // there are 2 cases here .
           // 1. if the block is marked as compacted then the corresponding delta files
@@ -607,56 +572,12 @@ public class CarbonUpdateUtil {
               }
             }
           }
-          if (updateSegmentFile) {
-            segmentFilesToBeUpdated.add(
-                new Segment(segment.getLoadName(), segment.getSegmentFile(), null));
-          }
         }
         // handle cleanup of merge index files and data files after small files merge happened for
         // SI table
         cleanUpDataFilesAfterSmallFilesMergeForSI(table, segment);
       }
     }
-    String UUID = String.valueOf(System.currentTimeMillis());
-    List<Segment> segmentFilesToBeUpdatedLatest = new ArrayList<>();
-    CarbonFile segmentFilesLocation =
-        FileFactory.getCarbonFile(CarbonTablePath.getSegmentFilesLocation(table.getTablePath()));
-    Set<String> segmentFilesNotToDelete = new HashSet<>();
-    Set<String> updatedSegmentIDs = new HashSet<>(Arrays.asList(
-        segmentFilesToBeUpdated.stream().map(Segment::getSegmentNo).toArray(String[]::new)));
-    for (Segment segment : segmentFilesToBeUpdated) {
-      SegmentFileStore fileStore =
-          new SegmentFileStore(table.getTablePath(), segment.getSegmentFileName());
-      segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
-      String updatedSegmentFile = SegmentFileStore
-          .writeSegmentFile(table, segment.getSegmentNo(), UUID,
-              CarbonTablePath.getSegmentPath(table.getTablePath(), segment.getSegmentNo()),
-              segment.getSegmentMetaDataInfo());
-      segmentFilesNotToDelete.add(updatedSegmentFile);
-      segmentFilesToBeUpdatedLatest.add(new Segment(segment.getSegmentNo(), updatedSegmentFile));
-    }
-    if (segmentFilesNotToDelete.size() > 0) {
-      // delete the old segment files
-      CarbonFile[] invalidSegmentFiles = segmentFilesLocation.listFiles(new CarbonFileFilter() {
-        @Override
-        public boolean accept(CarbonFile file) {
-          return !segmentFilesNotToDelete.contains(file.getName()) && updatedSegmentIDs
-              .contains(CarbonTablePath.DataFileUtil.getSegmentNoFromSegmentFile(file.getName()));
-        }
-      });
-      for (CarbonFile invalidSegmentFile : invalidSegmentFiles) {
-        invalidSegmentFile.delete();
-      }
-    }
-    if (segmentFilesToBeUpdated.size() > 0) {
-      updateTableMetadataStatus(
-          new HashSet<Segment>(segmentFilesToBeUpdated),
-          table,
-          UUID,
-          false,
-          new ArrayList<Segment>(),
-          segmentFilesToBeUpdatedLatest, "");
-    }
 
     // delete the update table status files which are old.
     if (null != validUpdateStatusFile && !validUpdateStatusFile.isEmpty()) {
@@ -672,7 +593,6 @@ public class CarbonUpdateUtil {
         @Override
         public boolean accept(CarbonFile file) {
           if (file.getName().startsWith(CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME)) {
-
             // CHECK if this is valid or not.
             // we only send invalid ones to delete.
             return !file.getName().endsWith(updateStatusTimestamp);
@@ -682,7 +602,6 @@ public class CarbonUpdateUtil {
       });
 
       for (CarbonFile invalidFile : invalidUpdateStatusFiles) {
-
         compareTimestampsAndDelete(invalidFile, forceDelete, true);
       }
     }
@@ -691,7 +610,7 @@ public class CarbonUpdateUtil {
   /**
    * this is the clean up added specifically for SI table, because after we merge the data files
    * inside the secondary index table, we need to delete the stale carbondata files.
-   * refer {@link org.apache.spark.sql.secondaryindex.rdd.CarbonSIRebuildRDD}
+   * refer org.apache.spark.sql.secondaryindex.rdd.CarbonSIRebuildRDD
    */
   private static void cleanUpDataFilesAfterSmallFilesMergeForSI(CarbonTable table,
       LoadMetadataDetails segment) throws IOException {
@@ -732,41 +651,6 @@ public class CarbonUpdateUtil {
     }
   }
 
-  /**
-   * This function deletes all the stale carbondata files during clean up before update operation
-   * one scenario is if update operation is abruptly stopped before update of table status then
-   * the carbondata file created during update operation is stale file and it will be deleted in
-   * this function in next update operation
-   * @param segment
-   * @param allSegmentFiles
-   * @param updateStatusManager
-   */
-  private static boolean deleteStaleCarbonDataFiles(LoadMetadataDetails segment,
-      CarbonFile[] allSegmentFiles, SegmentUpdateStatusManager updateStatusManager) {
-    CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager
-        .getUpdateDeltaFilesList(segment, false,
-            CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles,
-            true);
-    // now for each invalid delta file need to check the query execution time out
-    // and then delete.
-    for (CarbonFile invalidFile : invalidUpdateDeltaFiles) {
-      compareTimestampsAndDelete(invalidFile, true, false);
-    }
-    // do the same for the index files.
-    CarbonFile[] invalidIndexFiles = updateStatusManager
-        .getUpdateDeltaFilesList(segment, false,
-            CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles,
-            true);
-    // now for each invalid index file need to check the query execution time out
-    // and then delete.
-    boolean updateSegmentFile = false;
-    for (CarbonFile invalidFile : invalidIndexFiles) {
-      if (compareTimestampsAndDelete(invalidFile, true, false)) {
-        updateSegmentFile = true;
-      }
-    }
-    return updateSegmentFile;
-  }
 
   /**
    * This will tell whether the max query timeout has been expired or not.
diff --git a/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfoStats.java b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfoStats.java
index 71828c8..4f11eb0 100644
--- a/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfoStats.java
+++ b/core/src/main/java/org/apache/carbondata/core/segmentmeta/SegmentMetaDataInfoStats.java
@@ -22,7 +22,6 @@ import java.util.LinkedHashMap;
 import java.util.Map;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.indexstore.blockletindex.BlockIndex;
 import org.apache.carbondata.core.util.ByteUtil;
 
 /**
@@ -86,28 +85,14 @@ public class SegmentMetaDataInfoStats {
   public synchronized void setBlockMetaDataInfo(String tableName, String segmentId,
       BlockColumnMetaDataInfo currentBlockColumnMetaInfo) {
     // check if tableName is present in tableSegmentMetaDataInfoMap
-    if (!this.tableSegmentMetaDataInfoMap.isEmpty() && null != this.tableSegmentMetaDataInfoMap
-        .get(tableName) && null != this.tableSegmentMetaDataInfoMap.get(tableName).get(segmentId)) {
-      // get previous blockColumn metadata information
-      BlockColumnMetaDataInfo previousBlockColumnMetaInfo =
-          this.tableSegmentMetaDataInfoMap.get(tableName).get(segmentId);
-      // compare and get updated min and max values
-      byte[][] updatedMin = BlockIndex.compareAndUpdateMinMax(previousBlockColumnMetaInfo.getMin(),
-          currentBlockColumnMetaInfo.getMin(), true);
-      byte[][] updatedMax = BlockIndex.compareAndUpdateMinMax(previousBlockColumnMetaInfo.getMax(),
-          currentBlockColumnMetaInfo.getMax(), false);
-      // update the segment
-      this.tableSegmentMetaDataInfoMap.get(tableName).get(segmentId)
-          .setMinMax(updatedMin, updatedMax);
-    } else {
-      Map<String, BlockColumnMetaDataInfo> segmentMinMaxMap = new HashMap<>();
-      if (null != this.tableSegmentMetaDataInfoMap.get(tableName)
-          && !this.tableSegmentMetaDataInfoMap.get(tableName).isEmpty()) {
-        segmentMinMaxMap = this.tableSegmentMetaDataInfoMap.get(tableName);
-      }
-      segmentMinMaxMap.put(segmentId, currentBlockColumnMetaInfo);
-      this.tableSegmentMetaDataInfoMap.put(tableName, segmentMinMaxMap);
+    Map<String, BlockColumnMetaDataInfo> segmentMinMaxMap = new HashMap<>();
+    if (!this.tableSegmentMetaDataInfoMap.isEmpty()
+        && null != this.tableSegmentMetaDataInfoMap.get(tableName)
+        && !this.tableSegmentMetaDataInfoMap.get(tableName).isEmpty()) {
+      segmentMinMaxMap = this.tableSegmentMetaDataInfoMap.get(tableName);
     }
+    segmentMinMaxMap.put(segmentId, currentBlockColumnMetaInfo);
+    this.tableSegmentMetaDataInfoMap.put(tableName, segmentMinMaxMap);
   }
 
   /**
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 4bf8ba1..31e253b 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -213,64 +213,6 @@ public class SegmentUpdateStatusManager {
   }
 
   /**
-   * Returns all update delta files of specified Segment.
-   *
-   * @param segmentId
-   * @return
-   * @throws Exception
-   */
-  public List<String> getUpdateDeltaFiles(final String segmentId) {
-    List<String> updatedDeltaFilesList =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    String endTimeStamp = "";
-    String startTimeStamp = "";
-    String segmentPath = CarbonTablePath.getSegmentPath(
-        identifier.getTablePath(), segmentId);
-    CarbonFile segDir =
-        FileFactory.getCarbonFile(segmentPath);
-    for (LoadMetadataDetails eachSeg : segmentDetails) {
-      if (eachSeg.getLoadName().equalsIgnoreCase(segmentId)) {
-        // if the segment is found then take the start and end time stamp.
-        startTimeStamp = eachSeg.getUpdateDeltaStartTimestamp();
-        endTimeStamp = eachSeg.getUpdateDeltaEndTimestamp();
-      }
-    }
-    // if start timestamp is empty then no update delta is found. so return empty list.
-    if (startTimeStamp.isEmpty()) {
-      return updatedDeltaFilesList;
-    }
-    final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimeStamp);
-    final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimeStamp);
-
-    // else scan the segment for the delta files with the respective timestamp.
-    CarbonFile[] files = segDir.listFiles(new CarbonFileFilter() {
-
-      @Override
-      public boolean accept(CarbonFile pathName) {
-        String fileName = pathName.getName();
-        if (fileName.endsWith(CarbonCommonConstants.UPDATE_DELTA_FILE_EXT)) {
-          String firstPart = fileName.substring(0, fileName.indexOf('.'));
-
-          long timestamp = Long.parseLong(firstPart
-              .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1));
-          if (timestamp <= endTimeStampFinal && timestamp >= startTimeStampFinal) {
-
-            // if marked for delete then it is invalid.
-            return isBlockValid(segmentId, fileName);
-          }
-        }
-        return false;
-      }
-    });
-
-    for (CarbonFile file : files) {
-      updatedDeltaFilesList.add(file.getCanonicalPath());
-    }
-
-    return updatedDeltaFilesList;
-  }
-
-  /**
    * Below method will be used to get all the delete delta files based on block name
    *
    * @param blockFilePath actual block filePath
@@ -451,161 +393,6 @@ public class SegmentUpdateStatusManager {
   }
 
   /**
-   * Returns all update delta files of specified Segment.
-   *
-   * @param loadMetadataDetail metadata details of segment
-   * @param validUpdateFiles if true then only the valid range files will be returned.
-   * @return
-   */
-  public CarbonFile[] getUpdateDeltaFilesList(LoadMetadataDetails loadMetadataDetail,
-      final boolean validUpdateFiles, final String fileExtension, final boolean excludeOriginalFact,
-      CarbonFile[] allFilesOfSegment, boolean isAbortedFile) {
-
-    String endTimeStamp = "";
-    String startTimeStamp = "";
-    long factTimeStamp = 0;
-
-    // if the segment is found then take the start and end time stamp.
-    startTimeStamp = loadMetadataDetail.getUpdateDeltaStartTimestamp();
-    endTimeStamp = loadMetadataDetail.getUpdateDeltaEndTimestamp();
-    factTimeStamp = loadMetadataDetail.getLoadStartTime();
-
-    // if start timestamp is empty then no update delta is found. so return empty list.
-    if (startTimeStamp.isEmpty()) {
-      return new CarbonFile[0];
-    }
-
-    final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimeStamp);
-    final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimeStamp);
-    final long factTimeStampFinal = factTimeStamp;
-
-    List<CarbonFile> listOfCarbonFiles =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    // else scan the segment for the delta files with the respective timestamp.
-
-    for (CarbonFile eachFile : allFilesOfSegment) {
-
-      String fileName = eachFile.getName();
-      if (fileName.endsWith(fileExtension)) {
-        long timestamp =
-            Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(fileName));
-
-        if (excludeOriginalFact) {
-          if (factTimeStampFinal == timestamp) {
-            continue;
-          }
-        }
-
-        if (validUpdateFiles) {
-          if (timestamp <= endTimeStampFinal
-              && timestamp >= startTimeStampFinal) {
-            listOfCarbonFiles.add(eachFile);
-          }
-        } else {
-          // invalid cases.
-          if (isAbortedFile) {
-            if (timestamp > endTimeStampFinal) {
-              listOfCarbonFiles.add(eachFile);
-            }
-          } else if (timestamp < startTimeStampFinal
-              || timestamp > endTimeStampFinal) {
-            listOfCarbonFiles.add(eachFile);
-          }
-        }
-      }
-    }
-
-    return listOfCarbonFiles.toArray(new CarbonFile[listOfCarbonFiles.size()]);
-  }
-
-  /**
-   * Returns all update delta files of specified Segment.
-   *
-   * @param segmentId
-   * @param validUpdateFiles
-   * @param fileExtension
-   * @param excludeOriginalFact
-   * @param allFilesOfSegment
-   * @return
-   */
-  public CarbonFile[] getUpdateDeltaFilesForSegment(String segmentId,
-      final boolean validUpdateFiles, final String fileExtension, final boolean excludeOriginalFact,
-      CarbonFile[] allFilesOfSegment) {
-
-    String endTimeStamp = "";
-    String startTimeStamp = "";
-    long factTimeStamp = 0;
-
-    for (LoadMetadataDetails eachSeg : segmentDetails) {
-      if (eachSeg.getLoadName().equalsIgnoreCase(segmentId)) {
-        // if the segment is found then take the start and end time stamp.
-        startTimeStamp = eachSeg.getUpdateDeltaStartTimestamp();
-        endTimeStamp = eachSeg.getUpdateDeltaEndTimestamp();
-        factTimeStamp = eachSeg.getLoadStartTime();
-      }
-    }
-
-    // if start timestamp is empty then no update delta is found. so return empty list.
-    if (startTimeStamp.isEmpty()) {
-      return new CarbonFile[0];
-    }
-
-    final Long endTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(endTimeStamp);
-    final Long startTimeStampFinal = CarbonUpdateUtil.getTimeStampAsLong(startTimeStamp);
-    final long factTimeStampFinal = factTimeStamp;
-
-    List<CarbonFile> listOfCarbonFiles =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    // else scan the segment for the delta files with the respective timestamp.
-
-    for (CarbonFile eachFile : allFilesOfSegment) {
-
-      String fileName = eachFile.getName();
-      if (fileName.endsWith(fileExtension)) {
-        String firstPart = fileName.substring(0, fileName.indexOf('.'));
-
-        long timestamp = Long.parseLong(firstPart
-            .substring(firstPart.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1));
-
-        if (excludeOriginalFact) {
-          if (factTimeStampFinal == timestamp) {
-            continue;
-          }
-        }
-
-        if (validUpdateFiles) {
-          if (timestamp <= endTimeStampFinal
-              && timestamp >= startTimeStampFinal) {
-
-            boolean validBlock = true;
-
-            for (SegmentUpdateDetails blockDetails : getUpdateStatusDetails()) {
-              if (blockDetails.getActualBlockName().equalsIgnoreCase(eachFile.getName())
-                  && CarbonUpdateUtil.isBlockInvalid(blockDetails.getSegmentStatus())) {
-                validBlock = false;
-              }
-            }
-
-            if (validBlock) {
-              listOfCarbonFiles.add(eachFile);
-            }
-
-          }
-        } else {
-          // invalid cases.
-          if (timestamp < startTimeStampFinal) {
-            listOfCarbonFiles.add(eachFile);
-          }
-        }
-      }
-    }
-
-    return listOfCarbonFiles.toArray(new CarbonFile[listOfCarbonFiles.size()]);
-  }
-
-  /**
    *
    * @param extension
    * @param block
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index fe61c89..b5f92d9 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1249,36 +1249,6 @@ public final class CarbonProperties {
   }
 
   /**
-   * Returns configured update delta files value for IUD compaction
-   *
-   * @return numberOfDeltaFilesThreshold
-   */
-  public int getNoUpdateDeltaFilesThresholdForIUDCompaction() {
-    int numberOfDeltaFilesThreshold;
-    try {
-      numberOfDeltaFilesThreshold = Integer.parseInt(
-          getProperty(CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION,
-              CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION));
-
-      if (numberOfDeltaFilesThreshold < 0 || numberOfDeltaFilesThreshold > 10000) {
-        LOGGER.warn("The specified value for property "
-            + CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION
-            + "is incorrect."
-            + " Correct value should be in range of 0 -10000. Taking the default value.");
-        numberOfDeltaFilesThreshold = Integer.parseInt(
-            CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
-      }
-    } catch (NumberFormatException e) {
-      LOGGER.warn("The specified value for property "
-          + CarbonCommonConstants.UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION + "is incorrect."
-          + " Correct value should be in range of 0 -10000. Taking the default value.");
-      numberOfDeltaFilesThreshold = Integer
-          .parseInt(CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION);
-    }
-    return numberOfDeltaFilesThreshold;
-  }
-
-  /**
    * Returns configured delete delta files value for IUD compaction
    *
    * @return numberOfDeltaFilesThreshold
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index c5dc8c3..42d6512 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2618,8 +2618,8 @@ public final class CarbonUtil {
       // Storing the number of files written by each task.
       metrics.incrementCount();
       // Storing the files written by each task.
-      metrics.addToOutputFiles(targetPath + localFilePath
-          .substring(localFilePath.lastIndexOf(File.separator)) + ":" + targetSize);
+      metrics.addToOutputFiles(targetPath + CarbonCommonConstants.FILE_SEPARATOR + localFilePath
+          .substring(localFilePath.lastIndexOf(File.separator) + 1) + ":" + targetSize);
       metrics.addOutputBytes(targetSize);
     }
   }
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index 6fa2681..3f26aa2 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -100,13 +100,13 @@ public class CarbonIndexFileMergeWriter {
           indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
         }
       } else {
-        indexFiles =
-            SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
+        indexFiles = SegmentIndexFileStore
+            .getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration(), uuid);
       }
       if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) {
         if (sfs == null) {
           return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
-              readFileFooterFromCarbonDataFile, segmentPath, indexFiles, segmentId);
+              readFileFooterFromCarbonDataFile, segmentPath, indexFiles, segmentId, uuid);
         } else {
           return writeMergeIndexFileBasedOnSegmentFile(segmentId, indexFileNamesTobeAdded, sfs,
               indexFiles, uuid, partitionPath);
@@ -134,13 +134,14 @@ public class CarbonIndexFileMergeWriter {
       }
     }
     if (null != partitionPath && !partitionTempPath.isEmpty()) {
-      fileStore.readAllIIndexOfSegment(partitionTempPath);
+      fileStore.readAllIIndexOfSegment(partitionTempPath, uuid);
     }
     Map<String, Map<String, byte[]>> indexLocationMap =
         groupIndexesBySegment(fileStore.getCarbonIndexMapWithFullPath());
     SegmentFileStore.FolderDetails folderDetails = null;
     for (Map.Entry<String, Map<String, byte[]>> entry : indexLocationMap.entrySet()) {
-      String mergeIndexFile = writeMergeIndexFile(null, partitionPath, entry.getValue(), segmentId);
+      String mergeIndexFile =
+          writeMergeIndexFile(null, partitionPath, entry.getValue(), segmentId, uuid);
       folderDetails = new SegmentFileStore.FolderDetails();
       folderDetails.setMergeFileName(mergeIndexFile);
       folderDetails.setStatus("Success");
@@ -185,18 +186,18 @@ public class CarbonIndexFileMergeWriter {
 
   private String writeMergeIndexFileBasedOnSegmentFolder(List<String> indexFileNamesTobeAdded,
       boolean readFileFooterFromCarbonDataFile, String segmentPath, CarbonFile[] indexFiles,
-      String segmentId) throws IOException {
+      String segmentId, String uuid) throws IOException {
     SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
     if (readFileFooterFromCarbonDataFile) {
       // this case will be used in case of upgrade where old store will not have the blocklet
       // info in the index file and therefore blocklet info need to be read from the file footer
       // in the carbondata file
-      fileStore.readAllIndexAndFillBlockletInfo(segmentPath);
+      fileStore.readAllIndexAndFillBlockletInfo(segmentPath, uuid);
     } else {
-      fileStore.readAllIIndexOfSegment(segmentPath);
+      fileStore.readAllIIndexOfSegment(segmentPath, uuid);
     }
     Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap();
-    writeMergeIndexFile(indexFileNamesTobeAdded, segmentPath, indexMap, segmentId);
+    writeMergeIndexFile(indexFileNamesTobeAdded, segmentPath, indexMap, segmentId, uuid);
     for (CarbonFile indexFile : indexFiles) {
       indexFile.delete();
     }
@@ -223,8 +224,8 @@ public class CarbonIndexFileMergeWriter {
             .readLoadMetadata(CarbonTablePath.getMetadataPath(table.getTablePath())));
     List<String> mergeIndexFiles = new ArrayList<>();
     for (Map.Entry<String, Map<String, byte[]>> entry : indexLocationMap.entrySet()) {
-      String mergeIndexFile =
-          writeMergeIndexFile(indexFileNamesTobeAdded, entry.getKey(), entry.getValue(), segmentId);
+      String mergeIndexFile = writeMergeIndexFile(indexFileNamesTobeAdded,
+          entry.getKey(), entry.getValue(), segmentId, uuid);
       for (Map.Entry<String, SegmentFileStore.FolderDetails> segment : segmentFileStore
           .getLocationMap().entrySet()) {
         String location = segment.getKey();
@@ -296,7 +297,7 @@ public class CarbonIndexFileMergeWriter {
   }
 
   private String writeMergeIndexFile(List<String> indexFileNamesTobeAdded, String segmentPath,
-      Map<String, byte[]> indexMap, String segment_id) throws IOException {
+      Map<String, byte[]> indexMap, String segment_id, String uuid) throws IOException {
     MergedBlockIndexHeader indexHeader = new MergedBlockIndexHeader();
     MergedBlockIndex mergedBlockIndex = new MergedBlockIndex();
     List<String> fileNames = new ArrayList<>(indexMap.size());
@@ -310,7 +311,7 @@ public class CarbonIndexFileMergeWriter {
     }
     if (fileNames.size() > 0) {
       String mergeIndexName =
-          segment_id + '_' + System.currentTimeMillis() + CarbonTablePath.MERGE_INDEX_FILE_EXT;
+          segment_id + '_' + uuid + CarbonTablePath.MERGE_INDEX_FILE_EXT;
       openThriftWriter(segmentPath + "/" + mergeIndexName);
       indexHeader.setFile_names(fileNames);
       mergedBlockIndex.setFileData(data);
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index f38983b..b79fd2b 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -104,8 +104,7 @@ This section provides the details of all the configurations required for the Car
 | carbon.number.of.cores.while.compacting | 2 | Number of cores to be used while compacting data. This also determines the number of threads to be used to read carbondata files in parallel. |
 | carbon.compaction.level.threshold | 4, 3 | Each CarbonData load will create one segment, if every load is small in size it will generate many small file over a period of time impacting the query performance. This configuration is for minor compaction which decides how many segments to be merged. Configuration is of the form (x,y). Compaction will be triggered for every x segments and form a single level 1 compacted segment. When the number of compacted level 1 segments reach y, compact [...]
 | carbon.major.compaction.size | 1024 | To improve query performance and all the segments can be merged and compacted to a single segment upto configured size. This Major compaction size can be configured using this parameter. Sum of the segments which is below this threshold will be merged. This value is expressed in MB. |
-| carbon.horizontal.compaction.enable | true | CarbonData supports DELETE/UPDATE functionality by creating delta data files for existing carbondata files. These delta files would grow as more number of DELETE/UPDATE operations are performed. Compaction of these delta files are termed as horizontal compaction. This configuration is used to turn ON/OFF horizontal compaction. After every DELETE and UPDATE statement, horizontal compaction may occur in case the delta (DELETE/ UPDATE) files be [...]
-| carbon.horizontal.update.compaction.threshold | 1 | This configuration specifies the threshold limit on number of UPDATE delta files within a segment. In case the number of delta files goes beyond the threshold, the UPDATE delta files within the segment becomes eligible for horizontal compaction and are compacted into single UPDATE delta file. Values range between 1 to 10000. |
+| carbon.horizontal.compaction.enable | true | CarbonData supports DELETE/UPDATE functionality by creating delta data files for existing carbondata files. These delta files would grow as more number of DELETE/UPDATE operations are performed. Compaction of these delete delta files are termed as horizontal compaction. This configuration is used to turn ON/OFF horizontal compaction. After every DELETE and UPDATE statement, horizontal compaction may occur in case the delete delta files becom [...]
 | carbon.horizontal.delete.compaction.threshold | 1 | This configuration specifies the threshold limit on number of DELETE delta files within a block of a segment. In case the number of delta files goes beyond the threshold, the DELETE delta files for the particular block of the segment becomes eligible for horizontal compaction and are compacted into single DELETE delta file. Values range between 1 to 10000. |
 | carbon.update.segment.parallelism | 1 | CarbonData processes the UPDATE operations by grouping records belonging to a segment into a single executor task. When the amount of data to be updated is more, this behavior causes problems like restarting of executor due to low memory and data-spill related errors. This property specifies the parallelism for each segment during update. **NOTE:** It is recommended to set this value to a multiple of the number of executors for balance. Values ra [...]
 | carbon.numberof.preserve.segments | 0 | If the user wants to preserve some number of segments from being compacted then he can set this configuration. Example: carbon.numberof.preserve.segments = 2 then 2 latest segments will always be excluded from the compaction. No segments will be preserved by default. **NOTE:** This configuration is useful when the chances of input data can be wrong due to environment scenarios. Preserving some of the latest segments from being compacted can help  [...]
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 0943002..6ab6adc 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -231,9 +231,12 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     if (!segmentsToBeDeleted.trim().isEmpty()) {
       segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
     }
+    boolean isUpdateStatusFileUpdateRequired =
+        (context.getConfiguration().get(CarbonTableOutputFormat.UPDATE_TIMESTAMP) != null);
     if (updateTime != null) {
       CarbonUpdateUtil.updateTableMetadataStatus(Collections.singleton(loadModel.getSegment()),
-          carbonTable, updateTime, true, segmentDeleteList);
+          carbonTable, updateTime, true,
+          isUpdateStatusFileUpdateRequired, segmentDeleteList);
     }
   }
 
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
index 598cb7d..c5989d9 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/AlterTableTestCase.scala
@@ -1074,7 +1074,6 @@ class AlterTableTestCase extends QueryTest with BeforeAndAfterAll {
 
   val prop = CarbonProperties.getInstance()
   val p1 = prop.getProperty("carbon.horizontal.compaction.enable", CarbonCommonConstants.CARBON_HORIZONTAL_COMPACTION_ENABLE_DEFAULT)
-  val p2 = prop.getProperty("carbon.horizontal.update.compaction.threshold", CarbonCommonConstants.DEFAULT_UPDATE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION)
   val p3 = prop.getProperty("carbon.horizontal.delete.compaction.threshold", CarbonCommonConstants.DEFAULT_DELETE_DELTAFILE_COUNT_THRESHOLD_IUD_COMPACTION)
   val p4 = prop.getProperty("carbon.compaction.level.threshold", CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
   val p5 = prop.getProperty("carbon.enable.auto.load.merge", CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
@@ -1083,7 +1082,6 @@ class AlterTableTestCase extends QueryTest with BeforeAndAfterAll {
   override protected def beforeAll() {
     // Adding new properties
     prop.addProperty("carbon.horizontal.compaction.enable", "true")
-    prop.addProperty("carbon.horizontal.update.compaction.threshold", "1")
     prop.addProperty("carbon.horizontal.delete.compaction.threshold", "1")
     prop.addProperty("carbon.compaction.level.threshold", "2,1")
     prop.addProperty("carbon.enable.auto.load.merge", "false")
@@ -1093,7 +1091,6 @@ class AlterTableTestCase extends QueryTest with BeforeAndAfterAll {
   override def afterAll: Unit = {
     // Reverting to old
     prop.addProperty("carbon.horizontal.compaction.enable", p1)
-    prop.addProperty("carbon.horizontal.update.compaction.threshold", p2)
     prop.addProperty("carbon.horizontal.delete.compaction.threshold", p3)
     prop.addProperty("carbon.compaction.level.threshold", p4)
     prop.addProperty("carbon.enable.auto.load.merge", p5)
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 50163b1..0b34f81 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -149,11 +149,7 @@ object CarbonDataRDDFactory {
       operationContext: OperationContext): Unit = {
     val executor: ExecutorService = Executors.newFixedThreadPool(1)
     // update the updated table status.
-    if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) {
-      // update the updated table status. For the case of Update Delta Compaction the Metadata
-      // is filled in LoadModel, no need to refresh.
-      carbonLoadModel.readAndSetLoadMetadataDetails()
-    }
+    carbonLoadModel.readAndSetLoadMetadataDetails()
 
     val compactionThread = new Thread {
       override def run(): Unit = {
@@ -274,9 +270,7 @@ object CarbonDataRDDFactory {
             // no need to throw this as compaction is over
             case ex: Exception =>
           } finally {
-            if (compactionModel.compactionType != CompactionType.IUD_UPDDEL_DELTA) {
-              compactionLock.unlock()
-            }
+            compactionLock.unlock()
           }
         }
       }
@@ -328,7 +322,7 @@ object CarbonDataRDDFactory {
       .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
     // create new segment folder  in carbon store
     if (updateModel.isEmpty && carbonLoadModel.isCarbonTransactionalTable ||
-        updateModel.isDefined && updateModel.get.loadAsNewSegment) {
+        updateModel.isDefined) {
       CarbonLoaderUtil.checkAndCreateCarbonDataLocation(carbonLoadModel.getSegmentId, carbonTable)
     }
     var loadStatus = SegmentStatus.SUCCESS
@@ -342,33 +336,8 @@ object CarbonDataRDDFactory {
 
     try {
       if (!carbonLoadModel.isCarbonTransactionalTable || segmentLock.lockWithRetries()) {
-        if (updateModel.isDefined && !updateModel.get.loadAsNewSegment) {
-          res = loadDataFrameForUpdate(
-            sqlContext,
-            dataFrame,
-            carbonLoadModel,
-            updateModel,
-            carbonTable,
-            hadoopConf,
-            segmentMetaDataAccumulator)
-          res.foreach { resultOfSeg =>
-            resultOfSeg.foreach { resultOfBlock =>
-              if (resultOfBlock._2._1.getSegmentStatus == SegmentStatus.LOAD_FAILURE) {
-                loadStatus = SegmentStatus.LOAD_FAILURE
-                if (resultOfBlock._2._2.failureCauses == FailureCauses.NONE) {
-                  updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
-                  updateModel.get.executorErrors.errorMsg = "Failure in the Executor."
-                } else {
-                  updateModel.get.executorErrors = resultOfBlock._2._2
-                }
-              } else if (resultOfBlock._2._1.getSegmentStatus ==
-                         SegmentStatus.LOAD_PARTIAL_SUCCESS) {
-                loadStatus = SegmentStatus.LOAD_PARTIAL_SUCCESS
-                updateModel.get.executorErrors.failureCauses = resultOfBlock._2._2.failureCauses
-                updateModel.get.executorErrors.errorMsg = resultOfBlock._2._2.errorMsg
-              }
-            }
-          }
+        if (updateModel.isDefined && dataFrame.get.rdd.isEmpty()) {
+          // if there are no rows to be updated, do nothing
         } else {
           status = if (scanResultRdd.isDefined) {
             val colSchema = carbonLoadModel
@@ -493,72 +462,6 @@ object CarbonDataRDDFactory {
         LOGGER.error(ex)
     }
     try {
-      // handle the status file update for the update cmd.
-      if (updateModel.isDefined && !updateModel.get.loadAsNewSegment) {
-        if (loadStatus == SegmentStatus.LOAD_FAILURE) {
-          CarbonScalaUtil.updateErrorInUpdateModel(updateModel.get, executorMessage)
-          return null
-        } else if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
-                   updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS &&
-                   carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
-          return null
-        } else {
-          // in success case handle update of the table status file.
-          // success case.
-          val segmentDetails = new util.HashSet[Segment]()
-          var resultSize = 0
-          res.foreach { resultOfSeg =>
-            resultSize = resultSize + resultOfSeg.size
-            resultOfSeg.foreach { resultOfBlock =>
-              segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName))
-            }
-          }
-          var segmentMetaDataInfoMap = scala
-            .collection
-            .mutable
-            .Map
-            .empty[String, SegmentMetaDataInfo]
-          if (!segmentMetaDataAccumulator.isZero) {
-            segmentMetaDataAccumulator.value.asScala.foreach(map => if (map.nonEmpty) {
-              segmentMetaDataInfoMap = segmentMetaDataInfoMap ++ map
-            })
-          }
-          val segmentFiles = updateSegmentFiles(carbonTable,
-            segmentDetails,
-            updateModel.get,
-            segmentMetaDataInfoMap.asJava)
-
-          // this means that the update doesnt have any records to update so no need to do table
-          // status file update.
-          if (resultSize == 0) {
-            return null
-          }
-          if (!CarbonUpdateUtil.updateTableMetadataStatus(
-            segmentDetails,
-            carbonTable,
-            updateModel.get.updatedTimeStamp + "",
-            true,
-            new util.ArrayList[Segment](0),
-            new util.ArrayList[Segment](segmentFiles), "")) {
-            LOGGER.error("Data update failed due to failure in table status update.")
-            updateModel.get.executorErrors.errorMsg = errorMessage
-            updateModel.get.executorErrors.failureCauses = FailureCauses
-              .STATUS_FILE_UPDATION_FAILURE
-            return null
-          }
-          // code to handle Pre-Priming cache for update command
-          if (!segmentFiles.isEmpty) {
-            val segmentsToPrePrime = segmentFiles
-              .asScala
-              .map(iterator => iterator.getSegmentNo)
-              .toSeq
-            DistributedRDDUtils
-              .triggerPrepriming(sqlContext.sparkSession, carbonTable, segmentsToPrePrime,
-                operationContext, hadoopConf, segmentsToPrePrime.toList)
-          }
-        }
-        return null
-      }
       val uniqueTableStatusId = Option(operationContext.getProperty("uuid")).getOrElse("")
         .asInstanceOf[String]
       if (loadStatus == SegmentStatus.LOAD_FAILURE) {
@@ -590,6 +493,9 @@ object CarbonDataRDDFactory {
           LOGGER.info("********clean up done**********")
           throw new Exception(status(0)._2._2.errorMsg)
         }
+        if (updateModel.isDefined && dataFrame.get.rdd.isEmpty()) {
+          return null
+        }
         // as no record loaded in new segment, new segment should be deleted
         val newEntryLoadStatus =
           if (carbonLoadModel.isCarbonTransactionalTable &&
@@ -721,210 +627,6 @@ object CarbonDataRDDFactory {
                      s" ${carbonTable.getDatabaseName}.${carbonTable.getTableName}")
     }
   }
-  /**
-   * Add and update the segment files. In case of update scenario the carbonindex files are written
-   * to the same segment so we need to update old segment file. So this method writes the latest
-   * data to new segment file and merges this file old file to get latest updated files.
-   * @param carbonTable
-   * @param segmentDetails
-   * @return
-   */
-  private def updateSegmentFiles(
-      carbonTable: CarbonTable,
-      segmentDetails: util.HashSet[Segment],
-      updateModel: UpdateTableModel,
-      segmentMetaDataInfoMap: util.Map[String, SegmentMetaDataInfo]) = {
-    val metadataDetails =
-      SegmentStatusManager.readTableStatusFile(
-        CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath))
-    val updateTableStatusFile = CarbonUpdateUtil.getUpdateStatusFileName(updateModel
-      .updatedTimeStamp.toString)
-    val updatedSegments = SegmentUpdateStatusManager.readLoadMetadata(updateTableStatusFile,
-      carbonTable.getTablePath).map(_.getSegmentName).toSet
-    val segmentFiles = segmentDetails.asScala.map { segment =>
-      // create new segment files and merge for only updated segments
-      if (updatedSegments.contains(segment.getSegmentNo)) {
-        val load =
-          metadataDetails.find(_.getLoadName.equals(segment.getSegmentNo)).get
-        val segmentFile = load.getSegmentFile
-        var segmentFiles: Seq[CarbonFile] = Seq.empty[CarbonFile]
-
-        val segmentMetaDataInfo = segmentMetaDataInfoMap.get(segment.getSegmentNo)
-        val segmentFileName = SegmentFileStore.writeSegmentFile(
-          carbonTable,
-          segment.getSegmentNo,
-          String.valueOf(System.currentTimeMillis()),
-          load.getPath,
-          segmentMetaDataInfo)
-
-        if (segmentFile != null) segmentFiles ++= FileFactory.getCarbonFile(
-          SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, segmentFile)) :: Nil
-        val updatedSegFile = if (segmentFileName != null) {
-          val segmentCarbonFile = FileFactory.getCarbonFile(
-            SegmentFileStore.getSegmentFilePath(carbonTable.getTablePath, segmentFileName))
-          segmentFiles ++= segmentCarbonFile :: Nil
-
-          val mergedSegFileName = SegmentFileStore.genSegmentFileName(
-            segment.getSegmentNo,
-            updateModel.updatedTimeStamp.toString)
-          SegmentFileStore.mergeSegmentFiles(
-            mergedSegFileName,
-            CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath),
-            segmentFiles.toArray)
-          segmentFiles.foreach { oldSegmentFile =>
-            oldSegmentFile.delete()
-            LOGGER.debug(s"Old segment file is deleted after segment file merge: ${
-              oldSegmentFile.getName
-            }")
-          }
-          mergedSegFileName + CarbonTablePath.SEGMENT_EXT
-        } else null
-
-        new Segment(segment.getSegmentNo, updatedSegFile)
-      } else {
-        segment
-      }
-    }.filter(_.getSegmentFileName != null).asJava
-    segmentFiles
-  }
-
-  /**
-   * If data load is triggered by UPDATE query, this func will execute the update
-   * TODO: move it to a separate update command
-   */
-  private def loadDataFrameForUpdate(
-      sqlContext: SQLContext,
-      dataFrame: Option[DataFrame],
-      carbonLoadModel: CarbonLoadModel,
-      updateModel: Option[UpdateTableModel],
-      carbonTable: CarbonTable,
-      hadoopConf: Configuration,
-      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]
-  ): Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = {
-    val segmentUpdateParallelism = CarbonProperties.getInstance().getParallelismForSegmentUpdate
-
-    val updateRdd = dataFrame.get.rdd
-
-    // return directly if no rows to update
-    val noRowsToUpdate = updateRdd.isEmpty()
-    if (noRowsToUpdate) {
-      Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]]()
-    } else {
-      // splitting as (key, value) i.e., (segment, updatedRows)
-      val keyRDD = updateRdd.map(row =>
-        (row.get(row.size - 1).toString, Row(row.toSeq.slice(0, row.size - 1): _*)))
-
-      val loadMetadataDetails = SegmentStatusManager.readLoadMetadata(
-        carbonTable.getMetadataPath)
-        .filter(lmd => lmd.getSegmentStatus.equals(SegmentStatus.LOAD_PARTIAL_SUCCESS) ||
-                       lmd.getSegmentStatus.equals(SegmentStatus.SUCCESS))
-      val segments = loadMetadataDetails.map(f => new Segment(f.getLoadName, f.getSegmentFile))
-      val segmentIdIndex = segments.map(_.getSegmentNo).zipWithIndex.toMap
-      val segmentId2maxTaskNo = segments.map { seg =>
-        (seg.getSegmentNo,
-          CarbonUpdateUtil.getLatestTaskIdForSegment(seg, carbonLoadModel.getTablePath))
-      }.toMap
-
-      class SegmentPartitioner(segIdIndex: Map[String, Int], parallelism: Int)
-        extends org.apache.spark.Partitioner {
-        override def numPartitions: Int = segmentIdIndex.size * parallelism
-
-        override def getPartition(key: Any): Int = {
-          val segId = key.asInstanceOf[String]
-          segmentIdIndex(segId) * parallelism + Random.nextInt(parallelism)
-        }
-      }
-
-      val partitionByRdd = keyRDD.partitionBy(
-        new SegmentPartitioner(segmentIdIndex, segmentUpdateParallelism))
-
-      val carbonSessionInfoBroadcast = sqlContext.sparkSession.sparkContext
-        .broadcast(ThreadLocalSessionInfo.getCarbonSessionInfo)
-      // because partitionId=segmentIdIndex*parallelism+RandomPart and RandomPart<parallelism,
-      // so segmentIdIndex=partitionId/parallelism, this has been verified.
-      val conf = SparkSQLUtil.broadCastHadoopConf(sqlContext.sparkSession.sparkContext, hadoopConf)
-      partitionByRdd.map(_._2).mapPartitions { partition =>
-        ThreadLocalSessionInfo.setCarbonSessionInfo(carbonSessionInfoBroadcast.value)
-        ThreadLocalSessionInfo.setConfigurationToCurrentThread(conf.value.value)
-        val partitionId = TaskContext.getPartitionId()
-        val segIdIndex = partitionId / segmentUpdateParallelism
-        val randomPart = partitionId - segIdIndex * segmentUpdateParallelism
-        val segId = segments(segIdIndex)
-        val newTaskNo = segmentId2maxTaskNo(segId.getSegmentNo) + randomPart + 1
-        List(triggerDataLoadForSegment(
-          carbonLoadModel,
-          updateModel,
-          segId.getSegmentNo,
-          newTaskNo,
-          partition,
-          segmentMetaDataAccumulator).toList).toIterator
-      }.collect()
-    }
-  }
-
-  /**
-   * TODO: move it to a separate update command
-   */
-  private def triggerDataLoadForSegment(
-      carbonLoadModel: CarbonLoadModel,
-      updateModel: Option[UpdateTableModel],
-      key: String,
-      taskNo: Long,
-      iter: Iterator[Row],
-      segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]]
-  ): Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] = {
-    val rddResult = new updateResultImpl()
-    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-    val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] {
-      val loadMetadataDetails = new LoadMetadataDetails
-      val executionErrors = ExecutionErrors(FailureCauses.NONE, "")
-      var uniqueLoadStatusId = ""
-      try {
-        val segId = key
-        val index = taskNo
-        uniqueLoadStatusId = carbonLoadModel.getTableName +
-                             CarbonCommonConstants.UNDERSCORE +
-                             (index + "_0")
-
-        loadMetadataDetails.setLoadName(segId)
-        loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_FAILURE)
-        carbonLoadModel.setSegmentId(segId)
-        carbonLoadModel.setTaskNo(String.valueOf(index))
-        carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
-
-        loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
-        UpdateDataLoad.DataLoadForUpdate(segId,
-          index,
-          iter,
-          carbonLoadModel,
-          loadMetadataDetails,
-          segmentMetaDataAccumulator)
-      } catch {
-        case e: NoRetryException =>
-          loadMetadataDetails
-            .setSegmentStatus(SegmentStatus.LOAD_PARTIAL_SUCCESS)
-          executionErrors.failureCauses = FailureCauses.BAD_RECORDS
-          executionErrors.errorMsg = e.getMessage
-          LOGGER.info("Bad Record Found")
-        case e: Exception =>
-          LOGGER.info("DataLoad failure")
-          LOGGER.error(e)
-          throw e
-      }
-
-      var finished = false
-
-      override def hasNext: Boolean = !finished
-
-      override def next(): (String, (LoadMetadataDetails, ExecutionErrors)) = {
-        finished = true
-        rddResult
-          .getKey(uniqueLoadStatusId,
-            (loadMetadataDetails, executionErrors))
-      }
-    }
-    resultIter
-  }
 
   /**
    * Trigger compaction after data load
@@ -1050,7 +752,7 @@ object CarbonDataRDDFactory {
     }
     var done = true
     // If the updated data should be added as new segment then update the segment information
-    if (updateModel.isDefined && updateModel.get.loadAsNewSegment) {
+    if (updateModel.isDefined) {
       done = done && CarbonUpdateUtil.updateTableMetadataStatus(
         carbonLoadModel.getLoadMetadataDetails.asScala.map(l =>
           new Segment(l.getMergedLoadName,
@@ -1058,6 +760,7 @@ object CarbonDataRDDFactory {
         carbonTable,
         carbonLoadModel.getFactTimeStamp.toString,
         true,
+        true,
         updateModel.get.deletedSegments.asJava)
     }
     done = done && CarbonLoaderUtil.recordNewLoadMetadata(metadataDetails, carbonLoadModel, false,
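
With the in-place update path removed above, an UPDATE always writes its rows into a new segment and then commits the table status once: the new segment is recorded as successful and the segments whose rows were all removed are marked for delete, both stamped with the update timestamp. A small self-contained model of that commit step (plain Scala with illustrative types, not the CarbonData status classes):

    // Illustrative model only; not the CarbonData table-status classes.
    sealed trait Status
    case object Success extends Status
    case object MarkedForDelete extends Status

    final case class SegmentEntry(id: String, status: Status, updateTimestamp: Option[Long])

    object UpdateAsNewSegmentSketch {
      /** Add the freshly written segment and stamp the segments emptied by the update. */
      def commitUpdate(
          tableStatus: Seq[SegmentEntry],
          newSegmentId: String,
          deletedSegments: Set[String],
          factTimestamp: Long): Seq[SegmentEntry] = {
        val withNew = tableStatus :+ SegmentEntry(newSegmentId, Success, Some(factTimestamp))
        withNew.map { entry =>
          if (deletedSegments.contains(entry.id)) {
            // segments whose rows were all removed are marked for delete in the same commit
            entry.copy(status = MarkedForDelete, updateTimestamp = Some(factTimestamp))
          } else {
            entry
          }
        }
      }

      def main(args: Array[String]): Unit = {
        val before = Seq(SegmentEntry("0", Success, None), SegmentEntry("1", Success, None))
        commitUpdate(before, "2", Set("0"), factTimestamp = 1000L).foreach(println)
      }
    }
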
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
deleted file mode 100644
index c4b8e0a..0000000
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.rdd
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.mapred.JobConf
-import org.apache.hadoop.mapreduce.Job
-import org.apache.spark.Partition
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.command.CarbonMergerMapping
-import org.apache.spark.util.CollectionAccumulator
-
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
-import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
-import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
-import org.apache.carbondata.hadoop.api.CarbonInputFormat
-import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
-import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
-import org.apache.carbondata.spark.MergeResult
-import org.apache.carbondata.spark.util.CarbonSparkUtil
-
-/**
- * IUD carbon merger RDD
- * */
-class CarbonIUDMergerRDD[K, V](
-    @transient private val ss: SparkSession,
-    result: MergeResult[K, V],
-    carbonLoadModel: CarbonLoadModel,
-    carbonMergerMapping: CarbonMergerMapping,
-    segmentMetaDataAccumulator: CollectionAccumulator[Map[String, SegmentMetaDataInfo]])
-  extends CarbonMergerRDD[K, V](ss,
-    result,
-    carbonLoadModel,
-    carbonMergerMapping,
-    segmentMetaDataAccumulator) {
-
-  override def internalGetPartitions: Array[Partition] = {
-    val startTime = System.currentTimeMillis()
-    val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
-      tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
-    )
-    val job: Job = CarbonSparkUtil.createHadoopJob()
-    val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
-    val defaultParallelism = sparkContext.defaultParallelism
-    val noOfBlocks = 0
-
-    CarbonInputFormat.setSegmentsToAccess(
-      job.getConfiguration, carbonMergerMapping.validSegments.toList.asJava)
-    CarbonInputFormat.setTableInfo(
-      job.getConfiguration,
-      carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
-
-    // get splits
-    val splits = format.getSplits(job)
-    val carbonInputSplits = splits.asScala.map(_.asInstanceOf[CarbonInputSplit])
-
-    // group blocks by segment.
-    val splitsGroupedMySegment =
-      carbonInputSplits.groupBy(_.getSegmentId)
-
-    var i = -1
-
-    // No need to get a new SegmentUpdateStatus Manager as the Object is passed
-    // in CarbonLoadModel.
-    // val manager = new SegmentUpdateStatusManager(absoluteTableIdentifier)
-    val updateStatusManager = carbonLoadModel.getSegmentUpdateStatusManager
-
-    // make one spark partition for one segment
-    val resultSplits = splitsGroupedMySegment.map { entry =>
-      val (segName, splits) = (entry._1, entry._2)
-      val validSplits = splits.filter { inputSplit =>
-        CarbonDataMergerUtil
-          .checkUpdateDeltaMatchBlock(segName, inputSplit.getBlockPath, updateStatusManager)
-      }
-
-      if (validSplits.nonEmpty) {
-        val locations = validSplits.head.getLocations
-        i += 1
-        new CarbonSparkPartition(id, i,
-          new CarbonMultiBlockSplit(validSplits.asJava, locations))
-      } else {
-        null
-      }
-    }.filter( _ != null)
-
-    // max segment cardinality is calculated in executor for each segment
-    carbonMergerMapping.maxSegmentColumnSchemaList = null
-
-    // Log the distribution
-    val noOfTasks = resultSplits.size
-    logInfo(s"Identified  no.of.Blocks: $noOfBlocks,"
-            + s"parallelism: $defaultParallelism , no.of.nodes: unknown, no.of.tasks: $noOfTasks"
-    )
-    logInfo("Time taken to identify Blocks to scan : " + (System
-                                                            .currentTimeMillis() - startTime)
-    )
-    resultSplits.foreach { partition =>
-      logInfo(s"Node : " + partition.multiBlockSplit.getLocations.toSeq.mkString(",")
-              + ", No.Of Blocks : " + partition.multiBlockSplit.getLength
-      )
-    }
-    resultSplits.toArray
-  }
-}
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 0f0ac0b..061ce90 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -137,36 +137,11 @@ class CarbonMergerRDD[K, V](
 
         Collections.sort(tableBlockInfoList)
 
-        // During UPDATE DELTA COMPACTION case all the blocks received in compute belongs to
-        // one segment, so max cardinality will be calculated from first block of segment
-        if (CompactionType.IUD_UPDDEL_DELTA == carbonMergerMapping.compactionType) {
-          var dataFileFooter: DataFileFooter = null
-          try {
-            // As the tableBlockInfoList is sorted take the ColCardinality from the last
-            // Block of the sorted list as it will have the last updated cardinality.
-            // Blocks are sorted by order of the update using TableBlockInfo.compare method so
-            // the last block after the sort will be the latest one.
-            dataFileFooter = CarbonUtil
-              .readMetadataFile(tableBlockInfoList.get(tableBlockInfoList.size() - 1))
-          } catch {
-            case e: IOException =>
-              logError("Exception in preparing the data file footer for compaction " + e.getMessage)
-              throw e
-          }
-          // target load name will be same as source load name in case of update data compaction
-          carbonMergerMapping.mergedLoadName = tableBlockInfoList.get(0).getSegmentId
-          carbonMergerMapping.maxSegmentColumnSchemaList = dataFileFooter.getColumnInTable.asScala
-            .toList
-        }
-        mergeNumber = if (CompactionType.IUD_UPDDEL_DELTA == carbonMergerMapping.compactionType) {
-          tableBlockInfoList.get(0).getSegment.toString
-        } else {
-          mergedLoadName.substring(
-            mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
-              CarbonCommonConstants.LOAD_FOLDER.length(),
-            mergedLoadName.length()
-          )
-        }
+        mergeNumber = mergedLoadName.substring(
+          mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
+            CarbonCommonConstants.LOAD_FOLDER.length(),
+          mergedLoadName.length()
+        )
         carbonLoadModel.setSegmentId(mergeNumber)
 
         if(carbonTable.isHivePartitionTable) {
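
The merge number is now always derived from the merged load name, since the update-delta branch that reused the source segment id is gone. A runnable sketch of that extraction, assuming the load folder prefix is "Segment_" (the real constant is CarbonCommonConstants.LOAD_FOLDER):

    object MergeNumberSketch {
      // Stand-in for CarbonCommonConstants.LOAD_FOLDER; the real value lives in carbondata-core.
      val LoadFolder = "Segment_"

      /** Extract the segment number from a merged load name such as ".../Segment_0.1". */
      def mergeNumber(mergedLoadName: String): String =
        mergedLoadName.substring(
          mergedLoadName.lastIndexOf(LoadFolder) + LoadFolder.length,
          mergedLoadName.length)

      def main(args: Array[String]): Unit = {
        println(mergeNumber("default/table1/Fact/Part0/Segment_0.1")) // prints 0.1
      }
    }
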
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 69ea294..30b59f5 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -88,9 +88,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
 
     var loadsToMerge = identifySegmentsToBeMerged()
 
-    while (loadsToMerge.size() > 1 || needSortSingleSegment(loadsToMerge) ||
-           (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType &&
-            loadsToMerge.size() > 0)) {
+    while (loadsToMerge.size() > 1 || needSortSingleSegment(loadsToMerge)) {
       val lastSegment = sortedSegments.get(sortedSegments.size() - 1)
       deletePartialLoadsInCompaction()
       val compactedLoad = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
@@ -145,8 +143,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
           .filterOutNewlyAddedSegments(carbonLoadModel.getLoadMetadataDetails, lastSegment)
       }
 
-      if (CompactionType.IUD_UPDDEL_DELTA == compactionModel.compactionType ||
-        CompactionType.CUSTOM == compactionModel.compactionType) {
+      if (CompactionType.CUSTOM == compactionModel.compactionType) {
         loadsToMerge.clear()
       } else if (segList.size > 0) {
         loadsToMerge = identifySegmentsToBeMerged()
@@ -235,18 +232,10 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       .collectionAccumulator[Map[String, SegmentMetaDataInfo]]
 
     val mergeStatus =
-      if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
-        new CarbonIUDMergerRDD(
-          sc.sparkSession,
-          new MergeResultImpl(),
-          carbonLoadModel,
-          carbonMergerMapping,
-          segmentMetaDataAccumulator
-        ).collect
-      } else if (SortScope.GLOBAL_SORT == carbonTable.getSortScope &&
-                 !carbonTable.getSortColumns.isEmpty &&
-                 carbonTable.getRangeColumn == null &&
-                 CarbonUtil.isStandardCarbonTable(carbonTable)) {
+      if (SortScope.GLOBAL_SORT == carbonTable.getSortScope &&
+          !carbonTable.getSortColumns.isEmpty &&
+          carbonTable.getRangeColumn == null &&
+          CarbonUtil.isStandardCarbonTable(carbonTable)) {
         compactSegmentsByGlobalSort(sc.sparkSession,
           carbonLoadModel,
           carbonMergerMapping,
@@ -291,29 +280,14 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
         segmentFileName = segmentFileName + CarbonTablePath.SEGMENT_EXT
       } else {
         // Get the segment files each updated segment in case of IUD compaction
-        if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
-          val segmentFilesList = loadsToMerge.asScala.map { seg =>
-            val segmentMetaDataInfo = new SegmentFileStore(carbonLoadModel.getTablePath,
-              seg.getSegmentFile).getSegmentFile.getSegmentMetaDataInfo
-            val file = SegmentFileStore.writeSegmentFile(
-              carbonTable,
-              seg.getLoadName,
-              carbonLoadModel.getFactTimeStamp.toString,
-              segmentMetaDataInfo)
-            new Segment(seg.getLoadName, file)
-          }.filter(_.getSegmentFileName != null).asJava
-          segmentFilesForIUDCompact = new util.ArrayList[Segment](segmentFilesList)
-        } else {
-          // get segmentMetadata info from accumulator
-          val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
-            mergedLoadNumber,
-            segmentMetaDataAccumulator)
-          segmentFileName = SegmentFileStore.writeSegmentFile(
-            carbonTable,
-            mergedLoadNumber,
-            carbonLoadModel.getFactTimeStamp.toString,
-            segmentMetaDataInfo)
-        }
+        val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
+          mergedLoadNumber,
+          segmentMetaDataAccumulator)
+        segmentFileName = SegmentFileStore.writeSegmentFile(
+          carbonTable,
+          mergedLoadNumber,
+          carbonLoadModel.getFactTimeStamp.toString,
+          segmentMetaDataInfo)
       }
       // clear segmentMetaDataAccumulator
       segmentMetaDataAccumulator.reset()
@@ -332,12 +306,6 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       val endTime = System.nanoTime()
       LOGGER.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
       val statusFileUpdate =
-        ((compactionType == CompactionType.IUD_UPDDEL_DELTA) &&
-         CarbonDataMergerUtil
-           .updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge,
-             carbonTable.getMetadataPath,
-             carbonLoadModel,
-             segmentFilesForIUDCompact)) ||
         CarbonDataMergerUtil.updateLoadMetadataWithMergeStatus(
           loadsToMerge,
           carbonTable.getMetadataPath,
@@ -356,8 +324,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
                             s"${ carbonLoadModel.getTableName }")
       }
 
-      if (compactionType != CompactionType.IUD_DELETE_DELTA &&
-          compactionType != CompactionType.IUD_UPDDEL_DELTA) {
+      if (compactionType != CompactionType.IUD_DELETE_DELTA) {
         MergeIndexUtil.mergeIndexFilesOnCompaction(compactionCallableModel)
       }
 
@@ -392,8 +359,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
                     s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
 
         // Pre-priming index for compaction
-        val segmentsForPriming = if (compactionType.equals(CompactionType.IUD_DELETE_DELTA) ||
-            compactionType.equals(CompactionType.IUD_UPDDEL_DELTA)) {
+        val segmentsForPriming = if (compactionType.equals(CompactionType.IUD_DELETE_DELTA)) {
             validSegments.asScala.map(_.getSegmentNo).toList
         } else if (compactionType.equals(CompactionType.MAJOR) ||
                    compactionType.equals(CompactionType.MINOR) ||
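
After this change the compactor writes a single segment file for the merged load, keyed by the load's fact timestamp, so only index files carrying that timestamp end up recorded and stale files left behind in the segment folder are ignored. A minimal sketch of such timestamp-based selection, assuming an illustrative "<taskNo>_<timestamp>.carbonindex" naming scheme:

    object IndexFileFilterSketch {
      // Assumed naming scheme for illustration: "<taskNo>_<timestamp>.carbonindex".
      def belongsToLoad(indexFileName: String, factTimestamp: Long): Boolean =
        indexFileName.endsWith("_" + factTimestamp + ".carbonindex")

      def main(args: Array[String]): Unit = {
        // one stale index file from an earlier load, one written by the current load
        val filesInSegmentFolder = Seq("0_1000.carbonindex", "0_2000.carbonindex")
        val currentLoadFiles =
          filesInSegmentFolder.filter(name => belongsToLoad(name, factTimestamp = 2000L))
        println(currentLoadFiles) // List(0_2000.carbonindex)
      }
    }
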
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala b/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
index 00a2f00..f202a17 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/view/MVRefresher.scala
@@ -200,12 +200,7 @@ object MVRefresher {
           viewManager.setStatus(viewSchema.getIdentifier, MVStatus.DISABLED)
           LOGGER.error("Data Load failed for mv: ", exception)
           CarbonLoaderUtil.updateTableStatusInCaseOfFailure(
-            newLoadName,
-            viewTable.getAbsoluteTableIdentifier,
-            viewTable.getTableName,
-            viewTable.getDatabaseName,
-            viewTable.getTablePath,
-            viewTable.getMetadataPath)
+            newLoadName, viewTable, SegmentStatus.INSERT_IN_PROGRESS)
           throw exception
       } finally {
         unsetInputSegments(viewSchema)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 04ea523..c02911d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -121,7 +121,7 @@ case class UpdateTableModel(
     updatedTimeStamp: Long,
     var executorErrors: ExecutionErrors,
     deletedSegments: Seq[Segment],
-    loadAsNewSegment: Boolean = false)
+    var insertedSegment: Option[String])
 
 case class CompactionModel(compactionSize: Long,
     compactionType: CompactionType,
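
UpdateTableModel now carries the id of the segment written by the update (insertedSegment) instead of the loadAsNewSegment flag, which lets the caller roll that segment back if the update fails later. A minimal usage sketch with stand-in types (not the actual CarbonData classes):

    // Stand-ins for the real ExecutionErrors / Segment types, for illustration only.
    final case class ExecutionErrorsStub(var errorMsg: String = "")

    final case class UpdateTableModelSketch(
        isUpdate: Boolean,
        updatedTimeStamp: Long,
        executorErrors: ExecutionErrorsStub,
        deletedSegments: Seq[String],
        var insertedSegment: Option[String]) // filled in once the insert finishes

    object UpdateTableModelSketch {
      def main(args: Array[String]): Unit = {
        val model = UpdateTableModelSketch(
          isUpdate = true,
          updatedTimeStamp = System.currentTimeMillis(),
          executorErrors = ExecutionErrorsStub(),
          deletedSegments = Seq.empty,
          insertedSegment = Option.empty)

        // after the data load succeeds, the load path records the new segment id
        model.insertedSegment = Some("2")

        // on failure, the caller can use the recorded id to mark that segment for delete
        println(model.insertedSegment)
      }
    }
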
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 755d35b..6ab65d3 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -215,22 +215,6 @@ case class CarbonAlterTableCompactionCommand(
     val compactionType = CompactionType.valueOf(alterTableModel.compactionType.toUpperCase)
     val compactionSize = CarbonDataMergerUtil.getCompactionSize(compactionType, carbonLoadModel)
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
-      if (alterTableModel.segmentUpdateStatusManager.isDefined) {
-        carbonLoadModel.setSegmentUpdateStatusManager(
-          alterTableModel.segmentUpdateStatusManager.get)
-        carbonLoadModel.setLoadMetadataDetails(
-          alterTableModel.segmentUpdateStatusManager.get.getLoadMetadataDetails.toList.asJava)
-      } else {
-        // segmentUpdateStatusManager will not be defined in case of IUD/horizontal compaction on
-        // materialized views. In that case, create new segmentUpdateStatusManager object
-        // using carbonTable
-        val segmentUpdateStatusManager = new SegmentUpdateStatusManager(carbonTable)
-        carbonLoadModel.setSegmentUpdateStatusManager(segmentUpdateStatusManager)
-        carbonLoadModel.setLoadMetadataDetails(
-          segmentUpdateStatusManager.getLoadMetadataDetails.toList.asJava)
-      }
-    }
 
     if (null == carbonLoadModel.getLoadMetadataDetails) {
       carbonLoadModel.readAndSetLoadMetadataDetails()
@@ -320,19 +304,17 @@ case class CarbonAlterTableCompactionCommand(
       try {
         // COMPACTION_LOCK and UPDATE_LOCK are already locked when start to execute update sql,
         // so it don't need to require locks again when compactionType is IUD_UPDDEL_DELTA.
-        if (CompactionType.IUD_UPDDEL_DELTA != compactionType) {
-          if (!updateLock.lockWithRetries()) {
-            throw new ConcurrentOperationException(carbonTable, "update", "compaction")
-          }
-          if (!lock.lockWithRetries()) {
-            LOGGER.error(s"Not able to acquire the compaction lock for table " +
-                         s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-            CarbonException.analysisException(
-              "Table is already locked for compaction. Please try after some time.")
-          } else {
-            LOGGER.info("Acquired the compaction lock for table " +
-                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
-          }
+        if (!updateLock.lockWithRetries()) {
+          throw new ConcurrentOperationException(carbonTable, "update", "compaction")
+        }
+        if (!lock.lockWithRetries()) {
+          LOGGER.error(s"Not able to acquire the compaction lock for table " +
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
+          CarbonException.analysisException(
+            "Table is already locked for compaction. Please try after some time.")
+        } else {
+          LOGGER.info("Acquired the compaction lock for table " +
+            s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         }
         CarbonDataRDDFactory.startCompactionThreads(
           sqlContext,
@@ -346,14 +328,10 @@ case class CarbonAlterTableCompactionCommand(
       } catch {
         case e: Exception =>
           LOGGER.error(s"Exception in start compaction thread.", e)
-          if (CompactionType.IUD_UPDDEL_DELTA != compactionType) {
-            lock.unlock()
-          }
+          lock.unlock()
           throw e
       } finally {
-        if (CompactionType.IUD_UPDDEL_DELTA != compactionType) {
-          updateLock.unlock()
-        }
+        updateLock.unlock()
         MVManagerInSpark.disableMVOnTable(sqlContext.sparkSession, carbonTable)
       }
     }
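
With the IUD_UPDDEL_DELTA special case removed, the compaction command always takes the update lock and the compaction lock itself, releasing the compaction lock on a failed start and the update lock in the finally block. The locking shape, reduced to a self-contained sketch with a stand-in lock type (the real code uses CarbonLockFactory locks):

    // Illustrative lock with the same acquire/release shape as the carbon locks.
    final class SimpleLock(name: String) {
      def lockWithRetries(): Boolean = true // pretend the lock is always obtained
      def unlock(): Boolean = { println(s"released $name"); true }
    }

    object CompactionLockingSketch {
      def main(args: Array[String]): Unit = {
        val updateLock = new SimpleLock("update.lock")
        val compactionLock = new SimpleLock("compaction.lock")
        try {
          if (!updateLock.lockWithRetries()) {
            throw new IllegalStateException("concurrent update in progress")
          }
          if (!compactionLock.lockWithRetries()) {
            throw new IllegalStateException("table already locked for compaction")
          }
          // start the compaction threads here; the compaction lock is released
          // by the compaction thread once it finishes
        } catch {
          case e: Exception =>
            compactionLock.unlock() // release if the compaction failed to start
            throw e
        } finally {
          updateLock.unlock() // the update lock is always released here
        }
      }
    }
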
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index e622671..353f49c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -205,9 +205,9 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
           operationContext = operationContext)
 
       // add the start entry for the new load in the table status file
-      if ((updateModel.isEmpty || updateModel.isDefined && updateModel.get.loadAsNewSegment)
+      if ((updateModel.isEmpty || updateModel.isDefined)
           && !table.isHivePartitionTable) {
-        if (updateModel.isDefined ) {
+        if (updateModel.isDefined) {
           carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
         }
         CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
@@ -449,13 +449,7 @@ case class CarbonInsertIntoCommand(databaseNameOp: Option[String],
 
   def insertData(loadParams: CarbonLoadParams): (Seq[Row], LoadMetadataDetails) = {
     var rows = Seq.empty[Row]
-    val loadDataFrame = if (updateModel.isDefined && !updateModel.get.loadAsNewSegment) {
-      // TODO: handle the update flow for new insert into flow without converter step
-      throw new UnsupportedOperationException(
-        "Update flow is not supported without no converter step yet.")
-    } else {
-      Some(dataFrame)
-    }
+    val loadDataFrame = Some(dataFrame)
     val table = loadParams.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     var loadResult : LoadMetadataDetails = null
     if (table.isHivePartitionTable) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
index 2496b96..75d319e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
@@ -110,9 +110,9 @@ case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
       // Clean up the old invalid segment data before creating a new entry for new load.
       SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions)
       // add the start entry for the new load in the table status file
-      if ((updateModel.isEmpty || updateModel.isDefined && updateModel.get.loadAsNewSegment)
+      if ((updateModel.isEmpty || updateModel.isDefined)
           && !table.isHivePartitionTable) {
-        if (updateModel.isDefined ) {
+        if (updateModel.isDefined) {
           carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
         }
         CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
@@ -154,6 +154,9 @@ case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
 
       LOGGER.info("Sort Scope : " + carbonLoadModel.getSortScope)
       val (rows, loadResult) = insertData(loadParams)
+      if (updateModel.isDefined) {
+        updateModel.get.insertedSegment = Some(carbonLoadModel.getSegmentId)
+      }
       val info = CommonLoadUtils.makeAuditInfo(loadResult)
       CommonLoadUtils.firePostLoadEvents(sparkSession,
         carbonLoadModel,
@@ -186,11 +189,7 @@ case class CarbonInsertIntoWithDf(databaseNameOp: Option[String],
 
   def insertData(loadParams: CarbonLoadParams): (Seq[Row], LoadMetadataDetails) = {
     var rows = Seq.empty[Row]
-    val loadDataFrame = if (updateModel.isDefined && !updateModel.get.loadAsNewSegment) {
-      Some(CommonLoadUtils.getDataFrameWithTupleID(Some(dataFrame)))
-    } else {
-      Some(dataFrame)
-    }
+    val loadDataFrame = Some(dataFrame)
     val table = loadParams.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     var loadResult : LoadMetadataDetails = null
     loadParams.dataFrame = loadDataFrame
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 5c46127..3bc2590 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -900,32 +900,22 @@ object CommonLoadUtils {
     try {
       val query: LogicalPlan = if ((loadParams.dataFrame.isDefined) ||
                                    loadParams.scanResultRDD.isDefined) {
-        val (rdd, dfAttributes) =
-          if (loadParams.updateModel.isDefined && !loadParams.updateModel.get.loadAsNewSegment) {
+        val (rdd, dfAttributes) = {
             // Get the updated query plan in case of update scenario
-            val updatedFrame = Dataset.ofRows(
-              loadParams.sparkSession,
-              getLogicalQueryForUpdate(
-                loadParams.sparkSession,
-                catalogTable,
-                loadParams.dataFrame.get,
-                loadParams.carbonLoadModel))
-            (updatedFrame.rdd, updatedFrame.schema)
-        } else {
-          if (loadParams.finalPartition.nonEmpty) {
-            val headers = loadParams.carbonLoadModel
-              .getCsvHeaderColumns
-              .dropRight(loadParams.finalPartition.size)
-            val updatedHeader = headers ++ loadParams.finalPartition.keys.map(_.toLowerCase)
-            loadParams.carbonLoadModel.setCsvHeader(updatedHeader.mkString(","))
-            loadParams.carbonLoadModel
-              .setCsvHeaderColumns(loadParams.carbonLoadModel.getCsvHeader.split(","))
-          }
-          if (loadParams.dataFrame.isDefined) {
-            (loadParams.dataFrame.get.rdd, loadParams.dataFrame.get.schema)
-          } else {
-            (null, null)
-          }
+            if (loadParams.finalPartition.nonEmpty) {
+              val headers = loadParams.carbonLoadModel
+                .getCsvHeaderColumns
+                .dropRight(loadParams.finalPartition.size)
+              val updatedHeader = headers ++ loadParams.finalPartition.keys.map(_.toLowerCase)
+              loadParams.carbonLoadModel.setCsvHeader(updatedHeader.mkString(","))
+              loadParams.carbonLoadModel
+                .setCsvHeaderColumns(loadParams.carbonLoadModel.getCsvHeader.split(","))
+            }
+            if (loadParams.dataFrame.isDefined) {
+              (loadParams.dataFrame.get.rdd, loadParams.dataFrame.get.schema)
+            } else {
+              (null, null)
+            }
         }
         if (loadParams.dataFrame.isDefined) {
           val expectedColumns = {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index d2ce2d4..eae4aab 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -93,6 +93,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
       .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
         LockUsage.UPDATE_LOCK)
     var lockStatus = false
+    var hasException = false
     try {
       lockStatus = metadataLock.lockWithRetries()
       if (lockStatus) {
@@ -108,9 +109,6 @@ private[sql] case class CarbonProjectForDeleteCommand(
       }
       val executorErrors = ExecutionErrors(FailureCauses.NONE, "")
 
-      // handle the clean up of IUD.
-      CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
-
       val (deletedSegments, deletedRowCount) = DeleteExecution.deleteDeltaExecution(
         databaseNameOp,
         tableName,
@@ -126,8 +124,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
       }
 
       // call IUD Compaction.
-      HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable,
-        isUpdateOperation = false)
+      HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable)
 
       // Truncate materialized views on the current table.
       val viewManager = MVManagerInSpark.get(sparkSession)
@@ -150,13 +147,14 @@ private[sql] case class CarbonProjectForDeleteCommand(
         LOGGER.error("Delete operation passed. Exception in Horizontal Compaction." +
                      " Please check logs. " + e.getMessage)
         CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, e.compactionTimeStamp.toString)
+        hasException = true
         Seq(Row(0L))
 
       case e: Exception =>
         LOGGER.error("Exception in Delete data operation " + e.getMessage, e)
         // ****** start clean up.
         // In case of failure , clean all related delete delta files
-        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
+        hasException = true
 
         // clean up. Null check is required as for executor error some times message is null
         if (null != e.getMessage) {
@@ -182,6 +180,10 @@ private[sql] case class CarbonProjectForDeleteCommand(
         LOGGER.error(s"Unable to unlock compactionLock for " +
           s"table $tableName after delete operation");
       }
+
+      if (hasException) {
+        CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, timestamp)
+      }
     }
   }
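
The delete command now only records a failure where it happens and performs the stale-delta cleanup once, in the finally block, guarded by that flag. The control flow, reduced to a self-contained sketch (cleanStaleDeltaFiles is replaced by a print for illustration):

    object DeferredCleanupSketch {
      def deleteRows(fail: Boolean): Unit = {
        var hasException = false
        val timestamp = System.currentTimeMillis().toString
        try {
          if (fail) throw new RuntimeException("simulated delete failure")
          println("delete delta written")
        } catch {
          case e: Exception =>
            // only record the failure here; the actual cleanup happens in finally
            hasException = true
            println(s"delete failed: ${e.getMessage}")
        } finally {
          if (hasException) {
            // stands in for CarbonUpdateUtil.cleanStaleDeltaFiles(table, timestamp)
            println(s"cleaning stale delta files stamped $timestamp")
          }
        }
      }

      def main(args: Array[String]): Unit = {
        deleteRows(fail = false)
        deleteRows(fail = true)
      }
    }
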
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 032f410..471ead3 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types.{ArrayType, LongType}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.AlterTableUtil
-import scala.collection.JavaConverters._
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -38,10 +37,12 @@ import org.apache.carbondata.core.features.TableOperation
 import org.apache.carbondata.core.index.Segment
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus, UpdateTablePostEvent, UpdateTablePreEvent}
 import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+import org.apache.carbondata.spark.util.CarbonScalaUtil
 import org.apache.carbondata.view.MVManagerInSpark
 
 private[sql] case class CarbonProjectForUpdateCommand(
@@ -118,8 +119,10 @@ private[sql] case class CarbonProjectForUpdateCommand(
     //    var dataFrame: DataFrame = null
     var dataSet: DataFrame = null
     val isPersistEnabled = CarbonProperties.getInstance.isPersistUpdateDataset
-    var hasException = false
+    var hasHorizontalCompactionException = false
+    var hasUpdateException = false
     var fileTimestamp = ""
+    var updateTableModel: UpdateTableModel = null
     try {
       lockStatus = metadataLock.lockWithRetries()
       if (lockStatus) {
@@ -158,8 +161,6 @@ private[sql] case class CarbonProjectForUpdateCommand(
                 "for the update key")
             }
           }
-          // handle the clean up of IUD.
-          CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
 
           // do delete operation.
           val (segmentsToBeDeleted, updatedRowCountTemp) = DeleteExecution.deleteDeltaExecution(
@@ -176,15 +177,16 @@ private[sql] case class CarbonProjectForUpdateCommand(
           }
 
           updatedRowCount = updatedRowCountTemp
+          updateTableModel =
+            UpdateTableModel(true, currentTime, executionErrors, segmentsToBeDeleted, Option.empty)
           // do update operation.
           performUpdate(dataSet,
             databaseNameOp,
             tableName,
             plan,
             sparkSession,
-            currentTime,
-            executionErrors,
-            segmentsToBeDeleted)
+            updateTableModel,
+            executionErrors)
 
           // pre-priming for update command
           DeleteExecution.reloadDistributedSegmentCache(carbonTable,
@@ -202,7 +204,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
 
       // Do IUD Compaction.
       HorizontalCompaction.tryHorizontalCompaction(
-        sparkSession, carbonTable, isUpdateOperation = true)
+        sparkSession, carbonTable)
 
       // Truncate materialized views on the current table.
       val viewManager = MVManagerInSpark.get(sparkSession)
@@ -221,14 +223,11 @@ private[sql] case class CarbonProjectForUpdateCommand(
           "Update operation passed. Exception in Horizontal Compaction. Please check logs." + e)
         // In case of failure , clean all related delta files
         fileTimestamp = e.compactionTimeStamp.toString
-        hasException = true
+        hasHorizontalCompactionException = true
       case e: Exception =>
         LOGGER.error("Exception in update operation", e)
-        // ****** start clean up.
-        // In case of failure , clean all related delete delta files
         fileTimestamp = currentTime + ""
-        hasException = true
-        // *****end clean up.
+        hasUpdateException = true
         if (null != e.getMessage) {
           sys.error("Update operation failed. " + e.getMessage)
         }
@@ -237,6 +236,14 @@ private[sql] case class CarbonProjectForUpdateCommand(
         }
         sys.error("Update operation failed. please check logs.")
     } finally {
+      // In case of failure, clean up the newly inserted segment:
+      // change its status from 'success' to 'marked for delete'
+      if (hasUpdateException && null != updateTableModel
+        && updateTableModel.insertedSegment.isDefined) {
+        CarbonLoaderUtil.updateTableStatusInCaseOfFailure(updateTableModel.insertedSegment.get,
+          carbonTable, SegmentStatus.SUCCESS)
+      }
+
       if (updateLock.unlock()) {
         LOGGER.info(s"updateLock unlocked successfully after update $tableName")
       } else {
@@ -264,7 +271,8 @@ private[sql] case class CarbonProjectForUpdateCommand(
       }
 
       // In case of failure, clean all related delete delta files.
-      if (hasException) {
+      if (hasHorizontalCompactionException || hasUpdateException) {
+        // In case of failure, clean all related delete delta files
         // When the table has too many segments, it will take a long time.
         // So moving it to the end and it is outside of locking.
         CarbonUpdateUtil.cleanStaleDeltaFiles(carbonTable, fileTimestamp)
@@ -279,9 +287,8 @@ private[sql] case class CarbonProjectForUpdateCommand(
       tableName: String,
       plan: LogicalPlan,
       sparkSession: SparkSession,
-      currentTime: Long,
-      executorErrors: ExecutionErrors,
-      deletedSegments: Seq[Segment]): Unit = {
+      updateTableModel: UpdateTableModel,
+      executorErrors: ExecutionErrors): Unit = {
 
     def isDestinationRelation(relation: CarbonDatasourceHadoopRelation): Boolean = {
       val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
@@ -340,16 +347,17 @@ private[sql] case class CarbonProjectForUpdateCommand(
       case _ => sys.error("")
     }
 
-    val updateTableModel = UpdateTableModel(true, currentTime, executorErrors, deletedSegments)
-
     val header = getHeader(carbonRelation, plan)
+    val fields = dataFrame.schema.fields
+    val otherFields = CarbonScalaUtil.getAllFieldsWithoutTupleIdField(fields)
+    val dataFrameWithOutTupleId = dataFrame.select(otherFields: _*)
 
     CarbonInsertIntoWithDf(
       databaseNameOp = Some(carbonRelation.identifier.getCarbonTableIdentifier.getDatabaseName),
       tableName = carbonRelation.identifier.getCarbonTableIdentifier.getTableName,
       options = Map(("fileheader" -> header)),
       isOverwriteTable = false,
-      dataFrame = dataFrame,
+      dataFrame = dataFrameWithOutTupleId,
       updateModel = Some(updateTableModel)).process(sparkSession)
 
     executorErrors.errorMsg = updateTableModel.executorErrors.errorMsg
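
performUpdate now drops the implicit tupleId column from the updated DataFrame before handing it to CarbonInsertIntoWithDf, since the rows are written into a new segment instead of being merged back by tuple id. A plain-Scala sketch of that projection over field names (the column name "tupleId" is an assumption for illustration; the real code selects columns via CarbonScalaUtil.getAllFieldsWithoutTupleIdField):

    object DropTupleIdSketch {
      // Illustrative name of the implicit column, not the exact constant used by CarbonData.
      val TupleIdColumn = "tupleId"

      /** Keep every field except the implicit tuple id, preserving the original order. */
      def fieldsWithoutTupleId(fields: Seq[String]): Seq[String] =
        fields.filterNot(_.equalsIgnoreCase(TupleIdColumn))

      def main(args: Array[String]): Unit = {
        val updatedFrameFields = Seq("id", "name", "salary", "tupleId")
        println(fieldsWithoutTupleId(updatedFrameFields)) // List(id, name, salary)
      }
    }
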
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 25a832f..37f588c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -405,6 +405,7 @@ object DeleteExecution {
             carbonTable,
             timestamp,
             !isUpdateOperation,
+            !isUpdateOperation,
             listOfSegmentToBeMarkedDeleted)
     ) {
       LOGGER.info(s"Delete data operation is successful for " +
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 747885e..6ff3107 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -48,14 +48,12 @@ object HorizontalCompaction {
    */
   def tryHorizontalCompaction(
       sparkSession: SparkSession,
-      carbonTable: CarbonTable,
-      isUpdateOperation: Boolean): Unit = {
+      carbonTable: CarbonTable): Unit = {
 
     if (!CarbonDataMergerUtil.isHorizontalCompactionEnabled) {
       return
     }
 
-    var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA
     val absTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val updateTimeStamp = System.currentTimeMillis()
     // To make sure that update and delete timestamps are not same,
@@ -76,21 +74,8 @@ object HorizontalCompaction {
     val segmentUpdateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
       carbonTable)
 
-    if (isUpdateOperation) {
-
-      // This is only update operation, perform only update compaction.
-      compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA
-      performUpdateDeltaCompaction(sparkSession,
-        compactionTypeIUD,
-        carbonTable,
-        absTableIdentifier,
-        segmentUpdateStatusManager,
-        updateTimeStamp,
-        segLists)
-    }
-
     // After Update Compaction perform delete compaction
-    compactionTypeIUD = CompactionType.IUD_DELETE_DELTA
+    val compactionTypeIUD = CompactionType.IUD_DELETE_DELTA
     segLists = CarbonDataMergerUtil.getValidSegmentList(carbonTable)
     if (segLists == null || segLists.size() == 0) {
       return
@@ -114,56 +99,6 @@ object HorizontalCompaction {
   }
 
   /**
-   * Update Delta Horizontal Compaction.
-   */
-  private def performUpdateDeltaCompaction(sparkSession: SparkSession,
-      compactionTypeIUD: CompactionType,
-      carbonTable: CarbonTable,
-      absTableIdentifier: AbsoluteTableIdentifier,
-      segmentUpdateStatusManager: SegmentUpdateStatusManager,
-      factTimeStamp: Long,
-      segLists: util.List[Segment]): Unit = {
-    val db = carbonTable.getDatabaseName
-    val table = carbonTable.getTableName
-    // get the valid segments qualified for update compaction.
-    val validSegList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
-      absTableIdentifier,
-      segmentUpdateStatusManager,
-      compactionTypeIUD)
-    if (LOG.isDebugEnabled) {
-      LOG.debug(s"The segment list for Horizontal Update Compaction is $validSegList")
-    }
-
-    if (validSegList.size() == 0) {
-      return
-    }
-
-    LOG.info(s"Horizontal Update Compaction operation started for [$db.$table].")
-
-    try {
-      // Update Compaction.
-      val alterTableModel = AlterTableModel(Option(carbonTable.getDatabaseName),
-        carbonTable.getTableName,
-        Some(segmentUpdateStatusManager),
-        CompactionType.IUD_UPDDEL_DELTA.toString,
-        Some(factTimeStamp))
-
-      CarbonAlterTableCompactionCommand(alterTableModel).run(sparkSession)
-    }
-    catch {
-      case e: Exception =>
-        val msg = if (null != e.getMessage) {
-          e.getMessage
-        } else {
-          "Please check logs for more info"
-        }
-        throw new HorizontalCompactionException(
-          s"Horizontal Update Compaction Failed for [${ db }.${ table }]. " + msg, factTimeStamp)
-    }
-    LOG.info(s"Horizontal Update Compaction operation completed for [${ db }.${ table }].")
-  }
-
-  /**
    * Delete Delta Horizontal Compaction.
    */
   private def performDeleteDeltaCompaction(sparkSession: SparkSession,
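
Note on the simplified API above: with the isUpdateOperation flag and the
IUD_UPDDEL_DELTA branch removed, horizontal compaction now only merges delete
deltas, and callers pass just the session and the table. A minimal caller-side
sketch; LOGGER and the surrounding names are assumptions, not part of this patch:

    import org.apache.spark.sql.execution.command.mutation.{HorizontalCompaction, HorizontalCompactionException}

    // Only delete-delta compaction remains; update-delta compaction is gone
    // because updates now load their output into a new segment.
    try {
      HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable)
    } catch {
      case e: HorizontalCompactionException =>
        // Per the new atomicity test added later in this patch, a compaction
        // failure after a successful update leaves the committed result visible,
        // so the caller only logs it here (LOGGER is an assumed logger).
        LOGGER.error("Horizontal compaction failed: " + e.getMessage)
    }
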
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
index ec55ea8..6aa8c88 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -205,23 +205,8 @@ case class CarbonMergeDataSetCommand(
         LOGGER.error("writing of update status file failed")
         throw new CarbonMergeDataSetException("writing of update status file failed")
       }
-      if (carbonTable.isHivePartitionTable) {
-        // If load count is 0 and if merge action contains delete operation, update
-        // tableUpdateStatus file name in loadMeta entry
-        if (count == 0 && hasDelAction && !tuple._1.isEmpty) {
-          val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(CarbonTablePath
-            .getTableStatusFilePath(carbonTable.getTablePath))
-          CarbonUpdateUtil.updateTableMetadataStatus(loadMetaDataDetails.map(loadMetadataDetail =>
-            new Segment(loadMetadataDetail.getMergedLoadName,
-              loadMetadataDetail.getSegmentFile)).toSet.asJava,
-            carbonTable,
-            trxMgr.getLatestTrx.toString,
-            true,
-            tuple._2.asJava)
-        }
-      }
       Some(UpdateTableModel(isUpdate = true, trxMgr.getLatestTrx,
-        executorErrors, tuple._2, loadAsNewSegment = true))
+        executorErrors, tuple._2, Option.empty))
     } else {
       None
     }
@@ -238,6 +223,18 @@ case class CarbonMergeDataSetCommand(
       new OperationContext,
       updateTableModel
     ).run(sparkSession)
+
+    if (hasDelAction && count == 0) {
+      val loadMetaDataDetails = SegmentStatusManager.readTableStatusFile(CarbonTablePath
+        .getTableStatusFilePath(carbonTable.getTablePath))
+      CarbonUpdateUtil.updateTableMetadataStatus(loadMetaDataDetails.map(loadMetadataDetail =>
+        new Segment(loadMetadataDetail.getMergedLoadName,
+          loadMetadataDetail.getSegmentFile)).toSet.asJava,
+        carbonTable,
+        trxMgr.getLatestTrx.toString,
+        true,
+        true, new util.ArrayList[Segment]())
+    }
     LOGGER.info(s"Total inserted rows: ${stats.insertedRows.sum}")
     LOGGER.info(s"Total updated rows: ${stats.updatedRows.sum}")
     LOGGER.info(s"Total deleted rows: ${stats.deletedRows.sum}")
@@ -249,7 +246,7 @@ case class CarbonMergeDataSetCommand(
       trxMgr, mutationAction, mergeMatches)
     // Do IUD Compaction.
     HorizontalCompaction.tryHorizontalCompaction(
-      sparkSession, carbonTable, isUpdateOperation = false)
+      sparkSession, carbonTable)
     Seq.empty
   }
 
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index 3c0e22b..832c33d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -210,7 +210,6 @@ object SecondaryIndexUtil {
             val file = SegmentFileStore.writeSegmentFile(
               indexCarbonTable,
               seg.getLoadName,
-              segmentIdToLoadStartTimeMapping(seg.getLoadName).toString,
               carbonLoadModel.getFactTimeStamp.toString,
               null,
               null)
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 5f796f7..61aef9d 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -190,7 +190,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     sql(s"update longerthan32kchar set(longerthan32kchar.dim2)=('$longChar') " +
       "where longerthan32kchar.mes1=1").collect()
     checkAnswer(sql("select * from longerthan32kchar"), Seq(Row("itsok", "hello", 2)))
-    redirectCsvPath = BadRecordUtil.getRedirectCsvPath("default", "longerthan32kchar", "0", "1")
+    redirectCsvPath = BadRecordUtil.getRedirectCsvPath("default", "longerthan32kchar", "2", "0")
     redirectedFileLineList = FileUtils.readLines(redirectCsvPath)
     iterator = redirectedFileLineList.iterator()
     while (iterator.hasNext) {
@@ -327,7 +327,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     sql("drop table if exists load32000bytes")
   }
 
-  test("test if stale folders are deleting on data load") {
+  test("test data load with stale folders") {
     sql("drop table if exists stale")
     sql("create table stale(a string) STORED AS carbondata")
     sql("insert into stale values('k')")
@@ -336,7 +336,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
     FileFactory.getCarbonFile(tableStatusFile).delete()
     sql("insert into stale values('k')")
     // if table lose tablestatus file, the system should keep all data.
-    checkAnswer(sql("select * from stale"), Seq(Row("k"), Row("k")))
+    checkAnswer(sql("select * from stale"), Seq(Row("k")))
   }
 
   test("test data loading with directly writing fact data to hdfs") {
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithStaleDataInSegmentFolder.scala b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithStaleDataInSegmentFolder.scala
new file mode 100644
index 0000000..6564141
--- /dev/null
+++ b/integration/spark/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithStaleDataInSegmentFolder.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.dataload
+
+import java.util
+
+import org.apache.spark.sql.{CarbonEnv, Row}
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+/**
+ * Test class for creating and loading a carbon table with stale data in the segment folder
+ *
+ */
+class TestLoadDataWithStaleDataInSegmentFolder extends QueryTest with BeforeAndAfterAll {
+
+  val tableName = "staleDataInSegmentFolder"
+  val siName = "si_StaleDataInSegmentFolder"
+  val testData = s"$resourcesPath/sample.csv"
+  val sortcolumns = "id"
+
+  test("test load with staledata in segmentfolder, " +
+      "carbon.merge.index.in.segment = false") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+    testIUDWithStaleData
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT)
+  }
+
+  test("test load with staledata in segmentfolder, " +
+      "carbon.merge.index.in.segment = true") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    testIUDWithStaleData
+    CarbonProperties.getInstance()
+      .removeProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT)
+  }
+
+  private def testIUDWithStaleData(): Unit = {
+    List("NO_SORT", "LOCAL_SORT", "GLOBAL_SORT").foreach(sort => {
+      List(true, false).foreach(isPartition => {
+        testLoadWithStaleData(sort, sortcolumns, isPartition)
+        testSIWithStaleData(sort, sortcolumns, isPartition)
+        testInsertIntoWithStaleData(sort, sortcolumns, isPartition)
+        testUpdateWithStaleData(sort, sortcolumns, isPartition)
+        testDeleteWithStaleData(sort, sortcolumns, isPartition)
+        testCompactWithStaleData(sort, sortcolumns, isPartition)
+      })
+    })
+  }
+
+  private def testLoadWithStaleData(sortscope: String, sortcolumns: String,
+      isPartition: Boolean): Unit = {
+    createTable(sortscope, sortcolumns, isPartition)
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table $tableName")
+    mockStaleDataByRemoveTablestatus(tableName)
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table $tableName")
+    verifyThereIsNoSameContentInDifferentIndexes(tableName, "0")
+    checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(6)))
+    sql(s"clean files for table $tableName")
+    checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(6)))
+  }
+
+  private def testCompactWithStaleData(sortscope: String, sortcolumns: String,
+      isPartition: Boolean): Unit = {
+    createTable(sortscope, sortcolumns, isPartition)
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table $tableName")
+    mockStaleDataByRemoveTablestatus(tableName)
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table $tableName")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table $tableName")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table $tableName")
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table $tableName")
+    sql(s"ALTER TABLE $tableName COMPACT 'MINOR'")
+    checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(24)))
+    sql(s"clean files for table $tableName")
+    checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(24)))
+  }
+
+  private def testSIWithStaleData(sortscope: String, sortcolumns: String,
+      isPartition: Boolean): Unit = {
+    createTable(sortscope, sortcolumns, isPartition)
+    createSI()
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table $tableName")
+    mockStaleDataByRemoveTablestatus(tableName)
+    mockStaleDataByRemoveTablestatus(siName)
+    sql(s"LOAD DATA LOCAL INPATH '$testData' into table $tableName")
+    verifyThereIsNoSameContentInDifferentIndexes(tableName, "0")
+    verifyThereIsNoSameContentInDifferentIndexes(siName, "0")
+    checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(6)))
+    if (isPartition) {
+      checkAnswer(sql(s"select count(1) from $siName"), Seq(Row(6)))
+    } else {
+      checkAnswer(sql(s"select count(1) from $siName"), Seq(Row(4)))
+    }
+    sql(s"clean files for table $tableName")
+    checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(6)))
+    if (isPartition) {
+      checkAnswer(sql(s"select count(1) from $siName"), Seq(Row(6)))
+    } else {
+      checkAnswer(sql(s"select count(1) from $siName"), Seq(Row(4)))
+    }
+  }
+
+  private def testInsertIntoWithStaleData(sortscope: String, sortcolumns: String,
+      isPartition: Boolean): Unit = {
+    createTable(sortscope, sortcolumns, isPartition)
+    sql(s"INSERT INTO $tableName values(1, 'a', 'b', 2)")
+    mockStaleDataByRemoveTablestatus(tableName)
+    sql(s"INSERT INTO $tableName values(1, 'a', 'c', 2)")
+    verifyThereIsNoSameContentInDifferentIndexes(tableName, "0")
+    checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1)))
+    sql(s"clean files for table $tableName")
+    checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(1)))
+  }
+
+  private def testUpdateWithStaleData(sortscope: String, sortcolumns: String,
+      isPartition: Boolean): Unit = {
+    createTable(sortscope, sortcolumns, isPartition)
+    sql(s"INSERT INTO $tableName values(1, 'a', 'b', 2)")
+    mockStaleDataByRemoveTablestatus(tableName)
+    sql(s"INSERT INTO $tableName values(1, 'a', 'c', 2)")
+    sql("""update staleDataInSegmentFolder d  set
+        | (d.id) = (d.id + 1) where d.name = 'a'""".stripMargin).collect()
+    checkAnswer(sql(s"select * from $tableName"),
+      Seq(Row(2, "a", "c", 2)))
+  }
+
+  private def testDeleteWithStaleData(sortscope: String, sortcolumns: String,
+      isPartition: Boolean): Unit = {
+    createTable(sortscope, sortcolumns, isPartition)
+    sql(s"INSERT INTO $tableName values(1, 'a', 'b', 2)")
+    mockStaleDataByRemoveTablestatus(tableName)
+    sql(s"INSERT INTO $tableName values(1, 'a', 'c', 2)")
+    sql("""delete from staleDataInSegmentFolder d where d.city = 'c'""".stripMargin).collect()
+    checkAnswer(sql(s"select * from $tableName"), Seq())
+  }
+
+  override def afterAll: Unit = {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+  }
+
+  private def createTable(sortscope: String, sortcolumns: String, isPartition: Boolean): Unit = {
+    if (!isPartition) {
+      createNonPartitionTable(sortscope, sortcolumns)
+    } else {
+      createPartitionTable(sortscope, sortcolumns)
+    }
+  }
+
+  private def createNonPartitionTable(sortscope: String, sortcolumns: String): Unit = {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+    sql(
+      s"""
+         CREATE TABLE $tableName(id int, name string, city string, age int)
+         STORED AS carbondata
+         TBLPROPERTIES('sort_scope'='$sortscope','sort_columns'='$sortcolumns')
+      """)
+  }
+
+  private def createPartitionTable(sortscope: String, sortcolumns: String): Unit = {
+    sql(s"DROP TABLE IF EXISTS $tableName")
+    sql(
+      s"""
+         CREATE TABLE $tableName(id int, name string, city string)
+         PARTITIONED BY(age int)
+         STORED AS carbondata
+         TBLPROPERTIES('sort_scope'='$sortscope','sort_columns'='$sortcolumns')
+      """)
+  }
+
+  private def createSI(): Unit = {
+    sql(s"create index $siName on table $tableName (city) AS 'carbondata'")
+  }
+
+  private def mockStaleDataByRemoveTablestatus(tableName: String): Unit = {
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
+    val tableStatusFile = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+    FileFactory.getCarbonFile(tableStatusFile).delete()
+  }
+
+  private def verifyThereIsNoSameContentInDifferentIndexes(tableName: String,
+      segment: String): Unit = {
+    val table = CarbonEnv.getCarbonTable(None, tableName)(sqlContext.sparkSession)
+    var path = CarbonTablePath.
+      getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
+    if (table.isHivePartitionTable) {
+      path = table.getAbsoluteTableIdentifier.getTablePath
+    }
+
+    val allIndexFilesSet = new util.HashSet[String]()
+    val allIndexFilesList = new util.ArrayList[String]()
+
+    FileFactory.getCarbonFile(path).listFiles(true, new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = {
+        file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT) ||
+          file.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT)
+      }
+    }).asScala.map(indexFile => {
+      val ssim = new SegmentIndexFileStore()
+      if (indexFile.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        ssim.readMergeFile(indexFile.getAbsolutePath)
+      } else {
+        ssim.readIndexFile(indexFile)
+      }
+      allIndexFilesSet.addAll(ssim.getCarbonIndexMapWithFullPath.keySet())
+      allIndexFilesList.addAll(ssim.getCarbonIndexMapWithFullPath.keySet())
+    })
+    assert(allIndexFilesList.size() == allIndexFilesSet.size())
+  }
+}
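
For ad-hoc debugging of a suspect segment, the duplicate-index check performed
by verifyThereIsNoSameContentInDifferentIndexes above can be condensed into a
small spark-shell snippet. The sketch below reuses only APIs already exercised
by this test; the helper name and the way duplicates are reported are assumptions:

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
    import org.apache.carbondata.core.datastore.impl.FileFactory
    import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
    import org.apache.carbondata.core.util.path.CarbonTablePath

    // segmentDir: physical segment folder, e.g. CarbonTablePath.getSegmentPath(tablePath, "0")
    def duplicateIndexEntries(segmentDir: String): Map[String, Int] = {
      val indexFiles = FileFactory.getCarbonFile(segmentDir).listFiles(true,
        new CarbonFileFilter {
          override def accept(file: CarbonFile): Boolean =
            file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT) ||
              file.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT)
        }).asScala
      // collect every .carbonindex entry referenced by plain index files
      // and by merge-index files in this segment directory
      val referencedIndexes = indexFiles.flatMap { indexFile =>
        val store = new SegmentIndexFileStore()
        if (indexFile.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
          store.readMergeFile(indexFile.getAbsolutePath)
        } else {
          store.readIndexFile(indexFile)
        }
        store.getCarbonIndexMapWithFullPath.keySet().asScala
      }
      // an entry referenced more than once points at stale index data leaking in
      referencedIndexes.groupBy(identity).mapValues(_.size).filter(_._2 > 1).toMap
    }
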
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestPruneUsingSegmentMinMax.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestPruneUsingSegmentMinMax.scala
index 2c2dcf2..0d0a72f 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestPruneUsingSegmentMinMax.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/TestPruneUsingSegmentMinMax.scala
@@ -103,7 +103,7 @@ class TestPruneUsingSegmentMinMax extends QueryTest with BeforeAndAfterAll {
     sql("update carbon set(a)=(10) where a=1").collect()
     checkAnswer(sql("select count(*) from carbon where a=10"), Seq(Row(3)))
     showCache = sql("show metacache on table carbon").collect()
-    assert(showCache(0).get(2).toString.equalsIgnoreCase("6/8 index files cached"))
+    assert(showCache(0).get(2).toString.equalsIgnoreCase("1/6 index files cached"))
     drop
   }
 
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala
index c370f89..d9cd292 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/alterTable/TestAlterTableSortColumnsProperty.scala
@@ -739,15 +739,17 @@ class TestAlterTableSortColumnsProperty extends QueryTest with BeforeAndAfterAll
 
     val table = CarbonEnv.getCarbonTable(Option("default"), tableName)(sqlContext.sparkSession)
     val tablePath = table.getTablePath
-    (0 to 2).foreach { segmentId =>
+    (0 to 3).foreach { segmentId =>
       val segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId.toString)
       val args: Array[String] = Array("-cmd", "sort_columns", "-p", segmentPath)
       val out: ByteArrayOutputStream = new ByteArrayOutputStream
       val stream: PrintStream = new PrintStream(out)
       CarbonCli.run(args, stream)
       val output: String = new String(out.toByteArray)
-      if (segmentId == 2) {
+      if (segmentId == 3 || segmentId == 2) {
         assertResult(s"Input Folder: $segmentPath\nsorted by intfield,stringfield\n")(output)
+      } else if ( segmentId == 1) {
+        assertResult(s"Input Folder: $segmentPath\nsorted by charfield,timestampfield\n")(output)
       } else {
         assertResult(s"Input Folder: $segmentPath\nunsorted\n")(output)
       }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index 1ec208f..14aa542 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -360,6 +360,55 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists decimal_table")
   }
 
+  test("delete and insert overwrite partition") {
+    sql("""drop table if exists deleteinpartition""")
+    sql(
+      """CREATE TABLE deleteinpartition (id STRING, sales STRING)
+        | PARTITIONED BY (dtm STRING)
+        | STORED AS carbondata""".stripMargin)
+    sql(
+      s"""load data local inpath '$resourcesPath/IUD/updateinpartition.csv'
+         | into table deleteinpartition""".stripMargin)
+    sql("""delete from deleteinpartition where dtm=20200907 and id='001'""")
+    sql("""delete from deleteinpartition where dtm=20200907 and id='002'""")
+    checkAnswer(
+      sql("""select count(1), dtm from deleteinpartition group by dtm"""),
+      Seq(Row(8, "20200907"), Row(10, "20200908"))
+    )
+
+    // insert overwrite a partition which already exists.
+    // make sure the delete executed before still works.
+    sql(
+      """insert overwrite table deleteinpartition
+        | partition (dtm=20200908)
+        | select * from deleteinpartition
+        | where dtm = 20200907""".stripMargin)
+    checkAnswer(
+      sql("""select count(1), dtm from deleteinpartition group by dtm"""),
+      Seq(Row(8, "20200907"), Row(8, "20200908"))
+    )
+
+    // insert overwrite a partition which does not exist.
+    // make sure the delete executed before still works.
+    sql(
+      """insert overwrite table deleteinpartition
+        | partition (dtm=20200909)
+        | select * from deleteinpartition
+        | where dtm = 20200907""".stripMargin)
+    checkAnswer(
+      sql("""select count(1), dtm from deleteinpartition group by dtm"""),
+      Seq(Row(8, "20200907"), Row(8, "20200908"), Row(8, "20200909"))
+    )
+
+    // drop a partition. make sure the delete executed before still works.
+    sql("""alter table deleteinpartition drop partition (dtm=20200908)""")
+    checkAnswer(
+      sql("""select count(1), dtm from deleteinpartition group by dtm"""),
+      Seq(Row(8, "20200907"), Row(8, "20200909"))
+    )
+    sql("""drop table deleteinpartition""")
+  }
+
   test("[CARBONDATA-3491] Return updated/deleted rows count when execute update/delete sql") {
     sql("drop table if exists test_return_row_count")
 
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala
index cfe3b17..11bd2e4 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/HorizontalCompactionTestCase.scala
@@ -412,7 +412,7 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
     updateDeltaFiles = getDeltaFiles(carbonFile, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
     deletaDeltaFiles = getDeltaFiles(carbonFile, CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
     // just update once, there is no horizontal compaction at this time
-    assert(updateDeltaFiles.length == 1)
+    assert(updateDeltaFiles.length == 0)
     assert(deletaDeltaFiles.length == 1)
 
     sql("""update dest10 set (c1, c3) = ('update_a', 'update_aa') where c2 = 5 or c2 = 8""").collect()
@@ -423,7 +423,7 @@ class HorizontalCompactionTestCase extends QueryTest with BeforeAndAfterAll {
     // one '.carbonindex' file for update operation this time
     // one '.carbonindex' file for horizontal compaction
     // so there must be three '.carbonindex' files and three '.deletedelta' files
-    assert(updateDeltaFiles.length == 3)
+    assert(updateDeltaFiles.length == 0)
     assert(deletaDeltaFiles.length == 3)
 
     sql("""drop table dest10""")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index cdaf2e8..28af323 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -16,9 +16,11 @@
  */
 package org.apache.carbondata.spark.testsuite.iud
 
-import java.io.File
+import java.io.{File, IOException}
 
-import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SaveMode}
+import mockit.{Mock, MockUp}
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SaveMode, SparkSession}
+import org.apache.spark.sql.execution.command.mutation.{HorizontalCompaction, HorizontalCompactionException}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -26,6 +28,8 @@ import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
@@ -81,10 +85,70 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     val carbonTable = CarbonEnv.getCarbonTable(Some("iud"), "zerorows")(sqlContext.sparkSession)
     val segmentFileLocation = FileFactory.getCarbonFile(CarbonTablePath.getSegmentFilesLocation(
       carbonTable.getTablePath))
-    assert(segmentFileLocation.listFiles().length == 1)
+    assert(segmentFileLocation.listFiles().length == 3)
     sql("""drop table iud.zerorows""")
   }
 
+  test("update and insert overwrite partition") {
+    sql("""drop table if exists iud.updateinpartition""")
+    sql(
+      """CREATE TABLE iud.updateinpartition (id STRING, sales INT)
+        | PARTITIONED BY (dtm STRING)
+        | STORED AS carbondata""".stripMargin)
+    sql(
+      s"""load data local
+         | inpath '$resourcesPath/IUD/updateinpartition.csv'
+         | into table updateinpartition""".stripMargin)
+    sql(
+      """update iud.updateinpartition u
+        | set (u.sales) = (u.sales + 1) where id='001'""".stripMargin)
+    sql(
+      """update iud.updateinpartition u
+        | set (u.sales) = (u.sales + 2) where id='011'""".stripMargin)
+
+    // delete data from a partition, make sure the update executed before still works.
+    sql("""delete from updateinpartition where dtm=20200908 and id='012'""".stripMargin)
+    checkAnswer(
+      sql("""select sales from iud.updateinpartition where id='001'""".stripMargin),
+      Seq(Row(1))
+    )
+    checkAnswer(
+      sql("""select sales from iud.updateinpartition where id='011'""".stripMargin),
+      Seq(Row(2))
+    )
+    checkAnswer(
+      sql("""select sales from iud.updateinpartition where id='012'""".stripMargin),
+      Seq()
+    )
+
+    // insert overwrite a partition. make sure the update executed before still works.
+    sql(
+      """insert overwrite table iud.updateinpartition
+        | partition (dtm=20200908)
+        | select * from iud.updateinpartition where dtm = 20200907""".stripMargin)
+    checkAnswer(
+      sql(
+        """select sales from iud.updateinpartition
+          | where dtm=20200907 and id='001'""".stripMargin), Seq(Row(1))
+    )
+    checkAnswer(
+      sql(
+        """select sales from iud.updateinpartition
+          | where dtm=20200908 and id='001'""".stripMargin), Seq(Row(1))
+    )
+    checkAnswer(
+      sql("""select sales from iud.updateinpartition where id='011'""".stripMargin),
+      Seq()
+    )
+
+    // drop a partition. make sure the update executed before still works.
+    sql("""alter table iud.updateinpartition drop partition (dtm=20200908)""")
+    checkAnswer(
+      sql("""select sales from iud.updateinpartition where id='001'""".stripMargin),
+      Seq(Row(1))
+    )
+  }
+
   test("test update operation with multiple loads and clean files operation") {
     sql("""drop table if exists iud.zerorows""").collect()
     sql("""create table iud.zerorows (c1 string,c2 int,c3 string,c5 string) STORED AS carbondata""")
@@ -199,6 +263,29 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("""drop table if exists iud.dest22""")
   }
 
+  test("update carbon table with stale data") {
+    sql("""drop table if exists iud.dest22""")
+    sql("""create table iud.dest22 (c1 string,c2 int,c3 string,c5 string) STORED AS carbondata""")
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest22""")
+
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("iud", "dest22")
+    val tableStatusFile = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+    FileFactory.getCarbonFile(tableStatusFile).delete()
+
+    sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.dest22""")
+
+    checkAnswer(
+      sql("""select c2 from iud.dest22 where c1='a'"""),
+      Seq(Row(1))
+    )
+    sql("""update dest22 d  set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").collect()
+    checkAnswer(
+      sql("""select c2 from iud.dest22 where c1='a'"""),
+      Seq(Row(2))
+    )
+    sql("""drop table if exists iud.dest22""")
+  }
+
   test("update with subquery with more than one value for key") {
     sql("drop table if exists t1")
     sql("drop table if exists t2")
@@ -575,11 +662,11 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
       """update iud.show_segment d set (d.c3, d.c5 ) =
         | (select s.c33,s.c55 from iud.source2 s where d.c1 = s.c11) where 1 = 1
         | """.stripMargin).collect()
-    val after_update = sql("""show segments for table iud.show_segment""")
-    checkAnswer(
-      after_update,
-      before_update
-    )
+    val after_update = sql("""show segments for table iud.show_segment""").collect()
+    (0 to 7).map(index => {
+      assert(after_update(1).get(index).equals(before_update(0).get(index)))
+    })
+
     sql("""drop table if exists iud.show_segment""").collect()
   }
 
@@ -1091,6 +1178,42 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test("test update atomicity when horizontal compaction fails") {
+    sql("drop table if exists iud.zerorows")
+    sql("create table iud.zerorows (c1 string,c2 int,c3 string,c5 string) STORED AS carbondata")
+    sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.zerorows")
+    mockForTestUpdateAtomicity(new IOException("Mock IOException"))
+    checkAnswer(
+      sql("""select c1,c2,c3,c5 from iud.zerorows"""),
+      Seq(Row("a", 1, "aa", "aaa"), Row("b", 2, "bb", "bbb"),
+        Row("c", 3, "cc", "ccc"), Row("d", 4, "dd", "ddd"), Row("e", 5, "ee", "eee"))
+    )
+
+    mockForTestUpdateAtomicity(new HorizontalCompactionException(
+      "Mock HorizontalCompactionException", System.currentTimeMillis()))
+    checkAnswer(
+      sql("""select c1,c2,c3,c5 from iud.zerorows"""),
+      Seq(Row("a", 2, "aa", "aaa"), Row("b", 2, "bb", "bbb"),
+        Row("c", 3, "cc", "ccc"), Row("d", 4, "dd", "ddd"), Row("e", 5, "ee", "eee"))
+    )
+  }
+
+  def mockForTestUpdateAtomicity(exception: Exception) {
+    var mock = new MockUp[HorizontalCompaction.type]() {
+      @Mock
+      def tryHorizontalCompaction(sparkSession: SparkSession, carbonTable: CarbonTable): Unit = {
+        throw exception
+      }
+    }
+    try {
+      sql("update iud.zerorows d  set (d.c2) = (d.c2 + 1) where d.c1 = 'a'").collect()
+    } catch {
+      case ex: Exception =>
+    }
+    mock.tearDown()
+
+  }
+
   override def afterAll {
     sql("use default")
     sql("drop database  if exists iud cascade")
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
index 58593e7..d12b2d4 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -767,6 +767,17 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select count(*) from target"), Seq(Row(3)))
     checkAnswer(sql("select * from target order by key"),
       Seq(Row("c", "200"), Row("d", "3"), Row("e", "100")))
+
+    // insert overwrite a partition. make sure the merge executed before still works.
+    sql(
+      """insert overwrite table target
+        | partition (value=3)
+        | select * from target where value = 100""".stripMargin)
+    checkAnswer(sql("select * from target order by key"),
+      Seq(Row("c", "200"), Row("e", "100"), Row("e", "3")))
+    sql("""alter table target drop partition (value=3)""")
+    checkAnswer(sql("select * from target order by key"),
+      Seq(Row("c", "200"), Row("e", "100")))
   }
 
   test("check the cdc ") {
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVIncrementalLoadingTestcase.scala b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVIncrementalLoadingTestcase.scala
index ce7edca..8225d6d 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVIncrementalLoadingTestcase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/view/rewrite/MVIncrementalLoadingTestcase.scala
@@ -155,6 +155,7 @@ class MVIncrementalLoadingTestcase extends QueryTest with BeforeAndAfterAll {
     val segmentList = new java.util.ArrayList[String]()
     segmentList.add("0")
     segmentList.add("1")
+    segmentList.add("2")
     assert(segmentList.containsAll( segmentMap.get("default.main_table")))
     df = sql(s""" select a, sum(b) from main_table group by a""".stripMargin)
     assert(TestUtil.verifyMVHit(df.queryExecution.optimizedPlan, "mv1"))
@@ -459,6 +460,7 @@ class MVIncrementalLoadingTestcase extends QueryTest with BeforeAndAfterAll {
     val segmentList = new java.util.ArrayList[String]()
     segmentList.add("0")
     segmentList.add("1")
+    segmentList.add("2")
     assert(segmentList.containsAll(segmentMap.get("default.main_table")))
     df = sql(s""" select a, sum(b) from main_table group by a""".stripMargin)
     assert(TestUtil.verifyMVHit(df.queryExecution.optimizedPlan, "mv1"))
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
index 951339a..b3191c7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -20,7 +20,6 @@ package org.apache.carbondata.processing.merger;
 import java.io.IOException;
 import java.util.List;
 
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
@@ -51,19 +50,9 @@ public abstract class AbstractResultProcessor {
   protected void setDataFileAttributesInModel(CarbonLoadModel loadModel,
       CompactionType compactionType, CarbonFactDataHandlerModel carbonFactDataHandlerModel)
       throws IOException {
-    CarbonDataFileAttributes carbonDataFileAttributes;
-    if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
-      long taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegment(),
-          loadModel.getTablePath());
-      // Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will
-      // be written in same segment. So the TaskNo should be incremented by 1 from max val.
-      long index = taskNo + 1;
-      carbonDataFileAttributes = new CarbonDataFileAttributes(index, loadModel.getFactTimeStamp());
-    } else {
-      carbonDataFileAttributes =
-          new CarbonDataFileAttributes(loadModel.getTaskNo(),
-              loadModel.getFactTimeStamp());
-    }
+    CarbonDataFileAttributes carbonDataFileAttributes =
+        new CarbonDataFileAttributes(loadModel.getTaskNo(),
+            loadModel.getFactTimeStamp());
     carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
   }
 
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index f22a0fc..0a2d104 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -17,7 +17,6 @@
 
 package org.apache.carbondata.processing.merger;
 
-import java.io.File;
 import java.io.IOException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
@@ -148,150 +147,6 @@ public final class CarbonDataMergerUtil {
   }
 
   /**
-   * Update Both Segment Update Status and Table Status for the case of IUD Delete
-   * delta compaction.
-   *
-   * @param loadsToMerge
-   * @param metaDataFilepath
-   * @param carbonLoadModel
-   * @return
-   */
-  public static boolean updateLoadMetadataIUDUpdateDeltaMergeStatus(
-      List<LoadMetadataDetails> loadsToMerge, String metaDataFilepath,
-      CarbonLoadModel carbonLoadModel, List<Segment> segmentFilesToBeUpdated) {
-
-    boolean status = false;
-    boolean updateLockStatus = false;
-    boolean tableLockStatus = false;
-
-    String timestamp = "" + carbonLoadModel.getFactTimeStamp();
-
-    List<String> updatedDeltaFilesList = null;
-
-    // This routine updateLoadMetadataIUDCompactionMergeStatus is suppose to update
-    // two files as it is only called during IUD_UPDDEL_DELTA_COMPACTION. Along with
-    // Table Status Metadata file (For Update Block Compaction) it has to update the
-    // Table Update Status Metadata File (For corresponding Delete Delta File).
-    // As the IUD_UPDDEL_DELTA_COMPACTION going to write in the same segment therefore in
-    // A) Table Update Status Metadata File (Block Level)
-    //      * For each blocks which is being compacted Mark 'Compacted' as the Status.
-    // B) Table Status Metadata file (Segment Level)
-    //      * loadStatus won't be changed to "compacted'
-    //      * UpdateDeltaStartTime and UpdateDeltaEndTime will be both set to current
-    //        timestamp (which is being passed from driver)
-    // First the Table Update Status Metadata File should be updated as we need to get
-    // the updated blocks for the segment from Table Status Metadata Update Delta Start and
-    // End Timestamp.
-
-    // Table Update Status Metadata Update.
-    AbsoluteTableIdentifier identifier =
-        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-
-    SegmentUpdateStatusManager segmentUpdateStatusManager =
-        new SegmentUpdateStatusManager(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable());
-
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
-
-    ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock();
-    ICarbonLock statusLock = segmentStatusManager.getTableStatusLock();
-
-    // Update the Compacted Blocks with Compacted Status.
-    try {
-      updatedDeltaFilesList = segmentUpdateStatusManager
-          .getUpdateDeltaFiles(loadsToMerge.get(0).getLoadName());
-    } catch (Exception e) {
-      LOGGER.error("Error while getting the Update Delta Blocks.");
-      status = false;
-      return status;
-    }
-
-    if (updatedDeltaFilesList.size() > 0) {
-      try {
-        updateLockStatus = updateLock.lockWithRetries();
-        tableLockStatus = statusLock.lockWithRetries();
-
-        List<String> blockNames = new ArrayList<>(updatedDeltaFilesList.size());
-
-        for (String compactedBlocks : updatedDeltaFilesList) {
-          // Try to BlockName
-          int endIndex = compactedBlocks.lastIndexOf(File.separator);
-          String blkNoExt =
-              compactedBlocks.substring(endIndex + 1, compactedBlocks.lastIndexOf("-"));
-          blockNames.add(blkNoExt);
-        }
-
-        if (updateLockStatus && tableLockStatus) {
-
-          SegmentUpdateDetails[] updateLists = segmentUpdateStatusManager
-              .readLoadMetadata();
-
-          for (String compactedBlocks : blockNames) {
-            // Check is the compactedBlocks name matches with oldDetails
-            for (int i = 0; i < updateLists.length; i++) {
-              if (updateLists[i].getBlockName().equalsIgnoreCase(compactedBlocks)
-                  && updateLists[i].getSegmentStatus() != SegmentStatus.COMPACTED
-                  && updateLists[i].getSegmentStatus() != SegmentStatus.MARKED_FOR_DELETE) {
-                updateLists[i].setSegmentStatus(SegmentStatus.COMPACTED);
-              }
-            }
-          }
-
-          LoadMetadataDetails[] loadDetails =
-              SegmentStatusManager.readLoadMetadata(metaDataFilepath);
-
-          for (LoadMetadataDetails loadDetail : loadDetails) {
-            if (loadsToMerge.contains(loadDetail)) {
-              loadDetail.setUpdateDeltaStartTimestamp(timestamp);
-              loadDetail.setUpdateDeltaEndTimestamp(timestamp);
-              if (loadDetail.getLoadName().equalsIgnoreCase("0")) {
-                loadDetail
-                    .setUpdateStatusFileName(CarbonUpdateUtil.getUpdateStatusFileName(timestamp));
-              }
-              // Update segement file name to status file
-              int segmentFileIndex = segmentFilesToBeUpdated
-                  .indexOf(Segment.toSegment(loadDetail.getLoadName(), null));
-              if (segmentFileIndex > -1) {
-                loadDetail.setSegmentFile(
-                    segmentFilesToBeUpdated.get(segmentFileIndex).getSegmentFileName());
-              }
-            }
-          }
-
-          segmentUpdateStatusManager.writeLoadDetailsIntoFile(
-              Arrays.asList(updateLists), timestamp);
-          SegmentStatusManager.writeLoadDetailsIntoFile(
-              CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()), loadDetails);
-          status = true;
-        } else {
-          LOGGER.error("Not able to acquire the lock.");
-          status = false;
-        }
-      } catch (IOException e) {
-        LOGGER.error("Error while updating metadata. The metadata file path is " +
-            CarbonTablePath.getMetadataPath(identifier.getTablePath()));
-        status = false;
-
-      } finally {
-        if (updateLockStatus) {
-          if (updateLock.unlock()) {
-            LOGGER.info("Unlock the segment update lock successfully.");
-          } else {
-            LOGGER.error("Not able to unlock the segment update lock.");
-          }
-        }
-        if (tableLockStatus) {
-          if (statusLock.unlock()) {
-            LOGGER.info("Unlock the table status lock successfully.");
-          } else {
-            LOGGER.error("Not able to unlock the table status lock.");
-          }
-        }
-      }
-    }
-    return status;
-  }
-
-  /**
    * method to update table status in case of IUD Update Delta Compaction.
    * @param loadsToMerge
    * @param metaDataFilepath
@@ -439,12 +294,6 @@ public final class CarbonDataMergerUtil {
               new LinkedHashSet<>(customSegmentIds));
     }
 
-    // Check for segments which are qualified for IUD compaction.
-    if (CompactionType.IUD_UPDDEL_DELTA == compactionType) {
-
-      return identifySegmentsToBeMergedBasedOnIUD(sortedSegments, carbonLoadModel);
-    }
-
     // check preserve property and preserve the configured number of latest loads.
 
     List<LoadMetadataDetails> listOfSegmentsAfterPreserve =
@@ -989,34 +838,6 @@ public final class CarbonDataMergerUtil {
 
   }
 
-  /**
-   * method to identify the segments qualified for merging in case of IUD Compaction.
-   *
-   * @param segments
-   * @param carbonLoadModel
-   * @return
-   */
-  private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnIUD(
-      List<LoadMetadataDetails> segments, CarbonLoadModel carbonLoadModel) {
-
-    List<LoadMetadataDetails> validSegments = new ArrayList<>(segments.size());
-
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-
-    int numberUpdateDeltaFilesThreshold =
-        CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction();
-    for (LoadMetadataDetails seg : segments) {
-      if ((isSegmentValid(seg)) && checkUpdateDeltaFilesInSeg(
-          new Segment(seg.getLoadName(), seg.getSegmentFile()),
-          absoluteTableIdentifier, carbonLoadModel.getSegmentUpdateStatusManager(),
-          numberUpdateDeltaFilesThreshold)) {
-        validSegments.add(seg);
-      }
-    }
-    return validSegments;
-  }
-
   private static boolean isSegmentValid(LoadMetadataDetails seg) {
     return seg.getSegmentStatus() == SegmentStatus.SUCCESS ||
         seg.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS ||
@@ -1036,96 +857,17 @@ public final class CarbonDataMergerUtil {
 
     List<String> validSegments = new ArrayList<>();
 
-    if (CompactionType.IUD_DELETE_DELTA == compactionTypeIUD) {
-      int numberDeleteDeltaFilesThreshold =
-          CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction();
-      for (Segment seg : segments) {
-        List<String> segmentNoAndBlocks = checkDeleteDeltaFilesInSeg(seg,
-            segmentUpdateStatusManager, numberDeleteDeltaFilesThreshold);
-        validSegments.addAll(segmentNoAndBlocks);
-      }
-    } else if (CompactionType.IUD_UPDDEL_DELTA == compactionTypeIUD) {
-      int numberUpdateDeltaFilesThreshold =
-          CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction();
-      for (Segment seg : segments) {
-        if (checkUpdateDeltaFilesInSeg(seg, absoluteTableIdentifier, segmentUpdateStatusManager,
-            numberUpdateDeltaFilesThreshold)) {
-          validSegments.add(seg.getSegmentNo());
-        }
-      }
+    int numberDeleteDeltaFilesThreshold =
+        CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction();
+    for (Segment seg : segments) {
+      List<String> segmentNoAndBlocks = checkDeleteDeltaFilesInSeg(seg,
+          segmentUpdateStatusManager, numberDeleteDeltaFilesThreshold);
+      validSegments.addAll(segmentNoAndBlocks);
     }
     return validSegments;
   }
 
   /**
-   * Check if the blockname of the segment belongs to the Valid Update Delta List or not.
-   * @param seg
-   * @param blkName
-   * @param segmentUpdateStatusManager
-   * @return
-   */
-  public static Boolean checkUpdateDeltaMatchBlock(final String seg, final String blkName,
-      SegmentUpdateStatusManager segmentUpdateStatusManager) {
-
-    List<String> list = segmentUpdateStatusManager.getUpdateDeltaFiles(seg);
-
-    String[] FileParts = blkName.split(CarbonCommonConstants.FILE_SEPARATOR);
-    String blockName = FileParts[FileParts.length - 1];
-
-    for (String str : list) {
-      if (str.contains(blockName)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * This method traverses Update Delta Files inside the seg and return true
-   * if UpdateDelta Files are more than IUD Compaction threshold.
-   */
-  private static Boolean checkUpdateDeltaFilesInSeg(Segment seg,
-      AbsoluteTableIdentifier identifier, SegmentUpdateStatusManager segmentUpdateStatusManager,
-      int numberDeltaFilesThreshold) {
-
-    CarbonFile[] updateDeltaFiles = null;
-    Set<String> uniqueBlocks = new HashSet<String>();
-
-    String segmentPath = CarbonTablePath.getSegmentPath(
-        identifier.getTablePath(), seg.getSegmentNo());
-    CarbonFile segDir =
-        FileFactory.getCarbonFile(segmentPath);
-    CarbonFile[] allSegmentFiles = segDir.listFiles();
-
-    updateDeltaFiles = segmentUpdateStatusManager
-        .getUpdateDeltaFilesForSegment(seg.getSegmentNo(), true,
-            CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, false, allSegmentFiles);
-
-    if (updateDeltaFiles == null) {
-      return false;
-    }
-
-    // The update Delta files may have Spill over blocks. Will consider multiple spill over
-    // blocks as one. Currently updateDeltaFiles array contains Update Delta Block name which
-    // lies within UpdateDelta Start TimeStamp and End TimeStamp. In order to eliminate
-    // Spill Over Blocks will choose files with unique taskID.
-    for (CarbonFile blocks : updateDeltaFiles) {
-      // Get Task ID and the Timestamp from the Block name for e.g.
-      // part-0-3-1481084721319.carbondata => "3-1481084721319"
-      String task = CarbonTablePath.DataFileUtil.getTaskNo(blocks.getName());
-      String timestamp =
-          CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName());
-      String taskAndTimeStamp = task + "-" + timestamp;
-      uniqueBlocks.add(taskAndTimeStamp);
-    }
-    if (uniqueBlocks.size() > numberDeltaFilesThreshold) {
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  /**
    * Check whether the segment passed qualifies for IUD delete delta compaction or not,
    * i.e., if the number of delete delta files present in the segment is more than
    * numberDeltaFilesThreshold, this segment will be selected.
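
The CarbonLoaderUtil hunk that follows adds a convenience overload of
updateTableStatusInCaseOfFailure which takes the CarbonTable directly plus the
SegmentStatus to match, so rollback paths other than plain loads (for example
an update that wrote its output into a brand-new segment) can reuse the same
routine. A minimal sketch of such a rollback helper, assuming the failed
segment is still flagged INSERT_IN_PROGRESS; the helper name is hypothetical
and not part of this patch:

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.core.statusmanager.SegmentStatus
    import org.apache.carbondata.processing.util.CarbonLoaderUtil

    // Mark the in-progress segment written by a failed update as
    // MARKED_FOR_DELETE so that a later clean files can remove it.
    def rollbackNewSegmentOnFailure(carbonTable: CarbonTable,
        newSegmentId: String): Unit = {
      CarbonLoaderUtil.updateTableStatusInCaseOfFailure(
        newSegmentId, carbonTable, SegmentStatus.INSERT_IN_PROGRESS)
    }
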
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 26cfe24..e6b6df7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -1231,8 +1231,22 @@ public final class CarbonLoaderUtil {
    * Update specified segment status for load to MarkedForDelete in case of failure
    */
   public static void updateTableStatusInCaseOfFailure(String loadName,
+      CarbonTable carbonTable, SegmentStatus status) throws IOException {
+    updateTableStatusInCaseOfFailure(loadName,
+        carbonTable.getAbsoluteTableIdentifier(),
+        carbonTable.getTableName(),
+        carbonTable.getDatabaseName(),
+        carbonTable.getTablePath(),
+        carbonTable.getMetadataPath(),
+        status);
+  }
+
+  /**
+   * Update specified segment status for load to MarkedForDelete in case of failure
+   */
+  public static void updateTableStatusInCaseOfFailure(String loadName,
       AbsoluteTableIdentifier absoluteTableIdentifier, String tableName, String databaseName,
-      String tablePath, String metaDataPath) throws IOException {
+      String tablePath, String metaDataPath, SegmentStatus status) throws IOException {
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
     try {
@@ -1243,10 +1257,11 @@ public final class CarbonLoaderUtil {
             SegmentStatusManager.readLoadMetadata(metaDataPath);
         boolean ifTableStatusUpdateRequired = false;
         for (LoadMetadataDetails loadMetadataDetail : loadMetadataDetails) {
-          if (loadMetadataDetail.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS && loadName
+          if (loadMetadataDetail.getSegmentStatus() == status && loadName
               .equalsIgnoreCase(loadMetadataDetail.getLoadName())) {
             loadMetadataDetail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
             ifTableStatusUpdateRequired = true;
+            break;
           }
         }
         if (ifTableStatusUpdateRequired) {