Posted to commits@carbondata.apache.org by xu...@apache.org on 2019/06/17 15:48:34 UTC

[carbondata] branch master updated: [CARBONDATA-3415] Merge index is not working for partition tables. Merging the index for a partition table takes significantly longer than for a normal table.

This is an automated email from the ASF dual-hosted git repository.

xubo245 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 4bead58  [CARBONDATA-3415] Merge index is not working for partition tables. Merging the index for a partition table takes significantly longer than for a normal table.
4bead58 is described below

commit 4bead5893ec6ae0d2d7ee7a9eb05d421090b3174
Author: dhatchayani <dh...@gmail.com>
AuthorDate: Thu Jun 6 16:04:05 2019 +0530

    [CARBONDATA-3415] Merge index is not working for partition tables. Merging the index for a partition table takes significantly longer than for a normal table.
    
    Problem:
    (1) Merge index is not working for partition tables.
    (2) Merging the index for a partition table takes significantly longer than for a normal carbon table.
    
    Root cause:
    (1) The merge index event listener was moved to preStatusUpdateEvent in #3221, but preStatusUpdateEvent is not triggered for partition tables. The test case that validates merge index on a partition table is also wrong, so the regression was not caught in the test builds.
    (2) Currently, the merge index job launches one task per segment (Number of tasks = Number of segments). But a partition table has multiple partitions per segment, and index files are merged per partition, so each task has to iterate over the partitions in its segment and merge the index files inside each one sequentially. This makes the merge noticeably slower when the number of partitions is high, as sketched below.
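    
    A minimal, self-contained Scala sketch of the pre-fix task layout (types
    simplified; the real code is the old CarbonMergeFilesRDD.internalGetPartitions
    shown in the diff below):
    
        case class MergeTask(index: Int, segmentId: String,
            partitionPath: Option[String] = None)
    
        // Pre-fix: one merge task per segment, regardless of how many
        // partition directories the segment contains; each task then walks
        // and merges every partition's index files one after another.
        def segmentLevelTasks(segments: Seq[String]): Seq[MergeTask] =
          segments.zipWithIndex.map { case (seg, idx) => MergeTask(idx, seg) }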
    
    Solution:
    (1) Correct the test case and trigger the merge index listener for partition tables.
    (2) Parallelize by launching the merge index tasks per partition, as sketched below. Number of tasks = Number of partitions in a segment.
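    
    A hedged sketch of the new task assignment (simplified types; partitionsOf
    stands in for SegmentFileStore.getPartitionSpecs, and each resulting tuple
    corresponds to a CarbonMergeFilePartition in the new
    CarbonMergeFilesRDD.internalGetPartitions shown in the diff below):
    
        // Post-fix: one merge task per (segment, partition) pair, so the
        // per-partition index merges run in parallel across the cluster.
        def partitionLevelTasks(
            segments: Seq[String],
            partitionsOf: String => Seq[String]): Seq[(Int, String, String)] =
          segments
            .flatMap(seg => partitionsOf(seg).map(p => (seg, p)))
            .zipWithIndex
            .map { case ((seg, path), idx) => (idx, seg, path) }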
    
    This closes #3262
---
 .../blockletindex/SegmentIndexFileStore.java       |  2 +-
 .../carbondata/core/metadata/SegmentFileStore.java | 47 ++++++++++++----
 .../core/writer/CarbonIndexFileMergeWriter.java    | 64 ++++++++++++++++------
 .../hadoop/api/CarbonOutputCommitter.java          | 11 +++-
 .../CarbonIndexFileMergeTestCase.scala             | 24 +++++---
 .../org/apache/spark/rdd/CarbonMergeFilesRDD.scala | 58 ++++++++++++++++++--
 .../spark/rdd/CarbonTableCompactor.scala           |  9 +--
 .../command/management/CarbonLoadDataCommand.scala |  3 +-
 .../org/apache/spark/util/MergeIndexUtil.scala     | 10 +++-
 .../processing/util/CarbonLoaderUtil.java          |  4 +-
 10 files changed, 182 insertions(+), 50 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 4351f3a..fc30861 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -296,7 +296,7 @@ public class SegmentIndexFileStore {
    * @param indexFile
    * @throws IOException
    */
-  private void readIndexFile(CarbonFile indexFile) throws IOException {
+  public void readIndexFile(CarbonFile indexFile) throws IOException {
     String indexFilePath = indexFile.getCanonicalPath();
     DataInputStream dataInputStream = FileFactory
         .getDataInputStream(indexFilePath, FileFactory.getFileType(indexFilePath), configuration);
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index cbf58c7..2b02ea4 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -80,20 +80,36 @@ public class SegmentFileStore {
    */
   public static void writeSegmentFile(String tablePath, final String taskNo, String location,
       String timeStamp, List<String> partionNames) throws IOException {
+    writeSegmentFile(tablePath, taskNo, location, timeStamp, partionNames, false);
+  }
+
+  /**
+   * Write segment information to the segment folder with the index file name
+   * and corresponding partitions.
+   */
+  public static void writeSegmentFile(String tablePath, final String taskNo, String location,
+      String timeStamp, List<String> partionNames, boolean isMergeIndexFlow) throws IOException {
     String tempFolderLoc = timeStamp + ".tmp";
     String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath) + "/" + tempFolderLoc;
     CarbonFile carbonFile = FileFactory.getCarbonFile(writePath);
     if (!carbonFile.exists()) {
       carbonFile.mkdirs(writePath);
     }
-    CarbonFile tempFolder =
-        FileFactory.getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+    CarbonFile tempFolder = null;
+    if (isMergeIndexFlow) {
+      tempFolder = FileFactory.getCarbonFile(location);
+    } else {
+      tempFolder = FileFactory
+          .getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + tempFolderLoc);
+    }
 
-    if (tempFolder.exists() && partionNames.size() > 0) {
+    if ((tempFolder.exists() && partionNames.size() > 0) || (isMergeIndexFlow
+        && partionNames.size() > 0)) {
       CarbonFile[] carbonFiles = tempFolder.listFiles(new CarbonFileFilter() {
         @Override public boolean accept(CarbonFile file) {
-          return file.getName().startsWith(taskNo) && file.getName()
-              .endsWith(CarbonTablePath.INDEX_FILE_EXT);
+          return file.getName().startsWith(taskNo) && (
+              file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT) || file.getName()
+                  .endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT));
         }
       });
       if (carbonFiles != null && carbonFiles.length > 0) {
@@ -108,10 +124,22 @@ public class SegmentFileStore {
         folderDetails.setPartitions(partionNames);
         folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
         for (CarbonFile file : carbonFiles) {
-          folderDetails.getFiles().add(file.getName());
+          if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+            folderDetails.setMergeFileName(file.getName());
+          } else {
+            folderDetails.getFiles().add(file.getName());
+          }
         }
         segmentFile.addPath(location, folderDetails);
-        String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
+        String path = null;
+        if (isMergeIndexFlow) {
+          // In the merge index flow, tasks are launched per partition and all the
+          // tasks write to the same tmp folder, so taskNo is not unique.
+          // A UUID is used to generate a unique file name instead.
+          path = writePath + "/" + CarbonUtil.generateUUID() + CarbonTablePath.SEGMENT_EXT;
+        } else {
+          path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
+        }
         // write segment info to new file.
         writeSegmentFile(segmentFile, path);
       }
@@ -909,11 +937,10 @@ public class SegmentFileStore {
    * @return
    * @throws IOException
    */
-  public static List<PartitionSpec> getPartitionSpecs(String segmentId, String tablePath)
+  public static List<PartitionSpec> getPartitionSpecs(String segmentId, String tablePath,
+      LoadMetadataDetails[] details)
       throws IOException {
     LoadMetadataDetails segEntry = null;
-    LoadMetadataDetails[] details =
-        SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(tablePath));
     for (LoadMetadataDetails entry : details) {
       if (entry.getLoadName().equals(segmentId)) {
         segEntry = entry;
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index 80e0af5..c9d4c26 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -31,10 +31,12 @@ import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.MergedBlockIndex;
 import org.apache.carbondata.format.MergedBlockIndexHeader;
@@ -72,7 +74,7 @@ public class CarbonIndexFileMergeWriter {
    */
   private String mergeCarbonIndexFilesOfSegment(String segmentId,
       String tablePath, List<String> indexFileNamesTobeAdded,
-      boolean readFileFooterFromCarbonDataFile, String uuid) {
+      boolean readFileFooterFromCarbonDataFile, String uuid, String partitionPath) {
     try {
       Segment segment = Segment.getSegment(segmentId, tablePath);
       String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
@@ -81,7 +83,18 @@ public class CarbonIndexFileMergeWriter {
       if (segment != null && segment.getSegmentFileName() != null) {
         sfs = new SegmentFileStore(tablePath, segment.getSegmentFileName());
         List<CarbonFile> indexCarbonFiles = sfs.getIndexCarbonFiles();
-        indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
+        if (table.isHivePartitionTable()) {
+          // in case of partition table, merge index files of a partition
+          List<CarbonFile> indexFilesInPartition = new ArrayList<>();
+          for (CarbonFile indexCarbonFile : indexCarbonFiles) {
+            if (indexCarbonFile.getParentFile().getPath().equals(partitionPath)) {
+              indexFilesInPartition.add(indexCarbonFile);
+            }
+          }
+          indexFiles = indexFilesInPartition.toArray(new CarbonFile[indexFilesInPartition.size()]);
+        } else {
+          indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
+        }
       } else {
         indexFiles =
             SegmentIndexFileStore.getCarbonIndexFiles(segmentPath, FileFactory.getConfiguration());
@@ -92,7 +105,7 @@ public class CarbonIndexFileMergeWriter {
               readFileFooterFromCarbonDataFile, segmentPath, indexFiles, segmentId);
         } else {
           return writeMergeIndexFileBasedOnSegmentFile(segmentId, indexFileNamesTobeAdded, sfs,
-              indexFiles, uuid);
+              indexFiles, uuid, partitionPath);
         }
       }
     } catch (Exception e) {
@@ -124,11 +137,17 @@ public class CarbonIndexFileMergeWriter {
 
   private String writeMergeIndexFileBasedOnSegmentFile(String segmentId,
       List<String> indexFileNamesTobeAdded, SegmentFileStore segmentFileStore,
-      CarbonFile[] indexFiles, String uuid) throws IOException {
+      CarbonFile[] indexFiles, String uuid, String partitionPath) throws IOException {
     SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
-    fileStore
-        .readAllIIndexOfSegment(segmentFileStore.getSegmentFile(), segmentFileStore.getTablePath(),
-            SegmentStatus.SUCCESS, true);
+    // in case of partition table, merge index file to be created for each partition
+    if (null != partitionPath) {
+      for (CarbonFile indexFile : indexFiles) {
+        fileStore.readIndexFile(indexFile);
+      }
+    } else {
+      fileStore.readAllIIndexOfSegment(segmentFileStore.getSegmentFile(),
+          segmentFileStore.getTablePath(), SegmentStatus.SUCCESS, true);
+    }
     Map<String, byte[]> indexMap = fileStore.getCarbonIndexMapWithFullPath();
     Map<String, Map<String, byte[]>> indexLocationMap = new HashMap<>();
     for (Map.Entry<String, byte[]> entry: indexMap.entrySet()) {
@@ -140,6 +159,9 @@ public class CarbonIndexFileMergeWriter {
       }
       map.put(path.getName(), entry.getValue());
     }
+    List<PartitionSpec> partitionSpecs = SegmentFileStore
+        .getPartitionSpecs(segmentId, table.getTablePath(), SegmentStatusManager
+            .readLoadMetadata(CarbonTablePath.getMetadataPath(table.getTablePath())));
     for (Map.Entry<String, Map<String, byte[]>> entry : indexLocationMap.entrySet()) {
       String mergeIndexFile =
           writeMergeIndexFile(indexFileNamesTobeAdded, entry.getKey(), entry.getValue(), segmentId);
@@ -156,14 +178,24 @@ public class CarbonIndexFileMergeWriter {
           break;
         }
       }
+      if (table.isHivePartitionTable()) {
+        for (PartitionSpec partitionSpec : partitionSpecs) {
+          if (partitionSpec.getLocation().toString().equals(partitionPath)) {
+            SegmentFileStore.writeSegmentFile(table.getTablePath(), mergeIndexFile, partitionPath,
+                segmentId + "_" + uuid + "", partitionSpec.getPartitions(), true);
+          }
+        }
+      }
     }
     String newSegmentFileName = SegmentFileStore.genSegmentFileName(segmentId, uuid)
         + CarbonTablePath.SEGMENT_EXT;
     String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath())
         + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName;
-    SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path);
-    SegmentFileStore.updateSegmentFile(table, segmentId, newSegmentFileName,
-        table.getCarbonTableIdentifier().getTableId(), segmentFileStore);
+    if (!table.isHivePartitionTable()) {
+      SegmentFileStore.writeSegmentFile(segmentFileStore.getSegmentFile(), path);
+      SegmentFileStore.updateSegmentFile(table, segmentId, newSegmentFileName,
+          table.getCarbonTableIdentifier().getTableId(), segmentFileStore);
+    }
 
     for (CarbonFile file : indexFiles) {
       file.delete();
@@ -205,9 +237,9 @@ public class CarbonIndexFileMergeWriter {
    * @param segmentId
    * @throws IOException
    */
-  public String mergeCarbonIndexFilesOfSegment(String segmentId, String uuid,
-      String tablePath) throws IOException {
-    return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null, false, uuid);
+  public String mergeCarbonIndexFilesOfSegment(String segmentId, String uuid, String tablePath,
+      String partitionPath) throws IOException {
+    return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null, false, uuid, partitionPath);
   }
 
   /**
@@ -217,10 +249,10 @@ public class CarbonIndexFileMergeWriter {
    * @param readFileFooterFromCarbonDataFile
    * @throws IOException
    */
-  public String mergeCarbonIndexFilesOfSegment(String segmentId,
-      String tablePath, boolean readFileFooterFromCarbonDataFile, String uuid) throws IOException {
+  public String mergeCarbonIndexFilesOfSegment(String segmentId, String tablePath,
+      boolean readFileFooterFromCarbonDataFile, String uuid) throws IOException {
     return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null,
-        readFileFooterFromCarbonDataFile, uuid);
+        readFileFooterFromCarbonDataFile, uuid, null);
   }
 
   private boolean isCarbonIndexFilePresent(CarbonFile[] indexFiles) {
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index cda8b7a..02135e2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -134,19 +134,26 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
       newMetaEntry.setSegmentFile(segmentFileName + CarbonTablePath.SEGMENT_EXT);
     }
     OperationContext operationContext = (OperationContext) getOperationContext();
+    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
     String uuid = "";
     if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() &&
         operationContext != null) {
       uuid = operationContext.getProperty("uuid").toString();
     }
+
+    SegmentFileStore.updateSegmentFile(carbonTable, loadModel.getSegmentId(),
+        segmentFileName + CarbonTablePath.SEGMENT_EXT,
+        carbonTable.getCarbonTableIdentifier().getTableId(),
+        new SegmentFileStore(carbonTable.getTablePath(),
+            segmentFileName + CarbonTablePath.SEGMENT_EXT));
+
     CarbonLoaderUtil
         .populateNewLoadMetaEntry(newMetaEntry, SegmentStatus.SUCCESS, loadModel.getFactTimeStamp(),
             true);
-    CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
     long segmentSize = CarbonLoaderUtil
         .addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), carbonTable);
     if (segmentSize > 0 || overwriteSet) {
-      if (operationContext != null && carbonTable.hasAggregationDataMap()) {
+      if (operationContext != null) {
         operationContext
             .setProperty("current.segmentfile", newMetaEntry.getSegmentFile());
         LoadEvents.LoadTablePreStatusUpdateEvent event =
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index c9a7971..7c3e107 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -493,14 +493,24 @@ class CarbonIndexFileMergeTestCase
     val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
     val path = CarbonTablePath
       .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
-    val carbonFiles = FileFactory.getCarbonFile(path).listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = {
-        file.getName.endsWith(CarbonTablePath
-          .INDEX_FILE_EXT)
-      }
-    })
+    val carbonFiles = if (table.isHivePartitionTable) {
+      FileFactory.getCarbonFile(table.getAbsoluteTableIdentifier.getTablePath)
+        .listFiles(true, new CarbonFileFilter {
+          override def accept(file: CarbonFile): Boolean = {
+            file.getName.endsWith(CarbonTablePath
+              .INDEX_FILE_EXT)
+          }
+        })
+    } else {
+      FileFactory.getCarbonFile(path).listFiles(true, new CarbonFileFilter {
+        override def accept(file: CarbonFile): Boolean = {
+          file.getName.endsWith(CarbonTablePath
+            .INDEX_FILE_EXT)
+        }
+      })
+    }
     if (carbonFiles != null) {
-      carbonFiles.length
+      carbonFiles.size()
     } else {
       0
     }
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
index bb930b4..9bed7f6 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
@@ -17,20 +17,28 @@
 
 package org.apache.spark.rdd
 
+import java.util
+
+import scala.collection.JavaConverters._
+
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
 import org.apache.carbondata.spark.rdd.CarbonRDD
 
-case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentId: String)
-  extends Partition {
+case class CarbonMergeFilePartition(rddId: Int,
+    idx: Int,
+    segmentId: String,
+    partitionPath: String = null) extends Partition {
 
   override val index: Int = idx
 
@@ -94,6 +102,20 @@ object CarbonMergeFilesRDD {
           }
       }
     }
+    if (carbonTable.isHivePartitionTable) {
+      segmentIds.foreach(segmentId => {
+        val readPath: String = CarbonTablePath.getSegmentFilesLocation(tablePath) +
+                               CarbonCommonConstants.FILE_SEPARATOR + segmentId + "_" +
+                               segmentFileNameToSegmentIdMap.get(segmentId) + ".tmp"
+        // Merge all partition files into a single file.
+        val segmentFileName: String = SegmentFileStore
+          .genSegmentFileName(segmentId, segmentFileNameToSegmentIdMap.get(segmentId))
+        SegmentFileStore
+          .mergeSegmentFiles(readPath,
+            segmentFileName,
+            CarbonTablePath.getSegmentFilesLocation(tablePath))
+      })
+    }
   }
 
   /**
@@ -130,9 +152,33 @@ class CarbonMergeFilesRDD(
   extends CarbonRDD[String](ss, Nil) {
 
   override def internalGetPartitions: Array[Partition] = {
-    segments.zipWithIndex.map {s =>
-      CarbonMergeFilePartition(id, s._2, s._1)
-    }.toArray
+    if (isHivePartitionedTable) {
+      val metadataDetails = SegmentStatusManager
+        .readLoadMetadata(CarbonTablePath.getMetadataPath(carbonTable.getTablePath))
+      // in case of partition table make rdd partitions per partition of the carbon table
+      val partitionPaths: java.util.Map[String, java.util.List[String]] = new java.util.HashMap()
+      segments.foreach(segment => {
+        val partitionSpecs = SegmentFileStore
+          .getPartitionSpecs(segment, carbonTable.getTablePath, metadataDetails)
+          .asScala.map(_.getLocation.toString)
+        partitionPaths.put(segment, partitionSpecs.asJava)
+      })
+      var index: Int = -1
+      val rddPartitions: java.util.List[Partition] = new java.util.ArrayList()
+      partitionPaths.asScala.foreach(partitionPath => {
+        val segmentId = partitionPath._1
+        partitionPath._2.asScala.map { partition =>
+          index = index + 1
+          rddPartitions.add(CarbonMergeFilePartition(id, index, segmentId, partition))
+        }
+      })
+      rddPartitions.asScala.toArray
+    } else {
+      // in case of normal carbon table, make rdd partitions per segment
+      segments.zipWithIndex.map { s =>
+        CarbonMergeFilePartition(id, s._2, s._1)
+      }.toArray
+    }
   }
 
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = {
@@ -145,7 +191,7 @@ class CarbonMergeFilesRDD(
       if (isHivePartitionedTable) {
         CarbonLoaderUtil
           .mergeIndexFilesInPartitionedSegment(carbonTable, split.segmentId,
-            segmentFileNameToSegmentIdMap.get(split.segmentId))
+            segmentFileNameToSegmentIdMap.get(split.segmentId), split.partitionPath)
       } else {
         new CarbonIndexFileMergeWriter(carbonTable)
           .mergeCarbonIndexFilesOfSegment(split.segmentId,
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 4c7dd95..77b7119 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -214,10 +214,6 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
       var segmentFilesForIUDCompact = new util.ArrayList[Segment]()
       var segmentFileName: String = null
-      if (compactionType != CompactionType.IUD_DELETE_DELTA &&
-          compactionType != CompactionType.IUD_UPDDEL_DELTA) {
-        MergeIndexUtil.mergeIndexFilesOnCompaction(compactionCallableModel)
-      }
       if (carbonTable.isHivePartitionTable) {
         val readPath =
           CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
@@ -283,6 +279,11 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
           compactionType,
           segmentFileName)
 
+      if (compactionType != CompactionType.IUD_DELETE_DELTA &&
+          compactionType != CompactionType.IUD_UPDDEL_DELTA) {
+        MergeIndexUtil.mergeIndexFilesOnCompaction(compactionCallableModel)
+      }
+
       val compactionLoadStatusPostEvent = AlterTableCompactionPostStatusUpdateEvent(sc.sparkSession,
         carbonTable,
         carbonMergerMapping,
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 5225360..390de33 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -886,7 +886,8 @@ case class CarbonLoadDataCommand(
           "load is passed.", e)
     }
     val specs =
-      SegmentFileStore.getPartitionSpecs(carbonLoadModel.getSegmentId, carbonLoadModel.getTablePath)
+      SegmentFileStore.getPartitionSpecs(carbonLoadModel.getSegmentId, carbonLoadModel.getTablePath,
+        SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(table.getTablePath)))
     if (specs != null) {
       specs.asScala.map{ spec =>
         Row(spec.getPartitions.asScala.mkString("/"), spec.getLocation.toString, spec.getUuid)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala
index f109b2e..85a22cc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/MergeIndexUtil.scala
@@ -52,9 +52,17 @@ object MergeIndexUtil {
                      CarbonCommonConstants.LOAD_FOLDER.length)
         mergedSegmentIds.add(loadName)
       })
+      val loadFolderDetailsArray = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+      val segmentFileNameMap: java.util.Map[String, String] = new util.HashMap[String, String]()
+      loadFolderDetailsArray.foreach(loadMetadataDetails => {
+        segmentFileNameMap
+          .put(loadMetadataDetails.getLoadName,
+            String.valueOf(loadMetadataDetails.getLoadStartTime))
+      })
       CarbonMergeFilesRDD.mergeIndexFiles(sparkSession,
         mergedSegmentIds.asScala,
-        new util.HashMap[String, String](),
+        segmentFileNameMap,
         carbonTable.getTablePath,
         carbonTable, false)
     }
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index fe91099..8aafed9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -1182,10 +1182,10 @@ public final class CarbonLoaderUtil {
    * @throws IOException
    */
   public static String mergeIndexFilesInPartitionedSegment(CarbonTable table, String segmentId,
-      String uuid) throws IOException {
+      String uuid, String partitionPath) throws IOException {
     String tablePath = table.getTablePath();
     return new CarbonIndexFileMergeWriter(table)
-        .mergeCarbonIndexFilesOfSegment(segmentId, uuid, tablePath);
+        .mergeCarbonIndexFilesOfSegment(segmentId, uuid, tablePath, partitionPath);
   }
 
   private static void deleteFiles(List<String> filesToBeDeleted) throws IOException {