Posted to commits@carbondata.apache.org by ak...@apache.org on 2021/04/22 09:55:28 UTC

[carbondata] branch master updated: [CARBONDATA-4037] Improve the table status and segment file writing

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 71910fb  [CARBONDATA-4037] Improve the table status and segment file writing
71910fb is described below

commit 71910fba7929353b2ee9b6582b1fb39b44abe137
Author: ShreelekhyaG <sh...@yahoo.com>
AuthorDate: Fri Oct 16 23:33:15 2020 +0530

    [CARBONDATA-4037] Improve the table status and segment file writing
    
    Why is this PR needed?
    Currently, we update the table status and segment files multiple times for a single
    IUD/merge/compact operation, and we delete the index files immediately after the merge.
    When concurrent queries are run, a user query may try to access segment index files
    that are no longer present, which is an availability issue.
    
    What changes were proposed in this PR?
    1. Generate the segment file after merge index, and update the table status at the beginning
       and again after merge index. If the merge index or table status update fails, the load
       also fails (see the sketch after this list).
       Order:
       create table status file => index files => merge index => generate segment file => update table status
     * The same order is now maintained for the SI, compaction, IUD, addHivePartition and addSegment scenarios.
     * Whenever the segment file needs to be updated for the main table, a new segment file is created
       instead of updating the existing one.
    2. When compact 'segment_index' is triggered:
       For new tables - if there are no index files to merge, a warning is logged and the operation exits.
       For old tables - the old index files are not deleted.
    3. After the SI small-files merge:
       For newly loaded SI segments - DeleteOldIndexOrMergeFiles deletes the old files immediately after merge.
       For segments that are already present (rebuild) - the old index files and data files are not deleted.
    4. Removed the carbon.merge.index.in.segment property from the configuration-parameters
       documentation. This property is now intended only for debugging/test purposes.
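
    For illustration only, here is a minimal, self-contained sketch of the ordering described in
    point 1. The class and step names are hypothetical (they are not CarbonData APIs); the point
    is that the steps run strictly in this order and a failure in the merge index or table status
    step fails the whole load:

    // Hypothetical sketch of the load flow ordering; not actual CarbonData code.
    public final class LoadFlowOrderSketch {

      public static void main(String[] args) {
        try {
          step("create table status entry (load marked in progress)");
          step("write carbon index files for the segment");
          step("merge the index files into a single merge index file");
          step("generate a new segment file that refers to the merge index");
          step("update the table status entry with the new segment file (load success)");
        } catch (RuntimeException e) {
          // If merge index or table status update fails, the load fails as well.
          System.out.println("Load failed: " + e.getMessage());
        }
      }

      private static void step(String description) {
        // A real implementation would perform the corresponding file system write here.
        System.out.println("step: " + description);
      }
    }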
    
    Note: Cleaning of stale index/segment files is to be handled in CARBONDATA-4074.
    
    This closes #3988
---
 .../core/constants/CarbonCommonConstants.java      |  12 -
 .../carbondata/core/index/IndexStoreManager.java   |  20 +-
 .../blockletindex/SegmentIndexFileStore.java       |  16 --
 .../carbondata/core/metadata/SegmentFileStore.java | 217 +++++++++++------
 .../carbondata/core/mutate/CarbonUpdateUtil.java   |  14 +-
 .../LatestFilesReadCommittedScope.java             |  31 ++-
 .../TableStatusReadCommittedScope.java             |  38 ++-
 .../carbondata/core/util/CarbonTestUtil.java       |  40 +++
 .../apache/carbondata/core/util/CarbonUtil.java    |  18 +-
 .../core/writer/CarbonIndexFileMergeWriter.java    | 142 ++++++-----
 docs/configuration-parameters.md                   |   1 -
 .../hadoop/api/CarbonOutputCommitter.java          |   7 +-
 .../CarbonDataFileMergeTestCaseOnSI.scala          |  16 +-
 .../CarbonIndexFileMergeTestCaseWithSI.scala       | 116 ++++-----
 .../hive/MapredCarbonOutputCommitter.java          |  10 +-
 .../PrestoInsertIntoTableTestCase.scala            |   6 +-
 .../spark/rdd/CarbonDataRDDFactory.scala           |  14 +-
 .../spark/rdd/CarbonTableCompactor.scala           |   3 +-
 .../org/apache/spark/rdd/CarbonMergeFilesRDD.scala |  38 ++-
 .../spark/sql/events/MergeIndexEventListener.scala |  37 ++-
 .../command/management/CarbonAddLoadCommand.scala  |  25 +-
 .../CarbonAlterTableAddHivePartitionCommand.scala  | 143 ++++++-----
 .../AlterTableMergeIndexSIEventListener.scala      |  21 +-
 .../events/SILoadEventListener.scala               |   4 -
 .../SILoadEventListenerForFailedSegments.scala     |   5 -
 .../spark/sql/secondaryindex/load/Compactor.scala  |  17 +-
 .../secondaryindex/rdd/SecondaryIndexCreator.scala |  51 ++--
 .../secondaryindex/util/SecondaryIndexUtil.scala   | 146 +++++------
 .../CarbonIndexFileMergeTestCase.scala             | 267 +++++++++++++--------
 .../testsuite/iud/UpdateCarbonTableTestCase.scala  |  10 +-
 .../merger/CompactionResultSortProcessor.java      |   9 +-
 .../merger/RowResultMergerProcessor.java           |   8 +-
 .../processing/util/CarbonLoaderUtil.java          |  22 +-
 33 files changed, 891 insertions(+), 633 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 9840540..7598dea9 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -372,18 +372,6 @@ public final class CarbonCommonConstants {
   public static final String CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT = "true";
 
   /**
-   * It is the user defined property to specify whether to throw exception or not in case
-   * if the MERGE INDEX JOB is failed. Default value - TRUE
-   * TRUE - throws exception and fails the corresponding LOAD job
-   * FALSE - Logs the exception and continue with the LOAD
-   */
-  @CarbonProperty
-  public static final String CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION =
-      "carbon.merge.index.failure.throw.exception";
-
-  public static final String CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION_DEFAULT = "true";
-
-  /**
    * property to be used for specifying the max byte limit for string/varchar data type till
    * where storing min/max in data file will be considered
    */
diff --git a/core/src/main/java/org/apache/carbondata/core/index/IndexStoreManager.java b/core/src/main/java/org/apache/carbondata/core/index/IndexStoreManager.java
index 321a774..760a3d0 100644
--- a/core/src/main/java/org/apache/carbondata/core/index/IndexStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/index/IndexStoreManager.java
@@ -30,7 +30,6 @@ import org.apache.carbondata.common.exceptions.sql.MalformedIndexCommandExceptio
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentPropertiesAndSchemaHolder;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.index.dev.IndexFactory;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
@@ -44,7 +43,6 @@ import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang.StringUtils;
@@ -507,16 +505,16 @@ public final class IndexStoreManager {
         UpdateVO updateVO =
             SegmentUpdateStatusManager.getInvalidTimestampRange(segment.getLoadMetadataDetails());
         SegmentRefreshInfo segmentRefreshInfo;
+        String segmentFileName = segment.getSegmentFileName();
         if ((updateVO != null && updateVO.getLatestUpdateTimestamp() != null)
-            || segment.getSegmentFileName() != null) {
-          long segmentFileTimeStamp;
-          if (null != segment.getLoadMetadataDetails()) {
-            segmentFileTimeStamp = segment.getLoadMetadataDetails().getLastModifiedTime();
-          } else {
-            segmentFileTimeStamp = FileFactory.getCarbonFile(CarbonTablePath
-                .getSegmentFilePath(table.getTablePath(), segment.getSegmentFileName()))
-                .getLastModifiedTime();
-          }
+            || segmentFileName != null) {
+          // Do not use getLastModifiedTime API on segment file carbon file object as it will
+          // slow down operation in Object stores like S3. Now the segment file is always written
+          // for operations which were overwriting it earlier, so this timestamp can be checked always
+          // to check whether to refresh the cache or not.
+          long segmentFileTimeStamp = Long.parseLong(segmentFileName
+              .substring(segmentFileName.indexOf(CarbonCommonConstants.UNDERSCORE) + 1,
+                  segmentFileName.lastIndexOf(CarbonCommonConstants.POINT)));
           segmentRefreshInfo =
               new SegmentRefreshInfo(updateVO.getLatestUpdateTimestamp(), 0, segmentFileTimeStamp);
         } else {
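
The hunk above derives the refresh timestamp from the segment file name instead of calling
getLastModifiedTime, which is slow on object stores such as S3. A standalone illustration of
that parsing, assuming segment file names of the form <segmentNo>_<timestamp>.segment (the
file name below is hypothetical):

    public final class SegmentFileTimestampSketch {

      // Extracts the timestamp portion between the underscore and the extension dot,
      // mirroring the substring logic in the hunk above.
      static long parse(String segmentFileName) {
        return Long.parseLong(segmentFileName.substring(
            segmentFileName.indexOf('_') + 1, segmentFileName.lastIndexOf('.')));
      }

      public static void main(String[] args) {
        System.out.println(parse("2_1602869595000.segment"));  // prints 1602869595000
      }
    }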
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index a82ba8e..60c8981 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -338,22 +338,6 @@ public class SegmentIndexFileStore {
    * List all the index files of the segment.
    *
    * @param carbonFile directory
-   * @return
-   */
-  public static CarbonFile[] getCarbonIndexFiles(CarbonFile carbonFile) {
-    return carbonFile.listFiles(new CarbonFileFilter() {
-      @Override
-      public boolean accept(CarbonFile file) {
-        return ((file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
-            .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) && file.getSize() > 0);
-      }
-    });
-  }
-
-  /**
-   * List all the index files of the segment.
-   *
-   * @param carbonFile directory
    */
   public static void getCarbonIndexFilesRecursively(CarbonFile carbonFile,
       List<CarbonFile> indexFiles) {
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 50ba335..0bd517c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -37,6 +37,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -69,7 +70,6 @@ import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
 import org.apache.carbondata.core.util.TrashUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
 
 import com.google.gson.Gson;
 import org.apache.hadoop.conf.Configuration;
@@ -103,9 +103,10 @@ public class SegmentFileStore {
    * Write segment information to the segment folder with index file name and
    * corresponding partitions.
    */
-  public static void writeSegmentFile(String tablePath, final String taskNo, String location,
-      String timeStamp, List<String> partitionNames) throws IOException {
-    writeSegmentFile(tablePath, taskNo, location, timeStamp, partitionNames, false);
+  public static void writeSegmentFileForPartitionTable(String tablePath, final String taskNo,
+      String location, String timeStamp, List<String> partitionNames) throws IOException {
+    writeSegmentFileForPartitionTable(tablePath, taskNo, location, timeStamp, partitionNames,
+        false);
   }
 
   /**
@@ -162,8 +163,9 @@ public class SegmentFileStore {
    * Write segment information to the segment folder with index file name and
    * corresponding partitions.
    */
-  public static void writeSegmentFile(String tablePath, final String taskNo, String location,
-      String timeStamp, List<String> partitionNames, boolean isMergeIndexFlow) throws IOException {
+  public static void writeSegmentFileForPartitionTable(String tablePath, final String taskNo,
+      String location, String timeStamp, List<String> partitionNames, boolean isMergeIndexFlow)
+      throws IOException {
     String tempFolderLoc = timeStamp + ".tmp";
     String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
     CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
@@ -172,8 +174,12 @@ public class SegmentFileStore {
     }
     CarbonFile tempFolder;
     if (isMergeIndexFlow) {
+      // To get the index files from the partition path directly.
+      // For old stores after merge index is triggered, we won't have the '.tmp' folder.
       tempFolder = FileFactory.getCarbonFile(location);
     } else {
+      // When mergeindex property is disabled during compaction, we write temporary segment files
+      // with index files present in '.tmp' folder of partition path.
       tempFolder = FileFactory
           .getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
     }
@@ -199,13 +205,7 @@ public class SegmentFileStore {
         folderDetails.setRelative(isRelative);
         folderDetails.setPartitions(partitionNames);
         folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
-        for (CarbonFile file : carbonFiles) {
-          if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-            folderDetails.setMergeFileName(file.getName());
-          } else {
-            folderDetails.getFiles().add(file.getName());
-          }
-        }
+        setIndexFileNamesToFolderDetails(folderDetails, carbonFiles);
         segmentFile.addPath(location, folderDetails);
         String path = null;
         if (isMergeIndexFlow) {
@@ -283,13 +283,7 @@ public class SegmentFileStore {
       folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
       folderDetails.setRelative(false);
       segmentFile.addPath(segment.getSegmentPath(), folderDetails);
-      for (CarbonFile file : indexFiles) {
-        if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-          folderDetails.setMergeFileName(file.getName());
-        } else {
-          folderDetails.getFiles().add(file.getName());
-        }
-      }
+      setIndexFileNamesToFolderDetails(folderDetails, indexFiles);
       String segmentFileFolder = CarbonTablePath.getSegmentFilesLocation(tablePath);
       CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFileFolder);
       if (!carbonFile.exists()) {
@@ -304,6 +298,17 @@ public class SegmentFileStore {
     return false;
   }
 
+  public static void setIndexFileNamesToFolderDetails(FolderDetails folderDetails,
+      CarbonFile[] indexFiles) {
+    for (CarbonFile file : indexFiles) {
+      if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        folderDetails.setMergeFileName(file.getName());
+      } else {
+        folderDetails.getFiles().add(file.getName());
+      }
+    }
+  }
+
   public static boolean writeSegmentFileForOthers(
       CarbonTable carbonTable,
       Segment segment,
@@ -350,21 +355,6 @@ public class SegmentFileStore {
     return false;
   }
 
-  public static void mergeIndexAndWriteSegmentFile(CarbonTable carbonTable, String segmentId,
-      String UUID) {
-    String tablePath = carbonTable.getTablePath();
-    String segmentFileName = genSegmentFileName(segmentId, UUID) + CarbonTablePath.SEGMENT_EXT;
-    try {
-      SegmentFileStore sfs = new SegmentFileStore(tablePath, segmentFileName);
-      List<CarbonFile> carbonIndexFiles = sfs.getIndexCarbonFiles();
-      new CarbonIndexFileMergeWriter(carbonTable)
-          .writeMergeIndexFileBasedOnSegmentFile(segmentId, null, sfs,
-              carbonIndexFiles.toArray(new CarbonFile[carbonIndexFiles.size()]), UUID, null);
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-  }
-
   /**
    * Write segment file to the metadata folder of the table selecting only the current load files
    *
@@ -401,13 +391,7 @@ public class SegmentFileStore {
       FolderDetails folderDetails = new FolderDetails();
       folderDetails.setRelative(absSegPath == null);
       folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
-      for (CarbonFile file : indexFiles) {
-        if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-          folderDetails.setMergeFileName(file.getName());
-        } else {
-          folderDetails.getFiles().add(file.getName());
-        }
-      }
+      setIndexFileNamesToFolderDetails(folderDetails, indexFiles);
       String segmentRelativePath = "/";
       if (!supportFlatFolder) {
         if (absSegPath != null) {
@@ -441,6 +425,107 @@ public class SegmentFileStore {
   }
 
   /**
+   * Get stale and invalid index files that have already been merged.
+   * As we are not deleting index files immediately after updating old tables,
+   * we will have old index files in segment folder.
+   * This method is called in following scenarios:
+   * 1. During read, when segment file is not present or gets deleted.
+   * 2. When writing segment index size in tablestatus file.
+   */
+  public static Set<String> getInvalidAndMergedIndexFiles(List<String> indexFiles)
+      throws IOException {
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    Set<String> mergedAndInvalidIndexFiles = new HashSet<>();
+    long lastModifiedTime = 0L;
+    String validMergeIndexFile = null;
+    List<String> mergeIndexFileNames = new ArrayList<>();
+    boolean isIndexFilesPresent = false;
+    for (String indexFile : indexFiles) {
+      if (indexFile.endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
+        isIndexFilesPresent = true;
+      }
+      if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        long indexFileTimeStamp =
+            Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(indexFile));
+        // In case more than one merge index file is present, the latest one is considered valid
+        if (indexFileTimeStamp > lastModifiedTime) {
+          lastModifiedTime = indexFileTimeStamp;
+          validMergeIndexFile = indexFile;
+        }
+        mergeIndexFileNames.add(indexFile);
+      }
+    }
+    //  Possible scenarios where we have >1 merge index file:
+    //  1. SI load when MT has stale index files. As during SI load MT segment file is not updated,
+    //  we directly read from segment directory.
+    //  2. In SI table, after small files index merge(refresh command) we will have more than one
+    //  mergeindex file as we are not deleting old file immediately. If segment file gets deleted,
+    //  then it reads from segment directory.
+    if (mergeIndexFileNames.size() > 1 && validMergeIndexFile != null) {
+      final String validIndexFileName = validMergeIndexFile;
+      // get the invalid mergeindex files by excluding the valid file.
+      mergedAndInvalidIndexFiles.addAll(
+          mergeIndexFileNames.stream().filter(file -> !file.equalsIgnoreCase(validIndexFileName))
+              .collect(Collectors.toSet()));
+    }
+    // If both index and merge index files are present, read the valid merge index file and add its
+    // mapped index files to the excluding list.
+    // Example: We have xxx.index and xxx.mergeindex files in a segment directory.
+    // Here the index file (xxx.index) is considered invalid.
+    if (isIndexFilesPresent && validMergeIndexFile != null) {
+      indexFileStore.readMergeFile(validMergeIndexFile);
+      Map<String, List<String>> carbonMergeFileToIndexFilesMap =
+          indexFileStore.getCarbonMergeFileToIndexFilesMap();
+      String segmentPath =
+          validMergeIndexFile.substring(0, validMergeIndexFile.lastIndexOf(File.separator) + 1);
+      mergedAndInvalidIndexFiles
+          .addAll(carbonMergeFileToIndexFilesMap.get(validMergeIndexFile).stream()
+          .map(file -> segmentPath + file).collect(Collectors.toSet()));
+    }
+    if (mergeIndexFileNames.size() == 0 && indexFiles.size() > 1) {
+      // if more than one index file present with different timestamps, then stale/invalid
+      // data is present. Latest one is considered as valid.
+      Long validFile = indexFiles.stream()
+          .map(file -> Long.parseLong(CarbonTablePath.DataFileUtil.getTimeStampFromFileName(file)))
+          .max(Long::compareTo).get();
+      mergedAndInvalidIndexFiles.addAll(
+          indexFiles.stream().filter(file -> !file.contains(validFile.toString()))
+              .collect(Collectors.toSet()));
+    }
+    return mergedAndInvalidIndexFiles;
+  }
+
+  /**
+   * Get valid index files excluding the invalid ones.
+   * This method is called while calculating index size using segment directory.
+   */
+  public static Object getValidCarbonIndexFiles(Object carbonFiles) throws IOException {
+    // For supporting local or default file system.
+    if (carbonFiles instanceof CarbonFile[]) {
+      Set<String> mergedAndInvalidIndexFiles = getInvalidAndMergedIndexFiles(
+          Arrays.stream((CarbonFile[]) carbonFiles).map(file -> file.getAbsolutePath())
+              .collect(Collectors.toList()));
+      if (!mergedAndInvalidIndexFiles.isEmpty()) {
+        return Arrays.stream((CarbonFile[]) carbonFiles)
+            .filter(file -> !mergedAndInvalidIndexFiles.contains(file.getAbsolutePath()))
+            .toArray(CarbonFile[]::new);
+      }
+    }
+    // For supporting HDFS/S3/other file systems.
+    else if (carbonFiles instanceof FileStatus[]) {
+      Set<String> mergedAndInvalidIndexFiles = getInvalidAndMergedIndexFiles(
+          Arrays.stream((FileStatus[]) carbonFiles).map(file -> file.getPath().toString())
+              .collect(Collectors.toList()));
+      if (!mergedAndInvalidIndexFiles.isEmpty()) {
+        return Arrays.stream((FileStatus[]) carbonFiles)
+            .filter(file -> !mergedAndInvalidIndexFiles.contains(file.getPath().toString()))
+            .toArray(FileStatus[]::new);
+      }
+    }
+    return carbonFiles;
+  }
+
+  /**
    * Move the loaded data from source folder to destination folder.
    */
   private static void moveFromTempFolder(String source, String dest) {
@@ -654,13 +739,23 @@ public class SegmentFileStore {
       String location = spec.getLocation().toString();
       CarbonFile carbonFile = FileFactory.getCarbonFile(location);
 
-      CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
-        @Override
-        public boolean accept(CarbonFile file) {
-          return CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath());
-        }
-      });
+      CarbonFile[] listFiles =
+          carbonFile.listFiles(file -> CarbonTablePath.isCarbonIndexFile(file.getAbsolutePath()));
       if (listFiles != null && listFiles.length > 0) {
+        Set<String> mergedAndInvalidIndexFiles = getInvalidAndMergedIndexFiles(
+            Arrays.stream(listFiles).map(CarbonFile::getAbsolutePath).collect(Collectors.toList()));
+        // In add hive partition after merge index is written, delete index files that are merged.
+        for (CarbonFile indexFile : listFiles) {
+          if (mergedAndInvalidIndexFiles.contains(indexFile.getAbsolutePath())) {
+            indexFile.delete();
+          }
+        }
+        CarbonFile[] carbonIndexFiles = listFiles;
+        if (!mergedAndInvalidIndexFiles.isEmpty()) {
+          carbonIndexFiles = Arrays.stream(listFiles)
+              .filter(file -> !mergedAndInvalidIndexFiles.contains(file.getAbsolutePath()))
+              .toArray(CarbonFile[]::new);
+        }
         boolean isRelative = false;
         if (location.startsWith(tablePath)) {
           location = location.substring(tablePath.length());
@@ -671,13 +766,7 @@ public class SegmentFileStore {
         folderDetails.setRelative(isRelative);
         folderDetails.setPartitions(spec.getPartitions());
         folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
-        for (CarbonFile file : listFiles) {
-          if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-            folderDetails.setMergeFileName(file.getName());
-          } else {
-            folderDetails.getFiles().add(file.getName());
-          }
-        }
+        setIndexFileNamesToFolderDetails(folderDetails, carbonIndexFiles);
         localSegmentFile.addPath(location, folderDetails);
         if (segmentFile == null) {
           segmentFile = localSegmentFile;
@@ -1336,24 +1425,6 @@ public class SegmentFileStore {
   }
 
   /**
-   * This method returns the list of index/merge index files for a segment in carbonTable.
-   */
-  public static Set<String> getIndexFilesListForSegment(Segment segment, String tablePath)
-      throws IOException {
-    Set<String> indexFiles;
-    if (segment.getSegmentFileName() == null) {
-      String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segment.getSegmentNo());
-      indexFiles =
-          new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(segmentPath).keySet();
-    } else {
-      SegmentFileStore segmentFileStore =
-          new SegmentFileStore(tablePath, segment.getSegmentFileName());
-      indexFiles = segmentFileStore.getIndexAndMergeFiles().keySet();
-    }
-    return indexFiles;
-  }
-
-  /**
    * It contains the segment information like location, partitions and related index files
    */
   public static class SegmentFile implements Serializable {
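
The stale-file detection added to SegmentFileStore above keeps only the merge index file with the
latest embedded timestamp and treats the rest as invalid (the valid merge index is then also used
to exclude the index files it has already absorbed). Below is a simplified, self-contained sketch
of the "latest timestamp wins" selection; the file names and the timestamp extraction are
hypothetical simplifications of the real CarbonTablePath utilities:

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;
    import java.util.Set;
    import java.util.stream.Collectors;

    public final class StaleMergeIndexSketch {

      // Timestamp embedded in hypothetical names like "<timestamp>.carbonindexmerge".
      static long timestampOf(String fileName) {
        return Long.parseLong(fileName.substring(0, fileName.indexOf('.')));
      }

      public static void main(String[] args) {
        List<String> mergeIndexFiles = Arrays.asList(
            "1602869595000.carbonindexmerge",
            "1602869999000.carbonindexmerge");
        // The latest merge index file is considered valid; all others are stale.
        String valid = mergeIndexFiles.stream()
            .max(Comparator.comparingLong(StaleMergeIndexSketch::timestampOf)).get();
        Set<String> invalid = mergeIndexFiles.stream()
            .filter(file -> !file.equals(valid)).collect(Collectors.toSet());
        System.out.println("valid:   " + valid);
        System.out.println("invalid: " + invalid);
      }
    }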
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 41c4b2b..c02af10 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -290,6 +290,8 @@ public class CarbonUpdateUtil {
 
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
             SegmentStatusManager.readLoadMetadata(metaDataFilepath);
+        // to update table status only when required.
+        boolean isUpdateRequired = false;
 
         for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
           // we are storing the link between the 2 status files in the segment 0 only.
@@ -304,6 +306,7 @@ public class CarbonUpdateUtil {
             if (segmentsToBeDeleted.contains(new Segment(loadMetadata.getLoadName()))) {
               loadMetadata.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
               loadMetadata.setModificationOrDeletionTimestamp(Long.parseLong(updatedTimeStamp));
+              isUpdateRequired = true;
             }
           }
           for (Segment segName : updatedSegmentsList) {
@@ -318,19 +321,23 @@ public class CarbonUpdateUtil {
                 }
                 // update end timestamp for each time.
                 loadMetadata.setUpdateDeltaEndTimestamp(updatedTimeStamp);
+                isUpdateRequired = true;
               }
               if (segmentFilesTobeUpdated
                   .contains(Segment.toSegment(loadMetadata.getLoadName(), null))) {
                 loadMetadata.setSegmentFile(loadMetadata.getLoadName() + "_" + updatedTimeStamp
                     + CarbonTablePath.SEGMENT_EXT);
+                isUpdateRequired = true;
               }
             }
           }
         }
 
         try {
-          SegmentStatusManager
-                  .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
+          if (isUpdateRequired || isUpdateStatusFileUpdateRequired) {
+            SegmentStatusManager
+                .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
+          }
         } catch (IOException e) {
           return false;
         }
@@ -508,7 +515,8 @@ public class CarbonUpdateUtil {
     int maxTime;
     try {
       maxTime = Integer.parseInt(CarbonProperties.getInstance()
-              .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME));
+          .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME,
+              Integer.toString(CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME)));
     } catch (NumberFormatException e) {
       maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME;
     }
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index c4f948a..9077707 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.core.readcommitter;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -24,6 +25,8 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
@@ -31,6 +34,7 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.index.Segment;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentRefreshInfo;
@@ -126,7 +130,7 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
   }
 
   @Override
-  public Map<String, String> getCommittedIndexFile(Segment segment) {
+  public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
     Map<String, String> indexFileStore = new HashMap<>();
     Map<String, List<String>> snapShot = readCommittedIndexFileSnapShot.getSegmentIndexFileMap();
     String segName;
@@ -135,15 +139,26 @@ public class LatestFilesReadCommittedScope implements ReadCommittedScope {
     } else {
       segName = segment.getSegmentFileName();
     }
-    List<String> index = snapShot.get(segName);
-    if (null == index) {
-      index = new LinkedList<>();
+    List<String> indexFiles = snapShot.get(segName);
+    if (null == indexFiles) {
+      indexFiles = new LinkedList<>();
     }
-    for (String indexPath : index) {
-      if (indexPath.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
-        indexFileStore.put(indexPath, indexPath.substring(indexPath.lastIndexOf('/') + 1));
+    Set<String> mergedIndexFiles =
+        SegmentFileStore.getInvalidAndMergedIndexFiles(indexFiles);
+    List<String> filteredIndexFiles = indexFiles;
+    if (!mergedIndexFiles.isEmpty()) {
+      // do not include already merged index files details.
+      filteredIndexFiles = indexFiles.stream().filter(
+          file -> !mergedIndexFiles.contains(file))
+          .collect(Collectors.toList());
+    }
+    for (String indexFile : filteredIndexFiles) {
+      if (indexFile.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+        indexFileStore
+            .put(indexFile, indexFile.substring(indexFile.lastIndexOf(File.separator) + 1));
+        break;
       } else {
-        indexFileStore.put(indexPath, null);
+        indexFileStore.put(indexFile, null);
       }
     }
     return indexFileStore;
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
index cd3992b..2c9ab84 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/TableStatusReadCommittedScope.java
@@ -18,11 +18,14 @@
 package org.apache.carbondata.core.readcommitter;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.carbondata.common.annotations.InterfaceStability;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.index.Segment;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -79,13 +82,25 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
   @Override
   public Map<String, String> getCommittedIndexFile(Segment segment) throws IOException {
     Map<String, String> indexFiles;
-    if (segment.getSegmentFileName() == null) {
+    SegmentFileStore fileStore = null;
+    if (segment.getSegmentFileName() != null) {
+      fileStore = new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
+    }
+    if (segment.getSegmentFileName() == null || fileStore.getSegmentFile() == null) {
       String path =
           CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo());
       indexFiles = new SegmentIndexFileStore().getMergeOrIndexFilesFromSegment(path);
+      Set<String> mergedIndexFiles =
+          SegmentFileStore.getInvalidAndMergedIndexFiles(new ArrayList<>(indexFiles.keySet()));
+      Map<String, String> filteredIndexFiles = indexFiles;
+      if (mergedIndexFiles.size() > 0) {
+        // do not include already merged index files details.
+        filteredIndexFiles = indexFiles.entrySet().stream()
+            .filter(indexFile -> !mergedIndexFiles.contains(indexFile.getKey()))
+            .collect(HashMap::new, (m, v) -> m.put(v.getKey(), v.getValue()), HashMap::putAll);
+      }
+      return filteredIndexFiles;
     } else {
-      SegmentFileStore fileStore =
-          new SegmentFileStore(identifier.getTablePath(), segment.getSegmentFileName());
       indexFiles = fileStore.getIndexAndMergeFiles();
       if (fileStore.getSegmentFile() != null) {
         segment.setSegmentMetaDataInfo(fileStore.getSegmentFile().getSegmentMetaDataInfo());
@@ -97,12 +112,15 @@ public class TableStatusReadCommittedScope implements ReadCommittedScope {
   public SegmentRefreshInfo getCommittedSegmentRefreshInfo(Segment segment, UpdateVO updateVo) {
     SegmentRefreshInfo segmentRefreshInfo;
     long segmentFileTimeStamp = 0L;
-    if (null != segment.getLoadMetadataDetails()) {
-      segmentFileTimeStamp = segment.getLoadMetadataDetails().getLastModifiedTime();
-    } else if (null != segment.getSegmentFileName()) {
-      segmentFileTimeStamp = FileFactory.getCarbonFile(CarbonTablePath
-          .getSegmentFilePath(identifier.getTablePath(), segment.getSegmentFileName()))
-          .getLastModifiedTime();
+    String segmentFileName = segment.getSegmentFileName();
+    if (null != segmentFileName) {
+      // Do not use getLastModifiedTime API on segment file carbon file object as it will slow down
+      // operation in Object stores like S3. Now the segment file is always written for operations
+      // which were overwriting it earlier, so this timestamp can be checked always to check whether
+      // to refresh the cache or not
+      segmentFileTimeStamp = Long.parseLong(segmentFileName
+          .substring(segmentFileName.indexOf(CarbonCommonConstants.UNDERSCORE) + 1,
+              segmentFileName.lastIndexOf(CarbonCommonConstants.POINT)));
     }
     if (updateVo != null) {
       segmentRefreshInfo =
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonTestUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonTestUtil.java
index 822c7fa..25bbc0b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonTestUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonTestUtil.java
@@ -23,6 +23,7 @@ import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.BitSet;
 import java.util.HashMap;
 import java.util.List;
@@ -43,10 +44,14 @@ import org.apache.carbondata.core.datastore.page.LazyColumnPage;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageDecoder;
 import org.apache.carbondata.core.datastore.page.encoding.ColumnPageEncoderMeta;
 import org.apache.carbondata.core.datastore.page.encoding.DefaultEncodingFactory;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.LocalDictionaryChunk;
 
 public class CarbonTestUtil {
@@ -161,6 +166,41 @@ public class CarbonTestUtil {
     return isLocalDictionaryGenerated;
   }
 
+  public static int getSegmentFileCount(String tableName) throws IOException {
+    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableName);
+    CarbonFile segmentsFolder = FileFactory
+        .getCarbonFile(CarbonTablePath.getSegmentFilesLocation(carbonTable.getTablePath()));
+    assert (segmentsFolder.isFileExist());
+    return segmentsFolder.listFiles(true).size();
+  }
+
+  public static int getIndexFileCount(String tableName, String segment) throws IOException {
+    return getIndexFileCount(tableName, segment, null);
+  }
+
+  public static int getIndexFileCount(String tableName,
+      String segment, String extension) throws IOException {
+    if (extension == null) {
+      extension = CarbonTablePath.INDEX_FILE_EXT;
+    }
+    CarbonTable table = CarbonMetadata.getInstance().getCarbonTable(tableName);
+    String path = CarbonTablePath
+        .getSegmentPath(table.getAbsoluteTableIdentifier().getTablePath(), segment);
+    boolean recursive = false;
+    if (table.isHivePartitionTable()) {
+      path = table.getAbsoluteTableIdentifier().getTablePath();
+      recursive = true;
+    }
+    List<CarbonFile> carbonFiles = FileFactory.getCarbonFile(path).listFiles(recursive,
+        file -> file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
+            .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT));
+    CarbonFile[] validIndexFiles = (CarbonFile[]) SegmentFileStore
+        .getValidCarbonIndexFiles(carbonFiles.toArray(new CarbonFile[carbonFiles.size()]));
+    String finalExtension = extension;
+    return Arrays.stream(validIndexFiles).filter(file -> file.getName().endsWith(finalExtension))
+        .toArray().length;
+  }
+
   public static void copy(String oldLoc, String newLoc) throws IOException {
     CarbonFile oldFolder = FileFactory.getCarbonFile(oldLoc);
     FileFactory.mkdirs(newLoc, FileFactory.getConfiguration());
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index a5e6a12..7b762ba 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -114,7 +114,6 @@ import org.apache.carbondata.format.IndexHeader;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import org.apache.commons.codec.binary.Base64;
-import org.apache.commons.io.FileUtils;
 import org.apache.commons.io.input.ClassLoaderObjectInputStream;
 import org.apache.commons.lang.ArrayUtils;
 import org.apache.commons.lang.StringUtils;
@@ -2476,7 +2475,9 @@ public final class CarbonUtil {
         if (fs.exists(path)) {
           FileStatus[] fileStatuses = fs.listStatus(path);
           if (null != fileStatuses) {
-            for (FileStatus dataAndIndexStatus : fileStatuses) {
+            FileStatus[] validDataAndIndexStatus =
+                (FileStatus[]) SegmentFileStore.getValidCarbonIndexFiles(fileStatuses);
+            for (FileStatus dataAndIndexStatus : validDataAndIndexStatus) {
               String pathName = dataAndIndexStatus.getPath().getName();
               if (pathName.endsWith(CarbonTablePath.getCarbonIndexExtension()) || pathName
                   .endsWith(CarbonTablePath.getCarbonMergeIndexExtension())) {
@@ -2491,17 +2492,18 @@ public final class CarbonUtil {
       case LOCAL:
       default:
         segmentPath = FileFactory.getUpdatedFilePath(segmentPath);
-        File file = new File(segmentPath);
-        File[] segmentFiles = file.listFiles();
-        if (null != segmentFiles) {
-          for (File dataAndIndexFile : segmentFiles) {
+        CarbonFile[] dataAndIndexFiles = FileFactory.getCarbonFile(segmentPath).listFiles();
+        if (null != dataAndIndexFiles) {
+          CarbonFile[] validDataAndIndexFiles =
+              (CarbonFile[]) SegmentFileStore.getValidCarbonIndexFiles(dataAndIndexFiles);
+          for (CarbonFile dataAndIndexFile : validDataAndIndexFiles) {
             if (dataAndIndexFile.getCanonicalPath()
                 .endsWith(CarbonTablePath.getCarbonIndexExtension()) || dataAndIndexFile
                 .getCanonicalPath().endsWith(CarbonTablePath.getCarbonMergeIndexExtension())) {
-              carbonIndexSize += FileUtils.sizeOf(dataAndIndexFile);
+              carbonIndexSize += dataAndIndexFile.getSize();
             } else if (dataAndIndexFile.getCanonicalPath()
                 .endsWith(CarbonTablePath.getCarbonDataExtension())) {
-              carbonDataSize += FileUtils.sizeOf(dataAndIndexFile);
+              carbonDataSize += dataAndIndexFile.getSize();
             }
           }
         }
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index 3f26aa2..a619f15 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -36,7 +36,9 @@ import org.apache.carbondata.core.index.Segment;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.indextable.IndexMetadata;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
@@ -70,18 +72,18 @@ public class CarbonIndexFileMergeWriter {
    * @param tablePath
    * @param indexFileNamesTobeAdded while merging, it considers only these files.
    *                                If null, then consider all
-   * @param readFileFooterFromCarbonDataFile flag to read file footer information from carbondata
+   * @param isOldStoreIndexFilesPresent flag to read file footer information from carbondata
    *                                         file. This will used in case of upgrade from version
    *                                         which do not store the blocklet info to current version
    * @throws IOException
    */
-  private String mergeCarbonIndexFilesOfSegment(String segmentId,
-      String tablePath, List<String> indexFileNamesTobeAdded,
-      boolean readFileFooterFromCarbonDataFile, String uuid, String partitionPath) {
+  private String mergeCarbonIndexFilesOfSegment(String segmentId, String tablePath,
+      List<String> indexFileNamesTobeAdded, boolean isOldStoreIndexFilesPresent, String uuid,
+      String partitionPath) {
+    Segment segment = Segment.getSegment(segmentId, tablePath);
+    String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
     try {
-      Segment segment = Segment.getSegment(segmentId, tablePath);
-      String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
-      CarbonFile[] indexFiles;
+      List<CarbonFile> indexFiles = new ArrayList<>();
       SegmentFileStore sfs = null;
       if (segment != null && segment.getSegmentFileName() != null) {
         sfs = new SegmentFileStore(tablePath, segment.getSegmentFileName());
@@ -95,28 +97,28 @@ public class CarbonIndexFileMergeWriter {
               indexFilesInPartition.add(indexCarbonFile);
             }
           }
-          indexFiles = indexFilesInPartition.toArray(new CarbonFile[indexFilesInPartition.size()]);
+          indexFiles = indexFilesInPartition;
         } else {
-          indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
+          indexFiles = indexCarbonFiles;
         }
-      } else {
-        indexFiles = SegmentIndexFileStore
-            .getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration(), uuid);
       }
-      if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) {
-        if (sfs == null) {
-          return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
-              readFileFooterFromCarbonDataFile, segmentPath, indexFiles, segmentId, uuid);
-        } else {
-          return writeMergeIndexFileBasedOnSegmentFile(segmentId, indexFileNamesTobeAdded, sfs,
-              indexFiles, uuid, partitionPath);
+      if (sfs == null || indexFiles.isEmpty()) {
+        if (table.isHivePartitionTable()) {
+          segmentPath = partitionPath;
         }
+        return writeMergeIndexFileBasedOnSegmentFolder(indexFileNamesTobeAdded,
+            isOldStoreIndexFilesPresent, segmentPath, segmentId, uuid, true);
+      } else {
+        return writeMergeIndexFileBasedOnSegmentFile(segmentId, indexFileNamesTobeAdded,
+            isOldStoreIndexFilesPresent, sfs,
+            indexFiles.toArray(new CarbonFile[0]), uuid, partitionPath);
       }
     } catch (Exception e) {
-      LOGGER.error(
-          "Failed to merge index files in path: " + tablePath, e);
+      String message =
+          "Failed to merge index files in path: " + segmentPath + ". " + e.getMessage();
+      LOGGER.error(message);
+      throw new RuntimeException(message, e);
     }
-    return null;
   }
 
   /**
@@ -184,30 +186,49 @@ public class CarbonIndexFileMergeWriter {
     return indexLocationMap;
   }
 
-  private String writeMergeIndexFileBasedOnSegmentFolder(List<String> indexFileNamesTobeAdded,
-      boolean readFileFooterFromCarbonDataFile, String segmentPath, CarbonFile[] indexFiles,
-      String segmentId, String uuid) throws IOException {
+  public String writeMergeIndexFileBasedOnSegmentFolder(List<String> indexFileNamesTobeAdded,
+      boolean isOldStoreIndexFilesPresent, String segmentPath,
+      String segmentId, String uuid, boolean readBasedOnUUID) throws IOException {
+    CarbonFile[] indexFiles = null;
     SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
-    if (readFileFooterFromCarbonDataFile) {
+    if (isOldStoreIndexFilesPresent) {
       // this case will be used in case of upgrade where old store will not have the blocklet
       // info in the index file and therefore blocklet info need to be read from the file footer
       // in the carbondata file
-      fileStore.readAllIndexAndFillBlockletInfo(segmentPath, uuid);
+      fileStore.readAllIndexAndFillBlockletInfo(segmentPath, null);
     } else {
-      fileStore.readAllIIndexOfSegment(segmentPath, uuid);
+      if (readBasedOnUUID) {
+        indexFiles = SegmentIndexFileStore
+            .getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration(), uuid);
+        fileStore.readAllIIndexOfSegment(segmentPath, uuid);
+      } else {
+        // The uuid can be different, when we add load from external path.
+        indexFiles =
+            SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
+        fileStore.readAllIIndexOfSegment(segmentPath);
+      }
     }
     Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap();
-    writeMergeIndexFile(indexFileNamesTobeAdded, segmentPath, indexMap, segmentId, uuid);
-    for (CarbonFile indexFile : indexFiles) {
-      indexFile.delete();
+    Map<String, List<String>> mergeToIndexFileMap = fileStore.getCarbonMergeFileToIndexFilesMap();
+    if (!mergeToIndexFileMap.containsValue(new ArrayList<>(indexMap.keySet())))   {
+      writeMergeIndexFile(indexFileNamesTobeAdded, segmentPath, indexMap, segmentId, uuid);
+      // If Alter merge index for old tables is triggered, do not delete index files immediately
+      // to avoid index file not found during concurrent queries
+      if (!isOldStoreIndexFilesPresent && indexFiles != null) {
+        for (CarbonFile indexFile : indexFiles) {
+          indexFile.delete();
+        }
+      }
     }
     return null;
   }
 
   public String writeMergeIndexFileBasedOnSegmentFile(String segmentId,
-      List<String> indexFileNamesTobeAdded, SegmentFileStore segmentFileStore,
-      CarbonFile[] indexFiles, String uuid, String partitionPath) throws IOException {
+      List<String> indexFileNamesTobeAdded, boolean isOldStoreIndexFilesPresent,
+      SegmentFileStore segmentFileStore, CarbonFile[] indexFiles, String uuid, String partitionPath)
+      throws IOException {
     SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
+    String newSegmentFileName = null;
     // in case of partition table, merge index file to be created for each partition
     if (null != partitionPath) {
       for (CarbonFile indexFile : indexFiles) {
@@ -245,9 +266,10 @@ public class CarbonIndexFileMergeWriter {
         for (PartitionSpec partitionSpec : partitionSpecs) {
           if (partitionSpec.getLocation().toString().equals(partitionPath)) {
             try {
-              SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath,
-                  segmentId + CarbonCommonConstants.UNDERSCORE + uuid + "",
-                  partitionSpec.getPartitions(), true);
+              SegmentFileStore
+                  .writeSegmentFileForPartitionTable(table.getTablePath(), mergeIndexFile,
+                      partitionPath, segmentId + CarbonCommonConstants.UNDERSCORE + uuid + "",
+                      partitionSpec.getPartitions(), true);
             } catch (Exception ex) {
               // delete merge index file if created,
               // keep only index files as segment file writing is failed
@@ -260,12 +282,23 @@ public class CarbonIndexFileMergeWriter {
         }
       }
     }
-    String newSegmentFileName = SegmentFileStore.genSegmentFileName(segmentId, uuid)
-        + CarbonTablePath.SEGMENT_EXT;
+    if (table.isIndexTable()) {
+      // To maintain same segment file name mapping between parent and SI table.
+      IndexMetadata indexMetadata = table.getIndexMetadata();
+      LoadMetadataDetails[] loadDetails = SegmentStatusManager
+          .readLoadMetadata(CarbonTablePath.getMetadataPath(indexMetadata.getParentTablePath()));
+      LoadMetadataDetails loadMetaDetail = Arrays.stream(loadDetails)
+          .filter(loadDetail -> loadDetail.getLoadName().equals(segmentId)).findFirst().get();
+      newSegmentFileName = loadMetaDetail.getSegmentFile();
+    } else {
+      // Instead of modifying existing segment file, generate uuid to write into new segment file.
+      uuid = String.valueOf(System.currentTimeMillis());
+      newSegmentFileName = SegmentFileStore.genSegmentFileName(segmentId, uuid)
+          + CarbonTablePath.SEGMENT_EXT;
+    }
     String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath())
         + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName;
     if (!table.isHivePartitionTable()) {
-      String content = SegmentStatusManager.readFileAsString(path);
       try {
         SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path);
       } catch (Exception ex) {
@@ -280,18 +313,15 @@ public class CarbonIndexFileMergeWriter {
       boolean status = SegmentFileStore.updateTableStatusFile(table, segmentId, newSegmentFileName,
           table.getCarbonTableIdentifier().getTableId(), segmentFileStore);
       if (!status) {
-        // revert to original segment file as the table status update has failed.
-        SegmentStatusManager.writeStringIntoFile(path, content);
-        // delete merge index file.
-        for (String file : mergeIndexFiles) {
-          FileFactory.getCarbonFile(file).delete();
-        }
-        // no need to delete index files, so return from here.
-        return uuid;
+        throw new IOException("Table status update with mergeIndex file has failed");
       }
     }
-    for (CarbonFile file : indexFiles) {
-      file.delete();
+    // If Alter merge index for old tables is triggered,
+    // do not delete index files immediately to avoid index file not found during concurrent queries
+    if (!isOldStoreIndexFilesPresent) {
+      for (CarbonFile file : indexFiles) {
+        file.delete();
+      }
     }
     return uuid;
   }
@@ -329,8 +359,9 @@ public class CarbonIndexFileMergeWriter {
    * @param segmentId
    */
   public String mergeCarbonIndexFilesOfSegment(String segmentId, String uuid, String tablePath,
-      String partitionPath) {
-    return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null, false, uuid, partitionPath);
+      String partitionPath, boolean isOldStoreIndexFilesPresent) {
+    return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null,
+        isOldStoreIndexFilesPresent, uuid, partitionPath);
   }
 
   /**
@@ -345,15 +376,6 @@ public class CarbonIndexFileMergeWriter {
         readFileFooterFromCarbonDataFile, uuid, null);
   }
 
-  private boolean isCarbonIndexFilePresent(CarbonFile[] indexFiles) {
-    for (CarbonFile file : indexFiles) {
-      if (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   /**
    * It writes thrift object to file
    *
diff --git a/docs/configuration-parameters.md b/docs/configuration-parameters.md
index 89cdfb3..f65d36f 100644
--- a/docs/configuration-parameters.md
+++ b/docs/configuration-parameters.md
@@ -115,7 +115,6 @@ This section provides the details of all the configurations required for the Car
 | carbon.enable.page.level.reader.in.compaction|false|Enabling page level reader for compaction reduces the memory usage while compacting more number of segments. It allows reading only page by page instead of reading whole blocklet to memory. **NOTE:** Please refer to [file-structure-of-carbondata](./file-structure-of-carbondata.md#carbondata-file-format) to understand the storage format of CarbonData and concepts of pages.|
 | carbon.concurrent.compaction | true | Compaction of different tables can be executed concurrently. This configuration determines whether to compact all qualifying tables in parallel or not. **NOTE:** Compacting concurrently is a resource demanding operation and needs more resources there by affecting the query performance also. This configuration is **deprecated** and might be removed in future releases. |
 | carbon.compaction.prefetch.enable | false | Compaction operation is similar to Query + data load where in data from qualifying segments are queried and data loading performed to generate a new single segment. This configuration determines whether to query ahead data from segments and feed it for data loading. **NOTE:** This configuration is disabled by default as it needs extra resources for querying extra data. Based on the memory availability on the cluster, user can enable it to imp [...]
-| carbon.merge.index.in.segment | true | Each CarbonData file has a companion CarbonIndex file which maintains the metadata about the data. These CarbonIndex files are read and loaded into driver and is used subsequently for pruning of data during queries. These CarbonIndex files are very small in size(few KB) and are many. Reading many small files from HDFS is not efficient and leads to slow IO performance. Hence these CarbonIndex files belonging to a segment can be combined into  a sin [...]
 | carbon.enable.range.compaction | true | To configure Ranges-based Compaction to be used or not for RANGE_COLUMN. If true after compaction also the data would be present in ranges. |
 | carbon.si.segment.merge | false | Making this true degrades the LOAD performance. When the number of small files increases for SI segments (this can happen as the number of columns will be less and we store position id and reference columns), the user can either set this to true, which will merge the data files for upcoming loads, or run the SI refresh command, which does this job for all segments. (REFRESH INDEX <index_table>) |
 
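
Note: carbon.merge.index.in.segment is dropped from this table; as the added code comments below note, it
defaults to true and remains only for developer debugging. A minimal sketch of setting the remaining SI merge
property from application code, using the CarbonProperties API that the test changes below also exercise
(property values are illustrative only):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Illustrative only: merge small SI data files on upcoming loads, and keep
    // per-segment index-file merge at its default (true).
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
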
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 98f5502..727bd91 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -156,11 +156,6 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     OperationContext operationContext = (OperationContext) getOperationContext();
     CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
     String uuid = "";
-    SegmentFileStore.updateTableStatusFile(carbonTable, loadModel.getSegmentId(),
-        segmentFileName + CarbonTablePath.SEGMENT_EXT,
-        carbonTable.getCarbonTableIdentifier().getTableId(),
-        new SegmentFileStore(carbonTable.getTablePath(),
-            segmentFileName + CarbonTablePath.SEGMENT_EXT));
     newMetaEntry.setSegmentFile(segmentFileName + CarbonTablePath.SEGMENT_EXT);
     CarbonLoaderUtil
         .populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS, loadModel.getFactTimeStamp(),
@@ -267,6 +262,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
         .getProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
             CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT));
     if (!isMergeIndex) {
+      // By default carbon.merge.index.in.segment is true and this code path is used only
+      // for developer debugging purposes.
       Map<String, Set<String>> indexFileNameMap = (Map<String, Set<String>>) ObjectSerializationUtil
           .convertStringToObject(context.getConfiguration().get("carbon.index.files.name"));
       List<String> partitionList =
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
index 97d013d..cfad7cd 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergedata/CarbonDataFileMergeTestCaseOnSI.scala
@@ -110,8 +110,8 @@ class CarbonDataFileMergeTestCaseOnSI
       .addProperty(CarbonCommonConstants.CARBON_SI_SEGMENT_MERGE, "true")
     sql("REFRESH INDEX nonindexmerge_index1 ON TABLE nonindexmerge").collect()
     checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows)
-    assert(getDataFileCount("nonindexmerge_index1", "0") < 7)
-    assert(getDataFileCount("nonindexmerge_index1", "1") < 7)
+    assert(getDataFileCount("nonindexmerge_index1", "0") > 7)
+    assert(getDataFileCount("nonindexmerge_index1", "1") > 7)
     checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows)
   }
 
@@ -128,12 +128,12 @@ class CarbonDataFileMergeTestCaseOnSI
     sql("REFRESH INDEX nonindexmerge_index2 ON TABLE nonindexmerge WHERE SEGMENT.ID IN(0)")
       .collect()
     checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows)
-    assert(getDataFileCount("nonindexmerge_index2", "0") < 7)
+    assert(getDataFileCount("nonindexmerge_index2", "0") > 7)
     assert(getDataFileCount("nonindexmerge_index2", "1") == 20)
     sql("REFRESH INDEX nonindexmerge_index2 ON TABLE nonindexmerge WHERE SEGMENT.ID IN(1)")
       .collect()
     checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows)
-    assert(getDataFileCount("nonindexmerge_index2", "1") < 7)
+    assert(getDataFileCount("nonindexmerge_index2", "1") > 7)
     checkAnswer(sql("""Select count(*) from nonindexmerge where name='n164419'"""), rows)
   }
 
@@ -227,8 +227,8 @@ class CarbonDataFileMergeTestCaseOnSI
     val df1 = sql("""Select * from nonindexmerge where name='n16000'""")
       .queryExecution.sparkPlan
     assert(isFilterPushedDownToSI(df1))
-    assert(getDataFileCount("nonindexmerge_index1", "0") < 15)
-    assert(getDataFileCount("nonindexmerge_index1", "1") < 15)
+    assert(getDataFileCount("nonindexmerge_index1", "0") > 15)
+    assert(getDataFileCount("nonindexmerge_index1", "1") > 15)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants
         .CARBON_SI_SEGMENT_MERGE, "true")
   }
@@ -318,8 +318,8 @@ class CarbonDataFileMergeTestCaseOnSI
     val df1 = sql("""Select * from nonindexmerge where name='n16000'""")
         .queryExecution.sparkPlan
     assert(isFilterPushedDownToSI(df1))
-    assert(getDataFileCount("nonindexmerge_index1", "0") < 15)
-    assert(getDataFileCount("nonindexmerge_index1", "1") < 15)
+    assert(getDataFileCount("nonindexmerge_index1", "0") > 15)
+    assert(getDataFileCount("nonindexmerge_index1", "1") > 15)
     checkAnswer(sql(" select count(*) from nonindexmerge_index1"), rows)
     CarbonProperties.getInstance().addProperty(CarbonCommonConstants
         .CARBON_SI_SEGMENT_MERGE, "true")
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergeindex/CarbonIndexFileMergeTestCaseWithSI.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergeindex/CarbonIndexFileMergeTestCaseWithSI.scala
index 40f25d1..d2025fb 100644
--- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergeindex/CarbonIndexFileMergeTestCaseWithSI.scala
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/mergeindex/CarbonIndexFileMergeTestCaseWithSI.scala
@@ -24,10 +24,7 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTestUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class CarbonIndexFileMergeTestCaseWithSI
@@ -81,8 +78,8 @@ class CarbonIndexFileMergeTestCaseWithSI
     sql("CREATE INDEX nonindexmerge_index on table nonindexmerge (name) AS 'carbondata'")
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='20')")
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge_index", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index", "0") == 20)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("DROP TABLE IF EXISTS indexmerge")
@@ -95,8 +92,8 @@ class CarbonIndexFileMergeTestCaseWithSI
     sql("CREATE INDEX indexmerge_index1 on table indexmerge (name) AS 'carbondata'")
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='20')")
-    assert(getIndexFileCount("default_indexmerge", "0") == 0)
-    assert(getIndexFileCount("default_indexmerge_index1", "0") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_indexmerge", "0") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_indexmerge_index1", "0") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""),
       sql("""Select count(*) from indexmerge"""))
   }
@@ -117,17 +114,17 @@ class CarbonIndexFileMergeTestCaseWithSI
         s"'GLOBAL_SORT_PARTITIONS'='20')")
     sql("CREATE INDEX nonindexmerge_index1 on table nonindexmerge (name) AS 'carbondata'")
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge_index1", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge_index1", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index1", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index1", "1") == 20)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
-    assert(getIndexFileCount("default_nonindexmerge_index1", "0") == 0)
-    assert(getIndexFileCount("default_nonindexmerge_index1", "1") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index1", "0") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index1", "1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
   }
 
@@ -147,15 +144,15 @@ class CarbonIndexFileMergeTestCaseWithSI
         s"'GLOBAL_SORT_PARTITIONS'='20')")
     sql("CREATE INDEX nonindexmerge_index2 on table nonindexmerge (name) AS 'carbondata'")
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge_index2", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge_index2", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index2", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index2", "1") == 20)
     sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
-    assert(getIndexFileCount("default_nonindexmerge_index2", "0") == 0)
-    assert(getIndexFileCount("default_nonindexmerge_index2", "1") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index2", "0") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index2", "1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
   }
 
@@ -179,15 +176,15 @@ class CarbonIndexFileMergeTestCaseWithSI
         s"'GLOBAL_SORT_PARTITIONS'='20')")
     sql("CREATE INDEX nonindexmerge_index3 on table nonindexmerge (name) AS 'carbondata'")
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge_index3", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge_index3", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index3", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index3", "1") == 20)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
-    assert(getIndexFileCount("default_nonindexmerge_index3", "0.1") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index3", "0.1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
@@ -215,52 +212,37 @@ class CarbonIndexFileMergeTestCaseWithSI
         s"'GLOBAL_SORT_PARTITIONS'='20')")
     sql("CREATE INDEX nonindexmerge_index4 on table nonindexmerge (name) AS 'carbondata'")
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
-    assert(getIndexFileCount("default_nonindexmerge_index4", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge_index4", "1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge_index4", "2") == 20)
-    assert(getIndexFileCount("default_nonindexmerge_index4", "3") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "2") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "3") == 20)
     sql("alter table nonindexmerge set tblproperties('global_sort_partitions'='20')")
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
     sql("ALTER TABLE nonindexmerge COMPACT 'segment_index'").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "0.1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "2.1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "0.2") == 0)
-    assert(getIndexFileCount("default_nonindexmerge_index4", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge_index4", "1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge_index4", "2") == 20)
-    assert(getIndexFileCount("default_nonindexmerge_index4", "3") == 20)
-    assert(getIndexFileCount("default_nonindexmerge_index4", "0.1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge_index4", "2.1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge_index4", "0.2") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2.1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.2") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "2") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "3") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "0.1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "2.1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge_index4", "0.2") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
         CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
   }
 
-  private def getIndexFileCount(tableName: String, segment: String): Int = {
-    val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
-    val path = CarbonTablePath
-      .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
-    val carbonFiles = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(CarbonTablePath
-        .INDEX_FILE_EXT)
-    })
-    if (carbonFiles != null) {
-      carbonFiles.length
-    } else {
-      0
-    }
-  }
-
   private def createFile(fileName: String, line: Int = 10000, start: Int = 0): Boolean = {
     try {
       val write = new PrintWriter(fileName);
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
index 2b265d4..7543f90 100644
--- a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputCommitter.java
@@ -23,8 +23,10 @@ import java.util.UUID;
 
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
 import org.apache.carbondata.hadoop.api.CarbonOutputCommitter;
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat;
 import org.apache.carbondata.hive.util.HiveCarbonUtil;
@@ -128,11 +130,13 @@ public class MapredCarbonOutputCommitter extends OutputCommitter {
       Configuration configuration = jobContext.getConfiguration();
       CarbonLoadModel carbonLoadModel = MapredCarbonOutputFormat.getLoadModel(configuration);
       ThreadLocalSessionInfo.unsetAll();
+      CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
+      new CarbonIndexFileMergeWriter(carbonTable)
+          .mergeCarbonIndexFilesOfSegment(carbonLoadModel.getSegmentId(),
+              carbonTable.getTablePath(), false,
+              String.valueOf(carbonLoadModel.getFactTimeStamp()));
       SegmentFileStore.writeSegmentFile(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(),
           carbonLoadModel.getSegmentId(), String.valueOf(carbonLoadModel.getFactTimeStamp()));
-      SegmentFileStore
-          .mergeIndexAndWriteSegmentFile(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(),
-              carbonLoadModel.getSegmentId(), String.valueOf(carbonLoadModel.getFactTimeStamp()));
       CarbonTableOutputFormat.setLoadModel(configuration, carbonLoadModel);
       carbonOutputCommitter.commitJob(jobContext);
     } catch (Exception e) {
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala
index 5c5a1b7..03c6e1c 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/integrationtest/PrestoInsertIntoTableTestCase.scala
@@ -33,7 +33,7 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTable
 import org.apache.carbondata.core.metadata.schema.SchemaReader
 import org.apache.carbondata.core.metadata.schema.table.TableSchema
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonTestUtil, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.presto.server.PrestoServer
 import org.apache.carbondata.presto.util.CarbonDataStoreCreator
@@ -161,9 +161,7 @@ class PrestoInsertIntoTableTestCase
         file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
       }
     }).length == 2)
-    val segmentsPath = CarbonTablePath.getSegmentFilesLocation(tablePath)
-    assert(FileFactory.getCarbonFile(segmentsPath).isFileExist &&
-           FileFactory.getCarbonFile(segmentsPath).listFiles(true).size() == 2)
+    assert(CarbonTestUtil.getSegmentFileCount("testdb_testtable") == 2)
     val metadataFolderPath = CarbonTablePath.getMetadataPath(tablePath)
     FileFactory.getCarbonFile(metadataFolderPath).listFiles(new CarbonFileFilter {
       override def accept(file: CarbonFile): Boolean = {
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 754fc63..9405f3a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -491,19 +491,8 @@ object CarbonDataRDDFactory {
         val segmentMetaDataInfo = CommonLoadUtils.getSegmentMetaDataInfoFromAccumulator(
           carbonLoadModel.getSegmentId,
           segmentMetaDataAccumulator)
-        val segmentFileName =
-          SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId,
-            String.valueOf(carbonLoadModel.getFactTimeStamp), segmentMetaDataInfo)
-        // clear segmentMetaDataAccumulator
         segmentMetaDataAccumulator.reset()
 
-        SegmentFileStore.updateTableStatusFile(
-          carbonTable,
-          carbonLoadModel.getSegmentId,
-          segmentFileName,
-          carbonTable.getCarbonTableIdentifier.getTableId,
-          new SegmentFileStore(carbonTable.getTablePath, segmentFileName))
-
         operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
           carbonLoadModel.getSegmentId)
         val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
@@ -512,6 +501,9 @@ object CarbonDataRDDFactory {
             carbonLoadModel)
         OperationListenerBus.getInstance()
           .fireEvent(loadTablePreStatusUpdateEvent, operationContext)
+        val segmentFileName =
+          SegmentFileStore.writeSegmentFile(carbonTable, carbonLoadModel.getSegmentId,
+            String.valueOf(carbonLoadModel.getFactTimeStamp), segmentMetaDataInfo)
         val (done, writtenSegment) =
           updateTableStatus(
             sqlContext.sparkSession,
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index d8072cf..642387c 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -352,6 +352,8 @@ class CarbonTableCompactor(
               s"to ${segmentFileName} failed.")
           }
         } else {
+          // By default carbon.merge.index.in.segment is true and this code path is used only
+          // for developer debugging purposes.
           val readPath =
             CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
               CarbonCommonConstants.FILE_SEPARATOR + carbonLoadModel.getFactTimeStamp + ".tmp"
@@ -408,7 +410,6 @@ class CarbonTableCompactor(
           MVManagerInSpark.get(sc.sparkSession))
 
       if (!statusFileUpdate) {
-        // no need to call merge index if table status update has failed
         LOGGER.error(s"Compaction request failed for table ${ carbonLoadModel.getDatabaseName }." +
                      s"${ carbonLoadModel.getTableName }")
         throw new Exception(s"Compaction failed to update metadata for table" +
diff --git a/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
index 70a9dd8..974035c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.rdd
 
-import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
+import java.io.IOException
+import java.util.concurrent.{Executors, ExecutorService}
 
 import scala.collection.JavaConverters._
 
@@ -30,7 +31,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
@@ -87,7 +88,8 @@ object CarbonMergeFilesRDD {
         carbonTable.isHivePartitionTable,
         readFileFooterFromCarbonDataFile,
         partitionInfo,
-        tempFolderPath).collect()
+        tempFolderPath,
+        currPartitionSpec).collect()
     } else {
       try {
         if (isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
@@ -148,10 +150,7 @@ object CarbonMergeFilesRDD {
           val message = "Merge Index files request is failed " +
                         s"for table ${ carbonTable.getTableUniqueName }. " + ex.getMessage
           LOGGER.error(message)
-          if (isPropertySet(CarbonCommonConstants.CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION,
-            CarbonCommonConstants.CARBON_MERGE_INDEX_FAILURE_THROW_EXCEPTION_DEFAULT)) {
-            throw new RuntimeException(message, ex)
-          }
+          throw new RuntimeException(message, ex)
       }
     }
     if (carbonTable.isHivePartitionTable && !StringUtils.isEmpty(tempFolderPath)) {
@@ -185,13 +184,25 @@ object CarbonMergeFilesRDD {
         val readPath: String = CarbonTablePath.getSegmentFilesLocation(tablePath) +
                                CarbonCommonConstants.FILE_SEPARATOR + segmentId + "_" +
                                segmentFileNameToSegmentIdMap.get(segmentId) + ".tmp"
+        // Generate new timestamp for segment file writing instead of overwriting existing one.
+        val uuid = String.valueOf(System.currentTimeMillis)
+        val newSegmentFileName = SegmentFileStore.genSegmentFileName(segmentId, uuid)
         // Merge all partition files into a single file.
-        val segmentFileName: String = SegmentFileStore
-          .genSegmentFileName(segmentId, segmentFileNameToSegmentIdMap.get(segmentId))
-        SegmentFileStore
+        val segmentFile = SegmentFileStore
           .mergeSegmentFiles(readPath,
-            segmentFileName,
+            newSegmentFileName,
             CarbonTablePath.getSegmentFilesLocation(tablePath))
+        if (segmentFile != null) {
+          val sfs = new SegmentFileStore(tablePath, newSegmentFileName +
+            CarbonTablePath.SEGMENT_EXT)
+          // when compacting segment_index, update table status with the new segment file name
+          val status = SegmentFileStore.updateTableStatusFile(carbonTable, segmentId,
+            newSegmentFileName + CarbonTablePath.SEGMENT_EXT,
+            carbonTable.getCarbonTableIdentifier.getTableId, sfs)
+          if (!status) {
+            throw new IOException("Table status update with mergeIndex file has failed")
+          }
+        }
       })
     }
     mergeIndexSize
@@ -280,12 +291,13 @@ class CarbonMergeFilesRDD(
 
       var segmentFile: SegmentFileStore.SegmentFile = null
       var indexSize: String = ""
-      if (isHivePartitionedTable && partitionInfo.isEmpty) {
+      if (isHivePartitionedTable && readFileFooterFromCarbonDataFile) {
         CarbonLoaderUtil.mergeIndexFilesInPartitionedSegment(
           carbonTable,
           split.segmentId,
           segmentFileNameToSegmentIdMap.get(split.segmentId),
-          split.partitionPath)
+          split.partitionPath,
+          readFileFooterFromCarbonDataFile)
       } else if (isHivePartitionedTable && !partitionInfo.isEmpty) {
         val folderDetails = CarbonLoaderUtil
           .mergeIndexFilesInPartitionedTempSegment(carbonTable,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
index 3872db5..c92bce8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/events/MergeIndexEventListener.scala
@@ -29,10 +29,11 @@ import org.apache.spark.sql.util.CarbonException
 import org.apache.spark.util.MergeIndexUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatusManager}
 import org.apache.carbondata.core.util.{DataLoadMetrics, ObjectSerializationUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
 import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
@@ -122,10 +123,10 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
                 .put(loadMetadataDetails.getLoadName,
                   String.valueOf(loadMetadataDetails.getLoadStartTime))
             })
-            val segmentsToMerge =
+            val validSegments =
+              CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
+            var segmentsToMerge =
               if (alterTableMergeIndexEvent.alterTableModel.customSegmentIds.isEmpty) {
-                val validSegments =
-                  CarbonDataMergerUtil.getValidSegmentList(carbonMainTable).asScala
                 val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
                 validSegments.foreach { segment =>
                   // do not add ROW_V1 format
@@ -140,11 +141,35 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
                   .get
                   .filterNot(streamingSegment.contains(_))
               }
+            validSegments.filter(x => segmentsToMerge.contains(x.getSegmentNo)).foreach { segment =>
+              val segmentFile = segment.getSegmentFileName
+              val sfs = new SegmentFileStore(carbonMainTable.getTablePath, segmentFile)
+              if (sfs.getSegmentFile != null) {
+                val indexFiles = sfs.getIndexCarbonFiles
+                val segmentPath = CarbonTablePath
+                  .getSegmentPath(carbonMainTable.getTablePath, segment.getSegmentNo)
+                if (indexFiles.size() == 0) {
+                  LOGGER.warn(s"No index files present in path: $segmentPath to merge")
+                  // exclude this segment from the merge list, as it has no index files
+                  segmentsToMerge = segmentsToMerge.toStream
+                    .filterNot(s => s.equals(segment.getSegmentNo)).toList
+                }
+              }
+            }
             // in case of merge index file creation using Alter DDL command
             // readFileFooterFromCarbonDataFile flag should be true. This flag is check for legacy
             // store (store <= 1.1 version) and create merge Index file as per new store so that
             // old store is also upgraded to new store
             val startTime = System.currentTimeMillis()
+            val partitionInfo: util.List[String] = operationContext
+              .getProperty("partitionPath")
+              .asInstanceOf[util.List[String]]
+            val currPartitionSpec = operationContext.getProperty("carbon.currentpartition")
+            val currPartitionSpecOption: Option[String] = if (currPartitionSpec == null) {
+              None
+            } else {
+              Option(currPartitionSpec.asInstanceOf[String])
+            }
             CarbonMergeFilesRDD.mergeIndexFiles(
               sparkSession = sparkSession,
               segmentIds = segmentsToMerge,
@@ -152,7 +177,9 @@ class MergeIndexEventListener extends OperationEventListener with Logging {
               tablePath = carbonMainTable.getTablePath,
               carbonTable = carbonMainTable,
               mergeIndexProperty = true,
-              readFileFooterFromCarbonDataFile = true)
+              readFileFooterFromCarbonDataFile = true,
+              partitionInfo = partitionInfo,
+              currPartitionSpec = currPartitionSpecOption)
             LOGGER.info("Total time taken for merge index "
                         + (System.currentTimeMillis() - startTime) + "ms")
             // clear Block index Cache
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index b01c125..e319185 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -327,6 +327,15 @@ case class CarbonAddLoadCommand(
         System.nanoTime().toString) + CarbonTablePath.SEGMENT_EXT,
       segmentPath,
       new util.HashMap[String, String](options.asJava))
+    // This event will trigger merge index job, only trigger it if it is carbon file
+    if (isCarbonFormat) {
+      CarbonLoaderUtil.mergeIndexFilesInAddLoadSegment(carbonTable,
+        model.getSegmentId,
+        segmentPath,
+        model.getFactTimeStamp.toString)
+      // clear Block index Cache
+      SegmentFileStore.clearBlockIndexCache(carbonTable, model.getSegmentId)
+    }
     val writeSegment =
       if (isCarbonFormat) {
         SegmentFileStore.writeSegmentFile(carbonTable, segment)
@@ -335,22 +344,6 @@ case class CarbonAddLoadCommand(
           carbonTable, segment, partitionSpecOp.orNull, partitionDataFiles.asJava)
       }
 
-    // This event will trigger merge index job, only trigger it if it is carbon file
-    if (isCarbonFormat) {
-      operationContext.setProperty(
-        carbonTable.getTableUniqueName + "_Segment",
-        model.getSegmentId)
-      // when this event is triggered, SI load listener will be called for all the SI tables under
-      // this main table, no need to load the SI tables for add load command, so add this property
-      // to check in SI load event listener to avoid loading to SI.
-      operationContext.setProperty("isAddLoad", "true")
-      val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
-        new LoadTablePreStatusUpdateEvent(
-          carbonTable.getCarbonTableIdentifier,
-          model)
-      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
-    }
-
     val success = if (writeSegment) {
       SegmentFileStore.updateTableStatusFile(
         carbonTable,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index 7933a25..eaa62d8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution.command.partition
 import java.util
 
 import scala.collection.JavaConverters._
+import scala.util.control.Breaks.{break, breakable}
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -31,11 +32,13 @@ import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
-import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.{CarbonUtil, ObjectSerializationUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{withEvents, AlterTableMergeIndexEvent, OperationContext, OperationListenerBus, PostAlterTableHivePartitionCommandEvent, PreAlterTableHivePartitionCommandEvent}
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
@@ -105,20 +108,6 @@ case class CarbonAlterTableAddHivePartitionCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // Partitions with physical data should be registered to as a new segment.
     if (partitionSpecsAndLocsTobeAdded != null && partitionSpecsAndLocsTobeAdded.size() > 0) {
-      val segmentFile = SegmentFileStore.getSegmentFileForPhysicalDataPartitions(table.getTablePath,
-        partitionSpecsAndLocsTobeAdded)
-      if (segmentFile != null) {
-        val indexToSchemas = SegmentFileStore.getSchemaFiles(segmentFile, table.getTablePath)
-        val tableColumns = table.getTableInfo.getFactTable.getListOfColumns.asScala
-        val isSameSchema = indexToSchemas.asScala.exists{ case(key, columnSchemas) =>
-          columnSchemas.asScala.exists { col =>
-            tableColumns.exists(p => p.getColumnUniqueId.equals(col.getColumnUniqueId))
-          } && columnSchemas.size() == tableColumns.length
-        }
-        if (!isSameSchema) {
-          throw new UnsupportedOperationException(
-            "Schema of index files located in location is not matching with current table schema")
-        }
         val loadModel = new CarbonLoadModel
         val columnCompressor = table.getTableInfo.getFactTable.getTableProperties.asScala
           .getOrElse(CarbonCommonConstants.COMPRESSOR,
@@ -128,42 +117,28 @@ case class CarbonAlterTableAddHivePartitionCommand(
         loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
         // create operationContext to fire load events
         val operationContext: OperationContext = new OperationContext
-        val (tableIndexes, indexOperationContext) = CommonLoadUtils.firePreLoadEvents(
-          sparkSession = sparkSession,
-          carbonLoadModel = loadModel,
-          uuid = "",
-          factPath = "",
-          null,
-          null,
-          isOverwriteTable = false,
-          isDataFrame = false,
-          updateModel = None,
-          operationContext = operationContext)
+        var hasIndexFiles = false
+        breakable {
+          partitionSpecsAndLocsTobeAdded.asScala.foreach(partitionSpecAndLocs => {
+            val location = partitionSpecAndLocs.getLocation.toString
+            val carbonFile = FileFactory.getCarbonFile(location)
+            val listFiles: Array[CarbonFile] = carbonFile.listFiles(new CarbonFileFilter() {
+              override def accept(file: CarbonFile): Boolean = {
+                file.getName.endsWith(CarbonTablePath.INDEX_FILE_EXT) ||
+                file.getName.endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)
+              }
+            })
+            if (listFiles != null && listFiles.length > 0) {
+              hasIndexFiles = true
+              break()
+            }
+          })
+       }
+      // only if the partition path has some index files,
+      // make entry in table status and trigger merge index event.
+      if (hasIndexFiles) {
         // Create new entry in tablestatus file
         CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, false)
-        val newMetaEntry = loadModel.getCurrentLoadMetadataDetail
-        val segmentFileName =
-          SegmentFileStore.genSegmentFileName(
-            loadModel.getSegmentId, String.valueOf(loadModel.getFactTimeStamp)) +
-          CarbonTablePath.SEGMENT_EXT
-        newMetaEntry.setSegmentFile(segmentFileName)
-        // set path to identify it as external added partition
-        newMetaEntry.setPath(partitionSpecsAndLocsTobeAdded.asScala
-          .map(_.getLocation.toString).mkString(","))
-        val segmentsLoc = CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
-        CarbonUtil.checkAndCreateFolderWithPermission(segmentsLoc)
-        val segmentPath = segmentsLoc + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName
-        SegmentFileStore.writeSegmentFile(segmentFile, segmentPath)
-        CarbonLoaderUtil.populateNewLoadMetaEntry(
-          newMetaEntry,
-          SegmentStatus.SUCCESS,
-          loadModel.getFactTimeStamp,
-          true)
-        // Add size to the entry
-        CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId, table)
-        // Make the load as success in table status
-        CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false)
-
         // Normally, application will use Carbon SDK to write files into a partition folder, then
         // add the folder to partitioned carbon table.
         // If there are many threads writes to the same partition folder, there will be many
@@ -177,6 +152,10 @@ case class CarbonAlterTableAddHivePartitionCommand(
         } else {
           Some(Seq(loadModel.getSegmentId).toList)
         }
+        operationContext.setProperty("partitionPath",
+          partitionSpecsAndLocsTobeAdded.asScala.map(_.getLocation.toString).asJava)
+        operationContext.setProperty("carbon.currentpartition",
+          ObjectSerializationUtil.convertObjectToString(partitionSpecsAndLocsTobeAdded))
         val alterTableModel = AlterTableModel(
           dbName = Some(table.getDatabaseName),
           tableName = table.getTableName,
@@ -185,14 +164,64 @@ case class CarbonAlterTableAddHivePartitionCommand(
           factTimeStamp = Some(System.currentTimeMillis()),
           customSegmentIds = customSegmentIds)
         val mergeIndexEvent = AlterTableMergeIndexEvent(sparkSession, table, alterTableModel)
-        OperationListenerBus.getInstance.fireEvent(mergeIndexEvent, new OperationContext)
-        // fire event to load data to materialized views
-        CommonLoadUtils.firePostLoadEvents(sparkSession,
-          loadModel,
-          tableIndexes,
-          indexOperationContext,
-          table,
-          operationContext)
+        OperationListenerBus.getInstance.fireEvent(mergeIndexEvent, operationContext)
+        val newMetaEntry = loadModel.getCurrentLoadMetadataDetail
+        val segmentFileName =
+          SegmentFileStore.genSegmentFileName(
+            loadModel.getSegmentId, String.valueOf(loadModel.getFactTimeStamp)) +
+          CarbonTablePath.SEGMENT_EXT
+        newMetaEntry.setSegmentFile(segmentFileName)
+        // set path to identify it as external added partition
+        newMetaEntry.setPath(partitionSpecsAndLocsTobeAdded.asScala
+          .map(_.getLocation.toString).mkString(","))
+        val segmentsLoc = CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
+        CarbonUtil.checkAndCreateFolderWithPermission(segmentsLoc)
+        val segmentPath = segmentsLoc + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName
+        val segmentFile = SegmentFileStore.getSegmentFileForPhysicalDataPartitions(table
+          .getTablePath,
+          partitionSpecsAndLocsTobeAdded)
+        if (segmentFile != null) {
+          val indexToSchemas = SegmentFileStore.getSchemaFiles(segmentFile, table.getTablePath)
+          val tableColumns = table.getTableInfo.getFactTable.getListOfColumns.asScala
+          val isSameSchema = indexToSchemas.asScala.exists { case (key, columnSchemas) =>
+            columnSchemas.asScala.exists { col =>
+              tableColumns.exists(p => p.getColumnUniqueId.equals(col.getColumnUniqueId))
+            } && columnSchemas.size() == tableColumns.length
+          }
+          if (!isSameSchema) {
+            throw new UnsupportedOperationException(
+              "Schema of index files located in location is not matching with current table schema")
+          }
+          val (tableIndexes, indexOperationContext) = CommonLoadUtils.firePreLoadEvents(
+            sparkSession = sparkSession,
+            carbonLoadModel = loadModel,
+            uuid = "",
+            factPath = "",
+            new util.HashMap[String, String](),
+            new util.HashMap[String, String](),
+            isOverwriteTable = false,
+            isDataFrame = false,
+            updateModel = None,
+            operationContext = operationContext)
+          SegmentFileStore.writeSegmentFile(segmentFile, segmentPath)
+          CarbonLoaderUtil.populateNewLoadMetaEntry(
+            newMetaEntry,
+            SegmentStatus.SUCCESS,
+            loadModel.getFactTimeStamp,
+            true)
+          // Add size to the entry
+          CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry,
+            loadModel.getSegmentId, table)
+          // Make the load as success in table status
+          CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false)
+          // fire event to load data to materialized views
+          CommonLoadUtils.firePostLoadEvents(sparkSession,
+            loadModel,
+            tableIndexes,
+            indexOperationContext,
+            table,
+            operationContext)
+        }
       }
     }
     Seq.empty[Row]
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableMergeIndexSIEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableMergeIndexSIEventListener.scala
index 6bb14cf..8cda55f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableMergeIndexSIEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/AlterTableMergeIndexSIEventListener.scala
@@ -33,8 +33,10 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.index.Segment
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events._
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 
@@ -85,7 +87,21 @@ class AlterTableMergeIndexSIEventListener
                   .asScala
                 val validSegmentIds: mutable.Buffer[String] = mutable.Buffer[String]()
                 validSegments.foreach { segment =>
-                  validSegmentIds += segment.getSegmentNo
+                  val segmentFile = segment.getSegmentFileName
+                  val sfs = new SegmentFileStore(indexCarbonTable.getTablePath, segmentFile)
+                  if (sfs.getSegmentFile != null) {
+                    val indexFiles = sfs.getIndexCarbonFiles
+                    val segmentPath = CarbonTablePath
+                      .getSegmentPath(indexCarbonTable.getTablePath, segment.getSegmentNo)
+                    if (indexFiles.size() == 0) {
+                      LOGGER.warn(s"No index files present in path: $segmentPath to merge")
+                    } else {
+                      // call merge only for segments that have index files
+                      validSegmentIds += segment.getSegmentNo
+                    }
+                  } else {
+                    validSegmentIds += segment.getSegmentNo
+                  }
                 }
                 // Just launch job to merge index for all index tables
                 CarbonMergeFilesRDD.mergeIndexFiles(
@@ -94,7 +110,8 @@ class AlterTableMergeIndexSIEventListener
                   SegmentStatusManager.mapSegmentToStartTime(carbonMainTable),
                   indexCarbonTable.getTablePath,
                   indexCarbonTable,
-                  mergeIndexProperty = true)
+                  mergeIndexProperty = true,
+                  readFileFooterFromCarbonDataFile = true)
               }
             }
           }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala
index 0b40ed8..15f9048 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala
@@ -43,10 +43,6 @@ class SILoadEventListener extends OperationEventListener with Logging {
   override def onEvent(event: Event, operationContext: OperationContext): Unit = {
     event match {
       case _: LoadTablePreStatusUpdateEvent =>
-        if (operationContext.getProperty("isAddLoad") != null &&
-            operationContext.getProperty("isAddLoad").toString.toBoolean) {
-          return
-        }
         LOGGER.info("Load pre status update event-listener called")
         val loadTablePreStatusUpdateEvent = event.asInstanceOf[LoadTablePreStatusUpdateEvent]
         val carbonLoadModel = loadTablePreStatusUpdateEvent.getCarbonLoadModel
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index b371947..a7677ea 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -48,11 +48,6 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
           val loadTablePostStatusUpdateEvent = event.asInstanceOf[LoadTablePostStatusUpdateEvent]
           val carbonLoadModel = loadTablePostStatusUpdateEvent.getCarbonLoadModel
           val sparkSession = SparkSession.getActiveSession.get
-          // Avoid loading segment to SI for add load command
-          if (operationContext.getProperty("isAddLoad") != null &&
-            operationContext.getProperty("isAddLoad").toString.toBoolean) {
-            return
-          }
           if (CarbonProperties.getInstance().isSIRepairEnabled(carbonLoadModel.getDatabaseName,
             carbonLoadModel.getTableName)) {
           // when Si creation and load to main table are parallel, get the carbonTable from the
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
index cfb79b2..7fc29bd 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.index.status.IndexStatus
 import org.apache.carbondata.core.locks.ICarbonLock
+import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.index.IndexType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
@@ -100,13 +101,6 @@ object Compactor {
           return
         }
         allSegmentsLock ++= segmentLocks
-        CarbonInternalLoaderUtil.updateLoadMetadataWithMergeStatus(
-          indexCarbonTable,
-          loadsToMerge,
-          validSegments.head,
-          segmentToSegmentTimestampMap,
-          segmentIdToLoadStartTimeMapping(validSegments.head),
-          SegmentStatus.INSERT_IN_PROGRESS, 0L, List.empty.asJava)
 
         // merge index files
         CarbonMergeFilesRDD.mergeIndexFiles(sqlContext.sparkSession,
@@ -132,7 +126,14 @@ object Compactor {
           secondaryIndexModel.segmentIdToLoadStartTimeMapping,
           indexCarbonTable,
           loadMetadataDetails.toList.asJava, carbonLoadModelForMergeDataFiles)(sqlContext)
-
+        if (rebuiltSegments.isEmpty) {
+          for (eachSegment <- secondaryIndexModel.validSegments) {
+            SegmentFileStore
+              .writeSegmentFile(indexCarbonTable,
+                eachSegment,
+                String.valueOf(carbonLoadModel.getFactTimeStamp))
+          }
+        }
         CarbonInternalLoaderUtil.updateLoadMetadataWithMergeStatus(
           indexCarbonTable,
           loadsToMerge,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index e493d47..96cf8c7 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -168,6 +168,7 @@ object SecondaryIndexCreator {
       }
       var successSISegments: List[String] = List()
       var failedSISegments: List[String] = List()
+      val carbonLoadModel: CarbonLoadModel = getCopyObject(secondaryIndexModel)
       val sort_scope = indexCarbonTable.getTableInfo.getFactTable.getTableProperties
         .get("sort_scope")
       if (sort_scope != null && sort_scope.equalsIgnoreCase("global_sort")) {
@@ -179,7 +180,6 @@ object SecondaryIndexCreator {
             .submit(new Callable[Array[(String, (LoadMetadataDetails, ExecutionErrors))]] {
               @throws(classOf[Exception])
               override def call(): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
-                val carbonLoadModel = getCopyObject(secondaryIndexModel)
                 // to load as a global sort SI segment,
                 // we need to query main table along with position reference projection
                 val projections = indexCarbonTable.getCreateOrderColumn
@@ -237,10 +237,6 @@ object SecondaryIndexCreator {
                     carbonLoadModel,
                     hadoopConf = configuration, segmentMetaDataAccumulator)
                 }
-                SegmentFileStore
-                  .writeSegmentFile(indexCarbonTable,
-                    eachSegment,
-                    String.valueOf(carbonLoadModel.getFactTimeStamp))
                 segmentToLoadStartTimeMap
                   .put(eachSegment, String.valueOf(carbonLoadModel.getFactTimeStamp))
                 result
@@ -292,7 +288,6 @@ object SecondaryIndexCreator {
                 .put("carbonConf", SparkSQLUtil.sessionState(sc.sparkSession).newHadoopConf())
               var eachSegmentSecondaryIndexCreationStatus: Array[(String, Boolean)] = Array.empty
               CarbonLoaderUtil.checkAndCreateCarbonDataLocation(segId, indexCarbonTable)
-              val carbonLoadModel = getCopyObject(secondaryIndexModel)
               carbonLoadModel
                 .setFactTimeStamp(secondaryIndexModel.segmentIdToLoadStartTimeMapping(eachSegment))
               carbonLoadModel.setTablePath(secondaryIndexModel.carbonTable.getTablePath)
@@ -301,10 +296,6 @@ object SecondaryIndexCreator {
                 carbonLoadModel,
                 secondaryIndexModel.secondaryIndex,
                 segId, execInstance, indexCarbonTable, forceAccessSegment).collect()
-              SegmentFileStore
-                .writeSegmentFile(indexCarbonTable,
-                  segId,
-                  String.valueOf(carbonLoadModel.getFactTimeStamp))
               segmentToLoadStartTimeMap.put(segId, carbonLoadModel.getFactTimeStamp.toString)
               if (secondaryIndexCreationStatus.length > 0) {
                 eachSegmentSecondaryIndexCreationStatus = secondaryIndexCreationStatus
@@ -349,16 +340,6 @@ object SecondaryIndexCreator {
       var tableStatusUpdateForFailure = false
 
       if (successSISegments.nonEmpty && !isCompactionCall) {
-        tableStatusUpdateForSuccess = FileInternalUtil.updateTableStatus(
-          successSISegments,
-          secondaryIndexModel.carbonLoadModel.getDatabaseName,
-          secondaryIndexModel.secondaryIndex.indexName,
-          segmentStatus,
-          secondaryIndexModel.segmentIdToLoadStartTimeMapping,
-          segmentToLoadStartTimeMap,
-          indexCarbonTable,
-          secondaryIndexModel.sqlContext.sparkSession)
-
         // merge index files for success segments in case of only load
         CarbonMergeFilesRDD.mergeIndexFiles(secondaryIndexModel.sqlContext.sparkSession,
           successSISegments,
@@ -384,18 +365,6 @@ object SecondaryIndexCreator {
             indexCarbonTable,
             loadMetadataDetails.toList.asJava, carbonLoadModelForMergeDataFiles)(sc)
 
-        tableStatusUpdateForSuccess = FileInternalUtil.updateTableStatus(
-          successSISegments,
-          secondaryIndexModel.carbonLoadModel.getDatabaseName,
-          secondaryIndexModel.secondaryIndex.indexName,
-          SegmentStatus.SUCCESS,
-          secondaryIndexModel.segmentIdToLoadStartTimeMapping,
-          segmentToLoadStartTimeMap,
-          indexCarbonTable,
-          secondaryIndexModel.sqlContext.sparkSession,
-          carbonLoadModelForMergeDataFiles.getFactTimeStamp,
-          rebuiltSegments)
-
         if (isInsertOverwrite) {
           val overriddenSegments = SegmentStatusManager
           .readLoadMetadata(indexCarbonTable.getMetadataPath)
@@ -412,6 +381,24 @@ object SecondaryIndexCreator {
               indexTable,
               secondaryIndexModel.sqlContext.sparkSession)
         }
+        if (rebuiltSegments.isEmpty) {
+          for (loadMetadata <- loadMetadataDetails) {
+            SegmentFileStore
+              .writeSegmentFile(indexCarbonTable, loadMetadata.getLoadName,
+                String.valueOf(loadMetadata.getLoadStartTime))
+          }
+          tableStatusUpdateForSuccess = FileInternalUtil.updateTableStatus(
+            successSISegments,
+            secondaryIndexModel.carbonLoadModel.getDatabaseName,
+            secondaryIndexModel.secondaryIndex.indexName,
+            SegmentStatus.SUCCESS,
+            secondaryIndexModel.segmentIdToLoadStartTimeMapping,
+            segmentToLoadStartTimeMap,
+            indexCarbonTable,
+            secondaryIndexModel.sqlContext.sparkSession,
+            carbonLoadModelForMergeDataFiles.getFactTimeStamp,
+            rebuiltSegments)
+        }
       }
 
       if (!isCompactionCall) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
index 0977b77..63bd726 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/util/SecondaryIndexUtil.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.log4j.Logger
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.rdd.CarbonMergeFilesRDD
+import org.apache.spark.rdd.CarbonMergeFilesRDD.isPropertySet
 import org.apache.spark.sql.{CarbonEnv, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel}
@@ -45,17 +45,18 @@ import org.apache.carbondata.core.datastore.block.{TableBlockInfo, TaskBlockInfo
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
-import org.apache.carbondata.core.locks.CarbonLockUtil
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
 import org.apache.carbondata.core.metadata.datatype.{DataType, StructField, StructType}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.segmentmeta.SegmentMetaDataInfo
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
 import org.apache.carbondata.hadoop.CarbonInputSplit
 import org.apache.carbondata.hadoop.api.{CarbonInputFormat, CarbonTableOutputFormat}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
@@ -63,7 +64,6 @@ import org.apache.carbondata.indexserver.IndexServer
 import org.apache.carbondata.processing.loading.FailureCauses
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
-import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.MergeResultImpl
 import org.apache.carbondata.spark.load.DataLoadProcessBuilderOnSpark
 import org.apache.carbondata.spark.rdd.CarbonSparkPartition
@@ -217,8 +217,17 @@ object SecondaryIndexUtil {
       }
       if (finalMergeStatus) {
         if (null != mergeStatus && mergeStatus.length != 0) {
+          // The segment file is not yet written during SI load; in that case we can delete the
+          // old index files immediately. If the segment file is already present and an SI refresh
+          // is triggered, do not delete immediately, to avoid failures during parallel queries.
           val validSegmentsToUse = validSegments.asScala
-            .filter(segment => mergeStatus.map(_._2).toSet.contains(segment.getSegmentNo))
+            .filter(segment => {
+              val segmentFilePath = CarbonTablePath.getSegmentFilesLocation(tablePath) +
+                                    CarbonCommonConstants.FILE_SEPARATOR +
+                                    segment.getSegmentFileName
+              mergeStatus.map(_._2).toSet.contains(segment.getSegmentNo) &&
+              !FileFactory.isFileExist(segmentFilePath)
+            })
           deleteOldIndexOrMergeIndexFiles(
             carbonLoadModel.getFactTimeStamp,
             validSegmentsToUse.toList.asJava,
@@ -231,73 +240,36 @@ object SecondaryIndexUtil {
           } else {
             siRebuildRDD.partitions.foreach { partition =>
               val carbonSparkPartition = partition.asInstanceOf[CarbonSparkPartition]
-              deleteOldCarbonDataFiles(carbonSparkPartition)
+              deleteOldCarbonDataFiles(carbonSparkPartition, validSegmentsToUse.toList)
             }
           }
+          val segmentToLoadStartTimeMap: scala.collection.mutable.Map[String, java.lang.Long] =
+            scala.collection.mutable.Map()
+          // merge index files and write segment file for merged segments
           mergedSegments.asScala.map { seg =>
+            val segmentPath = CarbonTablePath.getSegmentPath(tablePath, seg.getLoadName)
+            try {
+              new CarbonIndexFileMergeWriter(indexCarbonTable)
+                .writeMergeIndexFileBasedOnSegmentFolder(null, false, segmentPath,
+                  seg.getLoadName, carbonLoadModel.getFactTimeStamp.toString,
+                  true)
+            } catch {
+              case e: IOException =>
+                val message =
+                  s"Failed to merge index files in path: $segmentPath. ${ e.getMessage() } "
+                LOGGER.error(message)
+                throw new RuntimeException(message, e)
+            }
             val file = SegmentFileStore.writeSegmentFile(
               indexCarbonTable,
               seg.getLoadName,
               carbonLoadModel.getFactTimeStamp.toString,
               null,
               null)
-            val segment = new Segment(seg.getLoadName, file)
-            SegmentFileStore.updateTableStatusFile(indexCarbonTable,
-              seg.getLoadName,
-              file,
-              indexCarbonTable.getCarbonTableIdentifier.getTableId,
-              new SegmentFileStore(tablePath, segment.getSegmentFileName))
-            segment
-          }
-
-          val statusLock =
-            new SegmentStatusManager(indexCarbonTable.getAbsoluteTableIdentifier).getTableStatusLock
-          try {
-            val retryCount = CarbonLockUtil.getLockProperty(CarbonCommonConstants
-              .NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
-              CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT)
-            val maxTimeout = CarbonLockUtil.getLockProperty(CarbonCommonConstants
-              .MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
-              CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT)
-            if (statusLock.lockWithRetries(retryCount, maxTimeout)) {
-              val endTime = System.currentTimeMillis()
-              val loadMetadataDetails = SegmentStatusManager
-                .readLoadMetadata(indexCarbonTable.getMetadataPath)
-              loadMetadataDetails.foreach(loadMetadataDetail => {
-                if (rebuiltSegments.contains(loadMetadataDetail.getLoadName)) {
-                  loadMetadataDetail.setLoadStartTime(carbonLoadModel.getFactTimeStamp)
-                  loadMetadataDetail.setLoadEndTime(endTime)
-                  CarbonLoaderUtil
-                    .addDataIndexSizeIntoMetaEntry(loadMetadataDetail,
-                      loadMetadataDetail.getLoadName,
-                      indexCarbonTable)
-                }
-              })
-              SegmentStatusManager
-                .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath(tablePath),
-                  loadMetadataDetails)
-            } else {
-              throw new RuntimeException(
-                "Not able to acquire the lock for table status updation for table " + databaseName +
-                "." + indexCarbonTable.getTableName)
-            }
-          } finally {
-            if (statusLock != null) {
-              statusLock.unlock()
-            }
-          }
-          // clear the indexSchema cache for the merged segments, as the index files and
-          // data files are rewritten after compaction
-          if (mergedSegments.size > 0) {
-
-            // merge index files for merged segments
-            CarbonMergeFilesRDD.mergeIndexFiles(sc.sparkSession,
-              rebuiltSegments.toSeq,
-              segmentIdToLoadStartTimeMap,
-              indexCarbonTable.getTablePath,
-              indexCarbonTable, mergeIndexProperty = false
-            )
-
+            segmentToLoadStartTimeMap.put(seg.getLoadName,
+              carbonLoadModel.getFactTimeStamp)
+            // clear the indexSchema cache for the merged segments, as the index files and
+            // data files are rewritten after compaction
             if (CarbonProperties.getInstance()
               .isDistributedPruningEnabled(indexCarbonTable.getDatabaseName,
                 indexCarbonTable.getTableName)) {
@@ -310,7 +282,24 @@ object SecondaryIndexUtil {
                 case _: Exception =>
               }
             }
-
+            val segment = new Segment(seg.getLoadName, file)
+            segment
+          }
+          // Here we can skip updating the table status for the compaction case.
+          // For a compaction call, the table status is directly updated with Compacted/Success
+          // by the logic present in CarbonTableCompactor.
+          if (compactionType == null) {
+            FileInternalUtil.updateTableStatus(
+              rebuiltSegments.toList,
+              carbonLoadModel.getDatabaseName,
+              indexCarbonTable.getTableName,
+              SegmentStatus.SUCCESS,
+              segmentToLoadStartTimeMap,
+              new java.util.HashMap[String, String],
+              indexCarbonTable,
+              sc.sparkSession,
+              carbonLoadModel.getFactTimeStamp,
+              rebuiltSegments)
             IndexStoreManager.getInstance
               .clearInvalidSegments(indexCarbonTable, rebuiltSegments.toList.asJava)
           }
@@ -343,7 +332,7 @@ object SecondaryIndexUtil {
       indexCarbonTable: CarbonTable): Unit = {
     // delete the index/merge index carbonFile of old data files
     validSegments.asScala.foreach { segment =>
-      SegmentFileStore.getIndexFilesListForSegment(segment, indexCarbonTable.getTablePath)
+      getIndexFilesListForSegment(segment, indexCarbonTable.getTablePath)
         .asScala
         .foreach { indexFile =>
           if (DataFileUtil.getTimeStampFromFileName(indexFile).toLong < factTimeStamp) {
@@ -354,15 +343,36 @@ object SecondaryIndexUtil {
   }
 
   /**
+   * This method returns the list of index/merge index files for a segment in carbonTable.
+   */
+  @throws[IOException]
+  private def getIndexFilesListForSegment(segment: Segment, tablePath: String): util.Set[String] = {
+    var indexFiles : util.Set[String] = new util.HashSet[String]
+    val segmentFileStore = new SegmentFileStore(tablePath,
+      segment.getSegmentFileName)
+    val segmentPath = CarbonTablePath.getSegmentPath(tablePath, segment.getSegmentNo)
+    if (segmentFileStore.getSegmentFile == null)  {
+      indexFiles = new SegmentIndexFileStore()
+        .getMergeOrIndexFilesFromSegment(segmentPath).keySet
+    } else {
+      indexFiles = segmentFileStore.getIndexAndMergeFiles.keySet
+    }
+    indexFiles
+  }
+
+  /**
    * This method deletes the carbondata files present in a partition during small
    * data file merge after loading a segment to the SI table. They should be deleted only after
    * the data file merge operation; otherwise, concurrency can cause file-not-found issues.
    */
-  private def deleteOldCarbonDataFiles(partition: CarbonSparkPartition): Unit = {
+  private def deleteOldCarbonDataFiles(partition: CarbonSparkPartition,
+      validSegmentsToUse: List[Segment]): Unit = {
     val splitList = partition.split.value.getAllSplits
     splitList.asScala.foreach { split =>
-      val carbonFile = FileFactory.getCarbonFile(split.getFilePath)
-      carbonFile.delete()
+      if (validSegmentsToUse.contains(split.getSegment)) {
+        val mergedCarbonDataFile = FileFactory.getCarbonFile(split.getFilePath)
+        mergedCarbonDataFile.delete()
+      }
     }
   }
   /**
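
For reference, the validSegmentsToUse filter introduced above boils down to a check like the
following sketch (a standalone illustration, not part of the patch; the Segment type and the
FileFactory/CarbonTablePath/CarbonCommonConstants calls are the same ones already used in this
file):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.datastore.impl.FileFactory
    import org.apache.carbondata.core.index.Segment
    import org.apache.carbondata.core.util.path.CarbonTablePath

    // Old index files may be deleted immediately only for segments that were merged in this
    // operation and that do not yet have a segment file (i.e. freshly loaded SI segments).
    // Segments that already have a segment file (SI refresh/rebuild) keep their old files
    // so that concurrent queries do not hit missing-file errors.
    def canDeleteOldIndexFilesImmediately(segment: Segment, tablePath: String,
        mergedSegmentNos: Set[String]): Boolean = {
      val segmentFilePath = CarbonTablePath.getSegmentFilesLocation(tablePath) +
        CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentFileName
      mergedSegmentNos.contains(segment.getSegmentNo) && !FileFactory.isFileExist(segmentFilePath)
    }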
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index f909537..e4be592 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -17,10 +17,12 @@
 
 package org.apache.carbondata.spark.testsuite.datacompaction
 
+import java.io.IOException
 import java.util
 
 import scala.collection.JavaConverters._
 
+import mockit.{Mock, MockUp}
 import org.apache.spark.sql.{CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.junit.Assert
@@ -32,11 +34,13 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletIndexFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTestUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
 
 class CarbonIndexFileMergeTestCase
   extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
@@ -69,7 +73,7 @@ class CarbonIndexFileMergeTestCase
       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='20')")
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("DROP TABLE IF EXISTS indexmerge")
@@ -81,7 +85,7 @@ class CarbonIndexFileMergeTestCase
       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='20')")
-    assert(getIndexFileCount("default_indexmerge", "0") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_indexmerge", "0") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""),
       sql("""Select count(*) from indexmerge"""))
   }
@@ -101,13 +105,16 @@ class CarbonIndexFileMergeTestCase
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='20')")
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+    assert(CarbonTestUtil.getSegmentFileCount("default_nonindexmerge") == 2)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
+    // creates new segment file instead of updating
+    assert(CarbonTestUtil.getSegmentFileCount("default_nonindexmerge") == 4)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
   }
 
@@ -126,11 +133,11 @@ class CarbonIndexFileMergeTestCase
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='20')")
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
     sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
   }
 
@@ -150,12 +157,12 @@ class CarbonIndexFileMergeTestCase
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='20')")
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
@@ -178,13 +185,13 @@ class CarbonIndexFileMergeTestCase
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='20')")
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
     sql("ALTER TABLE nonindexmerge COMPACT 'segment_index'").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
@@ -223,19 +230,19 @@ class CarbonIndexFileMergeTestCase
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
     s"'GLOBAL_SORT_PARTITIONS'='20')")
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
-    assert(getIndexFileCount("default_nonindexmerge", "2.1") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2.1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
   }
 
@@ -291,20 +298,20 @@ class CarbonIndexFileMergeTestCase
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
     s"'GLOBAL_SORT_PARTITIONS'='20')")
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
-    assert(getIndexFileCount("default_nonindexmerge", "2.1") == 0)
-    assert(getIndexFileCount("default_nonindexmerge", "0.2") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2.1") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.2") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
   }
 
@@ -329,24 +336,24 @@ class CarbonIndexFileMergeTestCase
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
     s"'GLOBAL_SORT_PARTITIONS'='20')")
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
-      CarbonProperties.getInstance()
-        .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE, "true")
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
     s"'GLOBAL_SORT_PARTITIONS'='20')"
     )
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "4") == 0)
-    assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
-    assert(getIndexFileCount("default_nonindexmerge", "2.1") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "4") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2.1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), Seq(Row(300000)))
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE, "false")
@@ -373,24 +380,24 @@ class CarbonIndexFileMergeTestCase
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
     s"'GLOBAL_SORT_PARTITIONS'='20')")
     val rows = sql("""Select count(*) from nonindexmerge""").collect()
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
       .addProperty(CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE, "true")
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
     s"'GLOBAL_SORT_PARTITIONS'='20')"
     )
-    assert(getIndexFileCount("default_nonindexmerge", "0") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "1") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "2") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "3") == 20)
-    assert(getIndexFileCount("default_nonindexmerge", "4") == 0)
-    assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
-    assert(getIndexFileCount("default_nonindexmerge", "2.1") == 0)
-    assert(getIndexFileCount("default_nonindexmerge", "0.2") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "1") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "3") == 20)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "4") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "2.1") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0.2") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), Seq(Row(300000)))
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE, "false")
@@ -411,10 +418,32 @@ class CarbonIndexFileMergeTestCase
         | TBLPROPERTIES('SORT_COLUMNS'='city,name')
       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE partitionTable OPTIONS('header'='false')")
-    assert(getIndexFileCount("default_partitionTable", "0") == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_partitionTable", "0") == 0)
     sql("DROP TABLE IF EXISTS partitionTable")
   }
 
+  test("Verify command of index merge for partition table") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+    sql("DROP TABLE IF EXISTS nonindexmerge")
+    sql(
+      """
+        | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING)
+        | PARTITIONED BY(age INT)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge " +
+      s"partition(age='20') OPTIONS('header'='false')")
+    val rows = sql("""Select count(*) from nonindexmerge""").collect()
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0") == 1)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
+    assert(CarbonTestUtil.getIndexFileCount("default_nonindexmerge", "0",
+      CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
+    checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
+  }
+
   test("Verify index merge for streaming table") {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
@@ -428,7 +457,8 @@ class CarbonIndexFileMergeTestCase
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable OPTIONS('header'='false')")
     // check for one merge index file
     assert(
-      getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
+      CarbonTestUtil.getIndexFileCount("default_streamingTable",
+        "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
     sql("DROP TABLE IF EXISTS streamingTable")
   }
 
@@ -444,15 +474,15 @@ class CarbonIndexFileMergeTestCase
       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable OPTIONS('header'='false')")
     // check for zero merge index file
-    assert(
-      getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_streamingTable",
+      "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 0)
     // check for one index file
-    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.INDEX_FILE_EXT) == 1)
+    assert(CarbonTestUtil.getIndexFileCount("default_streamingTable", "0") == 1)
     sql("alter table streamingTable compact 'segment_index'")
     sql("alter table streamingTable compact 'segment_index' where segment.id in (0)")
     // check for one merge index file
-    assert(
-      getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
+    assert(CarbonTestUtil.getIndexFileCount("default_streamingTable",
+      "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
     sql("DROP TABLE IF EXISTS streamingTable")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
@@ -470,14 +500,14 @@ class CarbonIndexFileMergeTestCase
       """.stripMargin)
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE streamingTable OPTIONS('header'='false')")
     // check for zero merge index file
-    assert(
-      getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 0)
+    assert(CarbonTestUtil.getIndexFileCount("default_streamingTable",
+      "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 0)
     // check for one index file
-    assert(getIndexFileCount("default_streamingTable", "0", CarbonTablePath.INDEX_FILE_EXT) == 1)
+    assert(CarbonTestUtil.getIndexFileCount("default_streamingTable", "0") == 1)
     sql("alter table streamingTable compact 'segment_index' where segment.id in (0)")
     // check for one merge index file
-    assert(
-      getIndexFileCount("default_streamingTable", "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
+    assert(CarbonTestUtil.getIndexFileCount("default_streamingTable",
+      "0", CarbonTablePath.MERGE_INDEX_FILE_EXT) == 1)
     sql("DROP TABLE IF EXISTS streamingTable")
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
@@ -506,6 +536,60 @@ class CarbonIndexFileMergeTestCase
     assert(!mergeFileNameIsNull("0", "default", "merge_index_cache"))
   }
 
+  test("verify load when merge index fails") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    val mockMethod = new MockUp[CarbonIndexFileMergeWriter]() {
+      @Mock
+      def writeMergeIndexFileBasedOnSegmentFolder
+      (indexFileNamesTobeAdded: util.List[String], isOldStoreIndexFilesPresent: Boolean,
+       segmentPath: String, segmentId: String, uuid: String, readBasedOnUUID: Boolean): String = {
+        throw new IOException("mock failure reason")
+      }
+    }
+    sql("DROP TABLE IF EXISTS indexmerge")
+    sql(
+      """
+        | CREATE TABLE indexmerge(id INT, name STRING, city STRING, age INT)
+        | STORED AS carbondata
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name')
+      """.stripMargin)
+    intercept[RuntimeException] {
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false')")
+    }
+    checkAnswer(sql("Select count(*) from indexmerge"), Seq(Row(0)))
+    sql("DROP TABLE indexmerge")
+    mockMethod.tearDown()
+  }
+
+  test("verify load when merge index fails for partition table") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
+    val mockMethod = new MockUp[CarbonLoaderUtil]() {
+      @Mock
+      def mergeIndexFilesInPartitionedTempSegment
+      (table: CarbonTable, segmentId: String, partitionPath: String,
+          partitionInfo: util.List[String], uuid: String, tempFolderPath: String,
+          currPartitionSpec: String): SegmentFileStore.FolderDetails = {
+        throw new IOException("mock failure reason")
+      }
+    }
+    sql("DROP TABLE IF EXISTS indexmergePartition")
+    sql(
+      """
+        | CREATE TABLE indexmergePartition(id INT, name STRING, city STRING)
+        | STORED AS carbondata partitioned by(age INT)
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name')
+      """.stripMargin)
+    intercept[RuntimeException] {
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmergePartition " +
+          s"OPTIONS('header'='false')")
+    }
+    checkAnswer(sql("Select count(*) from indexmergePartition"), Seq(Row(0)))
+    sql("DROP TABLE indexmergePartition")
+    mockMethod.tearDown()
+  }
+
   private def mergeFileNameIsNull(segmentId: String, dbName: String, tableName: String): Boolean = {
     val carbonTable = CarbonEnv.getCarbonTable(Option(dbName), tableName)(sqlContext.sparkSession)
     val indexFactory = IndexStoreManager.getInstance().getDefaultIndex(carbonTable)
@@ -520,33 +604,6 @@ class CarbonIndexFileMergeTestCase
     identifiers.forall(identifier => identifier.getMergeIndexFileName == null)
   }
 
-  private def getIndexFileCount(tableName: String,
-      segment: String,
-      extension: String = CarbonTablePath.INDEX_FILE_EXT): Int = {
-    val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
-    val path = CarbonTablePath
-      .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
-    val carbonFiles = if (table.isHivePartitionTable) {
-      FileFactory.getCarbonFile(table.getAbsoluteTableIdentifier.getTablePath)
-        .listFiles(true, new CarbonFileFilter {
-          override def accept(file: CarbonFile): Boolean = {
-            file.getName.endsWith(extension)
-          }
-        })
-    } else {
-      FileFactory.getCarbonFile(path).listFiles(true, new CarbonFileFilter {
-        override def accept(file: CarbonFile): Boolean = {
-          file.getName.endsWith(extension)
-        }
-      })
-    }
-    if (carbonFiles != null) {
-      carbonFiles.size()
-    } else {
-      0
-    }
-  }
-
   private def getIndexOrMergeIndexFileSize(carbonTable: CarbonTable,
       segmentId: String,
       fileExtension: String): Long = {
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index c418e49..4ad3167 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -31,7 +31,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.CarbonMetadata
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonTestUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
@@ -84,13 +84,7 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     )
     sql("""update zerorows d  set (d.c2) = (d.c2 + 1) where d.c1 = 'e'""").collect()
     sql("clean files for table iud.zerorows options('force'='true')")
-    val carbonTable = CarbonEnv.getCarbonTable(Some("iud"), "zerorows")(sqlContext.sparkSession)
-    val segmentPath = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(carbonTable
-      .getTablePath, "2"))
-    assert(!segmentPath.exists())
-    val segmentFileLocation = FileFactory.getCarbonFile(CarbonTablePath.getSegmentFilesLocation(
-      carbonTable.getTablePath))
-    assert(segmentFileLocation.listFiles().length == 3)
+    assert(CarbonTestUtil.getSegmentFileCount("iud_zerorows") == 3)
     CarbonProperties.getInstance().addProperty("carbon.clean.file.force.allowed", "true")
     sql("""drop table iud.zerorows""")
   }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 2efd56b..9867099 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -196,11 +196,12 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
       // mergeIndex is true, the segment file need not be written
       // and will be written while merging the index files
       if (partitionSpec != null && !isMergeIndex) {
+        // By default carbon.merge.index.in.segment is true and this code path is used only
+        // for developer debugging purposes.
         try {
-          SegmentFileStore
-              .writeSegmentFile(carbonLoadModel.getTablePath(), carbonLoadModel.getTaskNo(),
-                  partitionSpec.getLocation().toString(), carbonLoadModel.getFactTimeStamp() + "",
-                  partitionSpec.getPartitions());
+          SegmentFileStore.writeSegmentFileForPartitionTable(carbonLoadModel.getTablePath(),
+              carbonLoadModel.getTaskNo(), partitionSpec.getLocation().toString(),
+              carbonLoadModel.getFactTimeStamp() + "", partitionSpec.getPartitions());
         } catch (IOException e) {
           throw e;
         }
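
For tests or debugging runs that need to exercise this non-merge-index path, the property can be
turned off explicitly. The calls below are the same ones used by the test cases in this patch and
are shown here only as a usage sketch:

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // disable index merging so that the segment file is written by the result processor itself
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
    // ... run the load / compaction under test ...
    // restore the default behaviour
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")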
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 6d8cae7..6464306 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -192,9 +192,11 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
         // mergeIndex is true, the segment file need not be written
         // and will be written while merging the index files
         if (partitionSpec != null && !isMergeIndex) {
-          SegmentFileStore.writeSegmentFile(loadModel.getTablePath(), loadModel.getTaskNo(),
-              partitionSpec.getLocation().toString(), loadModel.getFactTimeStamp() + "",
-              partitionSpec.getPartitions());
+          // By default carbon.merge.index.in.segment is true and this code path is used only
+          // for developer debugging purposes.
+          SegmentFileStore.writeSegmentFileForPartitionTable(loadModel.getTablePath(),
+              loadModel.getTaskNo(), partitionSpec.getLocation().toString(),
+              loadModel.getFactTimeStamp() + "", partitionSpec.getPartitions());
         }
       } catch (CarbonDataWriterException | IOException e) {
         mergeStatus = false;
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 1d35502..919742d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -1148,10 +1148,11 @@ public final class CarbonLoaderUtil {
    * @return
    */
   public static String mergeIndexFilesInPartitionedSegment(CarbonTable table, String segmentId,
-      String uuid, String partitionPath) {
+      String uuid, String partitionPath, boolean isOldStoreIndexFilesPresent) {
     String tablePath = table.getTablePath();
     return new CarbonIndexFileMergeWriter(table)
-        .mergeCarbonIndexFilesOfSegment(segmentId, uuid, tablePath, partitionPath);
+        .mergeCarbonIndexFilesOfSegment(segmentId, uuid, tablePath, partitionPath,
+            isOldStoreIndexFilesPresent);
   }
 
   public static SegmentFileStore.FolderDetails mergeIndexFilesInPartitionedTempSegment(
@@ -1163,6 +1164,23 @@ public final class CarbonLoaderUtil {
             tempFolderPath, currPartitionSpec);
   }
 
+  /**
+   * Merge index files of segment with external path.
+   */
+  public static String mergeIndexFilesInAddLoadSegment(CarbonTable table, String segmentId,
+      String segmentPath, String uuid) {
+    try {
+      return new CarbonIndexFileMergeWriter(table)
+          .writeMergeIndexFileBasedOnSegmentFolder(null, false, segmentPath, segmentId, uuid,
+              false);
+    } catch (IOException e) {
+      String message =
+          "Failed to merge index files in path: " + segmentPath + ". " + e.getMessage();
+      LOGGER.error(message);
+      throw new RuntimeException(message, e);
+    }
+  }
+
   private static void deleteFiles(List<String> filesToBeDeleted) throws IOException {
     for (String filePath : filesToBeDeleted) {
       FileFactory.deleteFile(filePath);
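
A minimal sketch of how the new mergeIndexFilesInAddLoadSegment helper would be invoked (the
actual caller lives in the add-segment flow and is not part of this hunk; the wrapper below is
hypothetical and exists only for illustration):

    import org.apache.carbondata.core.metadata.schema.table.CarbonTable
    import org.apache.carbondata.processing.util.CarbonLoaderUtil

    // merge the index files of an externally added segment, passing the fact timestamp as the
    // uuid, in line with how the other merge-index calls in this patch derive it from the
    // load model; returns the merge-index file name
    def mergeAddLoadSegmentIndex(table: CarbonTable, segmentId: String,
        segmentPath: String, factTimeStamp: Long): String = {
      CarbonLoaderUtil.mergeIndexFilesInAddLoadSegment(
        table, segmentId, segmentPath, String.valueOf(factTimeStamp))
    }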