You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/30 03:19:04 UTC

[1/2] carbondata git commit: [CARBONDATA-2270] Write segment file in loading for non-partition table

Repository: carbondata
Updated Branches:
  refs/heads/master e43be5e74 -> 7e0803fec


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index a987127..15ae30f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -35,7 +35,6 @@ import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.MergeResultImpl
-import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * This class is used to perform compaction on carbon table.
@@ -201,6 +200,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
 
     if (finalMergeStatus) {
       val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
+      var segmentFilesForIUDCompact = new util.ArrayList[Segment]()
       var segmentFileName: String = null
       if (carbonTable.isHivePartitionTable) {
         val readPath =
@@ -220,6 +220,23 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
               carbonLoadModel.getTablePath)
         }
         segmentFileName = segmentFileName + CarbonTablePath.SEGMENT_EXT
+      } else {
+        // Get the segment files each updated segment in case of IUD compaction
+        if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
+          val segmentFilesList = loadsToMerge.asScala.map{seg =>
+            val file = SegmentFileStore.writeSegmentFile(
+              carbonTable.getTablePath,
+              seg.getLoadName,
+              carbonLoadModel.getFactTimeStamp.toString)
+            new Segment(seg.getLoadName, file)
+          }.filter(_.getSegmentFileName != null).asJava
+          segmentFilesForIUDCompact = new util.ArrayList[Segment](segmentFilesList)
+        } else {
+          segmentFileName = SegmentFileStore.writeSegmentFile(
+            carbonTable.getTablePath,
+            mergedLoadNumber,
+            carbonLoadModel.getFactTimeStamp.toString)
+        }
       }
       // trigger event for compaction
       val alterTableCompactionPreStatusUpdateEvent =
@@ -238,11 +255,12 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
          CarbonDataMergerUtil
            .updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge,
              carbonTable.getMetadataPath,
-             carbonLoadModel)) ||
+             carbonLoadModel,
+             segmentFilesForIUDCompact)) ||
         CarbonDataMergerUtil.updateLoadMetadataWithMergeStatus(
           loadsToMerge,
-            carbonTable.getMetadataPath,
-            mergedLoadNumber,
+          carbonTable.getMetadataPath,
+          mergedLoadNumber,
           carbonLoadModel,
           compactionType,
           segmentFileName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index 2d19fd4..b27a150 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -51,14 +51,14 @@ case class CarbonCountStar(
     val rowCount = CarbonUpdateUtil.getRowCount(
       tableInputFormat.getBlockRowCount(
         job,
-        absoluteTableIdentifier,
+        carbonTable,
         CarbonFilters.getPartitions(
           Seq.empty,
           sparkSession,
           TableIdentifier(
             carbonTable.getTableName,
             Some(carbonTable.getDatabaseName))).map(_.asJava).orNull),
-      absoluteTableIdentifier)
+      carbonTable)
     val value = new GenericInternalRow(Seq(Long.box(rowCount)).toArray.asInstanceOf[Array[Any]])
     val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
     val row = if (outUnsafeRows) unsafeProjection(value) else value

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 8eaeab1..0c6d2ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -105,12 +105,12 @@ object DeleteExecution {
     val blockMappingVO =
       carbonInputFormat.getBlockRowCount(
         job,
-        absoluteTableIdentifier,
+        carbonTable,
         CarbonFilters.getPartitions(
           Seq.empty,
           sparkSession,
           TableIdentifier(tableName, databaseNameOp)).map(_.asJava).orNull)
-    val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(absoluteTableIdentifier)
+    val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(carbonTable)
     CarbonUpdateUtil
       .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index f88e767..8c88d0e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -73,7 +73,7 @@ object HorizontalCompaction {
     // SegmentUpdateStatusManager reads the Table Status File and Table Update Status
     // file and save the content in segmentDetails and updateDetails respectively.
     val segmentUpdateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
-      absTableIdentifier)
+      carbonTable)
 
     if (isUpdateOperation) {
 
@@ -199,7 +199,7 @@ object HorizontalCompaction {
               .substring(segmentAndBlocks.lastIndexOf("/") + 1, segmentAndBlocks.length)
 
             val result = CarbonDataMergerUtil.compactBlockDeleteDeltaFiles(segment, blockName,
-              absTableIdentifier,
+              carbonTable,
               updateStatusDetails,
               timestamp)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index b583c6a..25a0d8e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -122,7 +122,9 @@ case class CarbonAlterTableAddHivePartitionCommand(
         CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, false)
         val newMetaEntry = loadModel.getCurrentLoadMetadataDetail
         val segmentFileName =
-          loadModel.getSegmentId + "_" + loadModel.getFactTimeStamp + CarbonTablePath.SEGMENT_EXT
+          SegmentFileStore.genSegmentFileName(
+            loadModel.getSegmentId, String.valueOf(loadModel.getFactTimeStamp)) +
+          CarbonTablePath.SEGMENT_EXT
         newMetaEntry.setSegmentFile(segmentFileName)
         val segmentsLoc = CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
         CarbonUtil.checkAndCreateFolderWithPermission(segmentsLoc)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index 9c2835e..756bc97 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -210,9 +210,7 @@ case class CarbonAlterTableDropPartitionCommand(
       for (thread <- threadArray) {
         thread.join()
       }
-      val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
-        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
-      val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+      val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
       refresher.refreshSegments(validSegments.map(_.getSegmentNo).asJava)
     } catch {
       case e: Exception =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 1bdf414..929de0a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -231,9 +231,7 @@ case class CarbonAlterTableSplitPartitionCommand(
       threadArray.foreach {
         thread => thread.join()
       }
-      val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
-        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
-      val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+      val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
       refresher.refreshSegments(validSegments.map(_.getSegmentNo).asJava)
     } catch {
       case e: Exception =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 47da9a5..7123b93 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -96,7 +96,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
   private def driverSideCountStar(logicalRelation: LogicalRelation): Boolean = {
     val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
     val segmentUpdateStatusManager = new SegmentUpdateStatusManager(
-      relation.carbonRelation.metaData.carbonTable.getAbsoluteTableIdentifier)
+      relation.carbonRelation.metaData.carbonTable)
     val updateDeltaMetadata = segmentUpdateStatusManager.readLoadMetadata()
     if (updateDeltaMetadata != null && updateDeltaMetadata.nonEmpty) {
       false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index ea5eb42..5bc85f8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -141,7 +141,7 @@ public final class CarbonDataMergerUtil {
    */
   public static boolean updateLoadMetadataIUDUpdateDeltaMergeStatus(
       List<LoadMetadataDetails> loadsToMerge, String metaDataFilepath,
-      CarbonLoadModel carbonLoadModel) {
+      CarbonLoadModel carbonLoadModel, List<Segment> segmentFilesToBeUpdated) {
 
     boolean status = false;
     boolean updateLockStatus = false;
@@ -171,7 +171,7 @@ public final class CarbonDataMergerUtil {
         carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
 
     SegmentUpdateStatusManager segmentUpdateStatusManager =
-        new SegmentUpdateStatusManager(identifier);
+        new SegmentUpdateStatusManager(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable());
 
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
 
@@ -230,6 +230,13 @@ public final class CarbonDataMergerUtil {
                 loadDetail
                     .setUpdateStatusFileName(CarbonUpdateUtil.getUpdateStatusFileName(timestamp));
               }
+              // Update segement file name to status file
+              int segmentFileIndex =
+                  segmentFilesToBeUpdated.indexOf(Segment.toSegment(loadDetail.getLoadName()));
+              if (segmentFileIndex > -1) {
+                loadDetail.setSegmentFile(
+                    segmentFilesToBeUpdated.get(segmentFileIndex).getSegmentFileName());
+              }
             }
           }
 
@@ -1135,18 +1142,17 @@ public final class CarbonDataMergerUtil {
    *
    * @param seg
    * @param blockName
-   * @param absoluteTableIdentifier
    * @param segmentUpdateDetails
    * @param timestamp
    * @return
    * @throws IOException
    */
   public static List<CarbonDataMergerUtilResult> compactBlockDeleteDeltaFiles(String seg,
-      String blockName, AbsoluteTableIdentifier absoluteTableIdentifier,
+      String blockName, CarbonTable table,
       SegmentUpdateDetails[] segmentUpdateDetails, Long timestamp) throws IOException {
 
     SegmentUpdateStatusManager segmentUpdateStatusManager =
-        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+        new SegmentUpdateStatusManager(table);
 
     List<CarbonDataMergerUtilResult> resultList = new ArrayList<CarbonDataMergerUtilResult>(1);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 65827b0..aabe91a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -47,7 +47,6 @@ import org.apache.carbondata.core.locks.CarbonLockUtil;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
-import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
@@ -334,60 +333,6 @@ public final class CarbonLoaderUtil {
     return status;
   }
 
-  /**
-   * This API will update the segmentFile of a passed segment.
-   *
-   * @return boolean which determines whether status update is done or not.
-   * @throws IOException
-   */
-  private static boolean updateSegmentFile(String tablePath, String segmentId, String segmentFile)
-      throws IOException {
-    boolean status = false;
-    String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath);
-    String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        AbsoluteTableIdentifier.from(tablePath, null, null);
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
-    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
-    int retryCount = CarbonLockUtil
-        .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
-            CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
-    int maxTimeout = CarbonLockUtil
-        .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
-            CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
-    try {
-      if (carbonLock.lockWithRetries(retryCount, maxTimeout)) {
-        LOGGER.info("Acquired lock for tablepath" + tablePath + " for table status updation");
-        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-            SegmentStatusManager.readLoadMetadata(metadataPath);
-
-        for (LoadMetadataDetails detail : listOfLoadFolderDetailsArray) {
-          // if the segments is in the list of marked for delete then update the status.
-          if (segmentId.equals(detail.getLoadName())) {
-            detail.setSegmentFile(segmentFile);
-            break;
-          }
-        }
-
-        SegmentStatusManager
-            .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
-        status = true;
-      } else {
-        LOGGER.error(
-            "Not able to acquire the lock for Table status updation for table path " + tablePath);
-      }
-      ;
-    } finally {
-      if (carbonLock.unlock()) {
-        LOGGER.info("Table unlocked successfully after table status updation" + tablePath);
-      } else {
-        LOGGER.error(
-            "Unable to unlock Table lock for table" + tablePath + " during table status updation");
-      }
-    }
-    return status;
-  }
-
   private static void addToStaleFolders(AbsoluteTableIdentifier identifier,
       List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException {
     String path = CarbonTablePath.getSegmentPath(
@@ -1102,26 +1047,15 @@ public final class CarbonLoaderUtil {
   /**
    * Merge index files with in the segment of partitioned table
    * @param segmentId
-   * @param tablePath
+   * @param table
    * @return
    * @throws IOException
    */
-  public static String mergeIndexFilesinPartitionedSegment(String segmentId, String tablePath)
+  public static String mergeIndexFilesinPartitionedSegment(String segmentId, CarbonTable table)
       throws IOException {
-    CarbonIndexFileMergeWriter.SegmentIndexFIleMergeStatus segmentIndexFIleMergeStatus =
-        new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentId, tablePath);
-    String uniqueId = "";
-    if (segmentIndexFIleMergeStatus != null) {
-      uniqueId = System.currentTimeMillis() + "";
-      String newSegmentFileName = segmentId + "_" + uniqueId + CarbonTablePath.SEGMENT_EXT;
-      String path =
-          CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
-              + newSegmentFileName;
-      SegmentFileStore.writeSegmentFile(segmentIndexFIleMergeStatus.getSegmentFile(), path);
-      updateSegmentFile(tablePath, segmentId, newSegmentFileName);
-      deleteFiles(segmentIndexFIleMergeStatus.getFilesTobeDeleted());
-    }
-    return uniqueId;
+    String tablePath = table.getTablePath();
+    return new CarbonIndexFileMergeWriter(table)
+        .mergeCarbonIndexFilesOfSegment(segmentId, tablePath);
   }
 
   private static void deleteFiles(List<String> filesToBeDeleted) throws IOException {


[2/2] carbondata git commit: [CARBONDATA-2270] Write segment file in loading for non-partition table

Posted by ja...@apache.org.
[CARBONDATA-2270] Write segment file in loading for non-partition table

Currently when loading into partition table, carbon is writing a segment file to record the segment and index file location mapping.
This can avoid frequent listFile operation when querying. The same should be done for non-partition table also.

This closes #2092


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7e0803fe
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7e0803fe
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7e0803fe

Branch: refs/heads/master
Commit: 7e0803fec02f5872569e7680ce4d3ab02507285b
Parents: e43be5e
Author: Jacky Li <ja...@qq.com>
Authored: Fri Mar 23 12:13:11 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Mar 30 11:18:42 2018 +0800

----------------------------------------------------------------------
 .../carbondata/core/datamap/DataMapChooser.java |   5 +-
 .../core/datamap/DataMapStoreManager.java       |  61 +++----
 .../core/datamap/IndexDataMapProvider.java      |   3 +-
 .../core/datastore/SegmentTaskIndexStore.java   |  14 +-
 .../core/metadata/SegmentFileStore.java         | 172 +++++++++++++++++--
 .../core/mutate/CarbonUpdateUtil.java           |  70 ++++++--
 .../SegmentUpdateStatusManager.java             |  72 ++++----
 .../core/writer/CarbonIndexFileMergeWriter.java |  46 +++--
 .../hadoop/api/CarbonFileInputFormat.java       |   7 +-
 .../hadoop/api/CarbonInputFormat.java           |   6 +-
 .../hadoop/api/CarbonOutputCommitter.java       |   3 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  32 ++--
 .../hadoop/api/DistributableDataMapFormat.java  |  10 +-
 .../sdv/generated/MergeIndexTestCase.scala      |   8 +-
 .../CarbonIndexFileMergeTestCase.scala          |  14 +-
 .../iud/DeleteCarbonTableTestCase.scala         |  10 +-
 .../iud/UpdateCarbonTableTestCase.scala         |   8 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   4 +-
 .../org/apache/spark/util/PartitionUtils.scala  |  21 ++-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  29 +++-
 .../spark/rdd/CarbonTableCompactor.scala        |  26 ++-
 .../org/apache/spark/sql/CarbonCountStar.scala  |   4 +-
 .../command/mutation/DeleteExecution.scala      |   4 +-
 .../command/mutation/HorizontalCompaction.scala |   4 +-
 ...arbonAlterTableAddHivePartitionCommand.scala |   4 +-
 .../CarbonAlterTableDropPartitionCommand.scala  |   4 +-
 .../CarbonAlterTableSplitPartitionCommand.scala |   4 +-
 .../strategy/CarbonLateDecodeStrategy.scala     |   2 +-
 .../processing/merger/CarbonDataMergerUtil.java |  16 +-
 .../processing/util/CarbonLoaderUtil.java       |  76 +-------
 30 files changed, 458 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
index f9214a8..ac00e71 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapChooser.java
@@ -87,8 +87,9 @@ public class DataMapChooser {
       }
     }
     // Return the default datamap if no other datamap exists.
-    return new DataMapExprWrapperImpl(DataMapStoreManager.getInstance()
-        .getDefaultDataMap(carbonTable.getAbsoluteTableIdentifier()), resolverIntf);
+    return new DataMapExprWrapperImpl(
+        DataMapStoreManager.getInstance().getDefaultDataMap(carbonTable),
+        resolverIntf);
   }
 
   private ExpressionTuple selectDataMap(Expression expression, List<TableDataMap> allDataMap) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
index d01df4f..b0aff0a 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapStoreManager.java
@@ -102,7 +102,7 @@ public final class DataMapStoreManager {
         if (dataMapSchema.isIndexDataMap() && identifier.getTableName()
             .equals(carbonTable.getTableName()) && identifier.getDatabaseName()
             .equals(carbonTable.getDatabaseName())) {
-          dataMaps.add(getDataMap(carbonTable.getAbsoluteTableIdentifier(), dataMapSchema));
+          dataMaps.add(getDataMap(carbonTable, dataMapSchema));
         }
       }
     }
@@ -187,32 +187,33 @@ public final class DataMapStoreManager {
   /**
    * It gives the default datamap of the table. Default datamap of any table is BlockletDataMap
    *
-   * @param identifier
+   * @param table
    * @return
    */
-  public TableDataMap getDefaultDataMap(AbsoluteTableIdentifier identifier) {
-    return getDataMap(identifier, BlockletDataMapFactory.DATA_MAP_SCHEMA);
+  public TableDataMap getDefaultDataMap(CarbonTable table) {
+    return getDataMap(table, BlockletDataMapFactory.DATA_MAP_SCHEMA);
   }
 
   /**
    * Get the datamap for reading data.
    */
-  public TableDataMap getDataMap(AbsoluteTableIdentifier identifier, DataMapSchema dataMapSchema) {
-    String table = identifier.getCarbonTableIdentifier().getTableUniqueName();
-    List<TableDataMap> tableIndices = allDataMaps.get(table);
+  public TableDataMap getDataMap(CarbonTable table, DataMapSchema dataMapSchema) {
+    String tableUniqueName =
+        table.getAbsoluteTableIdentifier().getCarbonTableIdentifier().getTableUniqueName();
+    List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
     TableDataMap dataMap = null;
     if (tableIndices != null) {
       dataMap = getTableDataMap(dataMapSchema.getDataMapName(), tableIndices);
     }
     if (dataMap == null) {
-      synchronized (table.intern()) {
-        tableIndices = allDataMaps.get(table);
+      synchronized (tableUniqueName.intern()) {
+        tableIndices = allDataMaps.get(tableUniqueName);
         if (tableIndices != null) {
           dataMap = getTableDataMap(dataMapSchema.getDataMapName(), tableIndices);
         }
         if (dataMap == null) {
           try {
-            dataMap = createAndRegisterDataMap(identifier, dataMapSchema);
+            dataMap = createAndRegisterDataMap(table, dataMapSchema);
           } catch (Exception e) {
             throw new RuntimeException(e);
           }
@@ -231,7 +232,7 @@ public final class DataMapStoreManager {
    * The datamap is created using datamap name, datamap factory class and table identifier.
    */
   // TODO: make it private
-  public TableDataMap createAndRegisterDataMap(AbsoluteTableIdentifier identifier,
+  public TableDataMap createAndRegisterDataMap(CarbonTable table,
       DataMapSchema dataMapSchema) throws MalformedDataMapCommandException, IOException {
     DataMapFactory dataMapFactory;
     try {
@@ -246,34 +247,34 @@ public final class DataMapStoreManager {
       throw new MetadataProcessException(
           "failed to create DataMap '" + dataMapSchema.getProviderName() + "'", e);
     }
-    return registerDataMap(identifier, dataMapSchema, dataMapFactory);
+    return registerDataMap(table, dataMapSchema, dataMapFactory);
   }
 
-  public TableDataMap registerDataMap(AbsoluteTableIdentifier identifier,
+  public TableDataMap registerDataMap(CarbonTable table,
       DataMapSchema dataMapSchema,  DataMapFactory dataMapFactory)
       throws IOException, MalformedDataMapCommandException {
-    String table = identifier.getCarbonTableIdentifier().getTableUniqueName();
+    String tableUniqueName = table.getCarbonTableIdentifier().getTableUniqueName();
     // Just update the segmentRefreshMap with the table if not added.
-    getTableSegmentRefresher(identifier);
-    List<TableDataMap> tableIndices = allDataMaps.get(table);
+    getTableSegmentRefresher(table);
+    List<TableDataMap> tableIndices = allDataMaps.get(tableUniqueName);
     if (tableIndices == null) {
       tableIndices = new ArrayList<>();
     }
 
-    dataMapFactory.init(identifier, dataMapSchema);
+    dataMapFactory.init(table.getAbsoluteTableIdentifier(), dataMapSchema);
     BlockletDetailsFetcher blockletDetailsFetcher;
     SegmentPropertiesFetcher segmentPropertiesFetcher = null;
     if (dataMapFactory instanceof BlockletDetailsFetcher) {
       blockletDetailsFetcher = (BlockletDetailsFetcher) dataMapFactory;
     } else {
-      blockletDetailsFetcher = getBlockletDetailsFetcher(identifier);
+      blockletDetailsFetcher = getBlockletDetailsFetcher(table);
     }
     segmentPropertiesFetcher = (SegmentPropertiesFetcher) blockletDetailsFetcher;
-    TableDataMap dataMap = new TableDataMap(identifier, dataMapSchema, dataMapFactory,
-        blockletDetailsFetcher, segmentPropertiesFetcher);
+    TableDataMap dataMap = new TableDataMap(table.getAbsoluteTableIdentifier(),
+        dataMapSchema, dataMapFactory, blockletDetailsFetcher, segmentPropertiesFetcher);
 
     tableIndices.add(dataMap);
-    allDataMaps.put(table, tableIndices);
+    allDataMaps.put(tableUniqueName, tableIndices);
     return dataMap;
   }
 
@@ -294,7 +295,7 @@ public final class DataMapStoreManager {
    * @param segments
    */
   public void clearInvalidSegments(CarbonTable carbonTable, List<Segment> segments) {
-    getDefaultDataMap(carbonTable.getAbsoluteTableIdentifier()).clear(segments);
+    getDefaultDataMap(carbonTable).clear(segments);
     List<TableDataMap> allDataMap = getAllDataMap(carbonTable);
     for (TableDataMap dataMap: allDataMap) {
       dataMap.clear(segments);
@@ -347,11 +348,11 @@ public final class DataMapStoreManager {
   /**
    * Get the blocklet datamap factory to get the detail information of blocklets
    *
-   * @param identifier
+   * @param table
    * @return
    */
-  private BlockletDetailsFetcher getBlockletDetailsFetcher(AbsoluteTableIdentifier identifier) {
-    TableDataMap blockletMap = getDataMap(identifier, BlockletDataMapFactory.DATA_MAP_SCHEMA);
+  private BlockletDetailsFetcher getBlockletDetailsFetcher(CarbonTable table) {
+    TableDataMap blockletMap = getDataMap(table, BlockletDataMapFactory.DATA_MAP_SCHEMA);
     return (BlockletDetailsFetcher) blockletMap.getDataMapFactory();
   }
 
@@ -367,10 +368,10 @@ public final class DataMapStoreManager {
   /**
    * Get the TableSegmentRefresher for the table. If not existed then add one and return.
    */
-  public TableSegmentRefresher getTableSegmentRefresher(AbsoluteTableIdentifier identifier) {
-    String uniqueName = identifier.uniqueName();
+  public TableSegmentRefresher getTableSegmentRefresher(CarbonTable table) {
+    String uniqueName = table.getAbsoluteTableIdentifier().uniqueName();
     if (segmentRefreshMap.get(uniqueName) == null) {
-      segmentRefreshMap.put(uniqueName, new TableSegmentRefresher(identifier));
+      segmentRefreshMap.put(uniqueName, new TableSegmentRefresher(table));
     }
     return segmentRefreshMap.get(uniqueName);
   }
@@ -388,8 +389,8 @@ public final class DataMapStoreManager {
     // altering.
     private Map<String, Boolean> manualSegmentRefresh = new HashMap<>();
 
-    public TableSegmentRefresher(AbsoluteTableIdentifier identifier) {
-      SegmentUpdateStatusManager statusManager = new SegmentUpdateStatusManager(identifier);
+    TableSegmentRefresher(CarbonTable table) {
+      SegmentUpdateStatusManager statusManager = new SegmentUpdateStatusManager(table);
       SegmentUpdateDetails[] updateStatusDetails = statusManager.getUpdateStatusDetails();
       for (SegmentUpdateDetails updateDetails : updateStatusDetails) {
         UpdateVO updateVO = statusManager.getInvalidTimestampRange(updateDetails.getSegmentName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
index e188bf1..b1729d1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/IndexDataMapProvider.java
@@ -47,8 +47,7 @@ public class IndexDataMapProvider implements DataMapProvider {
         new RelationIdentifier(mainTable.getDatabaseName(), mainTable.getTableName(),
             mainTable.getTableInfo().getFactTable().getTableId()));
     DataMapFactory dataMapFactory = createIndexDataMapFactory(dataMapSchema);
-    DataMapStoreManager.getInstance().registerDataMap(
-        mainTable.getAbsoluteTableIdentifier(), dataMapSchema, dataMapFactory);
+    DataMapStoreManager.getInstance().registerDataMap(mainTable, dataMapSchema, dataMapFactory);
     storageProvider.saveSchema(dataMapSchema);
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
index 8ed5c18..d9e544f 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java
@@ -39,6 +39,7 @@ import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.exception.IndexBuilderException;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.mutate.UpdateVO;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
@@ -87,8 +88,9 @@ public class SegmentTaskIndexStore
     SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
     try {
       segmentTaskIndexWrapper =
-          loadAndGetTaskIdToSegmentsMap(tableSegmentUniqueIdentifier.getSegmentToTableBlocksInfos(),
-              tableSegmentUniqueIdentifier.getAbsoluteTableIdentifier(),
+          loadAndGetTaskIdToSegmentsMap(
+              tableSegmentUniqueIdentifier.getSegmentToTableBlocksInfos(),
+              CarbonTable.buildFromTablePath("name", "path"),
               tableSegmentUniqueIdentifier);
     } catch (IndexBuilderException e) {
       throw new IOException(e.getMessage(), e);
@@ -163,21 +165,20 @@ public class SegmentTaskIndexStore
    * map
    *
    * @param segmentToTableBlocksInfos segment id to block info
-   * @param absoluteTableIdentifier   absolute table identifier
+   * @param table   table handle
    * @return map of taks id to segment mapping
    * @throws IOException
    */
   private SegmentTaskIndexWrapper loadAndGetTaskIdToSegmentsMap(
       Map<String, List<TableBlockInfo>> segmentToTableBlocksInfos,
-      AbsoluteTableIdentifier absoluteTableIdentifier,
+      CarbonTable table,
       TableSegmentUniqueIdentifier tableSegmentUniqueIdentifier) throws IOException {
     // task id to segment map
     Iterator<Map.Entry<String, List<TableBlockInfo>>> iteratorOverSegmentBlocksInfos =
         segmentToTableBlocksInfos.entrySet().iterator();
     Map<TaskBucketHolder, AbstractIndex> taskIdToSegmentIndexMap = null;
     SegmentTaskIndexWrapper segmentTaskIndexWrapper = null;
-    SegmentUpdateStatusManager updateStatusManager =
-        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+    SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(table);
     String segmentId = null;
     TaskBucketHolder taskBucketHolder = null;
     try {
@@ -226,6 +227,7 @@ public class SegmentTaskIndexStore
               }
               Iterator<Map.Entry<TaskBucketHolder, List<TableBlockInfo>>> iterator =
                   taskIdToTableBlockInfoMap.entrySet().iterator();
+              AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
               long requiredSize =
                   calculateRequiredSize(taskIdToTableBlockInfoMap, absoluteTableIdentifier);
               segmentTaskIndexWrapper.setMemorySize(requiredSize);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 4adc977..257ee4c 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -20,6 +20,7 @@ import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
@@ -32,6 +33,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
@@ -42,6 +45,8 @@ import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.locks.CarbonLockUtil;
+import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
@@ -61,6 +66,9 @@ import org.apache.hadoop.fs.Path;
  */
 public class SegmentFileStore {
 
+  private static LogService LOGGER = LogServiceFactory.getLogService(
+      SegmentFileStore.class.getCanonicalName());
+
   private SegmentFile segmentFile;
 
   /**
@@ -104,7 +112,6 @@ public class SegmentFileStore {
           isRelative = true;
         }
         SegmentFile segmentFile = new SegmentFile();
-        Map<String, FolderDetails> locationMap = new HashMap<>();
         FolderDetails folderDetails = new FolderDetails();
         folderDetails.setRelative(isRelative);
         folderDetails.setPartitions(partionNames);
@@ -112,8 +119,7 @@ public class SegmentFileStore {
         for (CarbonFile file : carbonFiles) {
           folderDetails.getFiles().add(file.getName());
         }
-        locationMap.put(location, folderDetails);
-        segmentFile.setLocationMap(locationMap);
+        segmentFile.addPath(location, folderDetails);
         String path = writePath + "/" + taskNo + CarbonTablePath.SEGMENT_EXT;
         // write segment info to new file.
         writeSegmentFile(segmentFile, path);
@@ -122,6 +128,56 @@ public class SegmentFileStore {
   }
 
   /**
+   * Generate Segment file name
+   * @param segmentId segment id
+   * @param UUID unique string, typically caller can use the loading start
+   *             timestamp in CarbonLoadModel
+   * @return
+   */
+  public static String genSegmentFileName(String segmentId, String UUID) {
+    return segmentId + "_" + UUID;
+  }
+
+  /**
+   * Write segment file to the metadata folder of the table
+   * @param tablePath table path
+   * @param segmentId segment id
+   * @param UUID a UUID string used to construct the segment file name
+   * @return segment file name
+   */
+  public static String writeSegmentFile(String tablePath, String segmentId, String UUID)
+      throws IOException {
+    String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
+    CarbonFile segmentFolder = FileFactory.getCarbonFile(segmentPath);
+    CarbonFile[] indexFiles = segmentFolder.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        return file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT);
+      }
+    });
+    if (indexFiles != null && indexFiles.length > 0) {
+      SegmentFile segmentFile = new SegmentFile();
+      FolderDetails folderDetails = new FolderDetails();
+      folderDetails.setRelative(true);
+      folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
+      for (CarbonFile file : indexFiles) {
+        folderDetails.getFiles().add(file.getName());
+      }
+      String segmentRelativePath = segmentPath.substring(tablePath.length(), segmentPath.length());
+      segmentFile.addPath(segmentRelativePath, folderDetails);
+      String segmentFileFolder =  CarbonTablePath.getSegmentFilesLocation(tablePath);
+      CarbonFile carbonFile = FileFactory.getCarbonFile(segmentFileFolder);
+      if (!carbonFile.exists()) {
+        carbonFile.mkdirs(segmentFileFolder, FileFactory.getFileType(segmentFileFolder));
+      }
+      String segmentFileName = genSegmentFileName(segmentId, UUID) + CarbonTablePath.SEGMENT_EXT;
+      // write segment info to new file.
+      writeSegmentFile(segmentFile, segmentFileFolder + File.separator + segmentFileName);
+      return segmentFileName;
+    }
+    return null;
+  }
+
+  /**
    * Writes the segment file in json format
    * @param segmentFile
    * @param path
@@ -177,6 +233,60 @@ public class SegmentFileStore {
     return null;
   }
 
+  /**
+   * This API will update the segmentFile of a passed segment.
+   *
+   * @return boolean which determines whether status update is done or not.
+   * @throws IOException
+   */
+  public static boolean updateSegmentFile(String tablePath, String segmentId, String segmentFile)
+      throws IOException {
+    boolean status = false;
+    String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath);
+    String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        AbsoluteTableIdentifier.from(tablePath, null, null);
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+    int retryCount = CarbonLockUtil
+        .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+            CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+    int maxTimeout = CarbonLockUtil
+        .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+            CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+    try {
+      if (carbonLock.lockWithRetries(retryCount, maxTimeout)) {
+        LOGGER.info("Acquired lock for tablepath" + tablePath + " for table status updation");
+        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
+            SegmentStatusManager.readLoadMetadata(metadataPath);
+
+        for (LoadMetadataDetails detail : listOfLoadFolderDetailsArray) {
+          // if the segments is in the list of marked for delete then update the status.
+          if (segmentId.equals(detail.getLoadName())) {
+            detail.setSegmentFile(segmentFile);
+            break;
+          }
+        }
+
+        SegmentStatusManager
+            .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
+        status = true;
+      } else {
+        LOGGER.error(
+            "Not able to acquire the lock for Table status updation for table path " + tablePath);
+      }
+      ;
+    } finally {
+      if (carbonLock.unlock()) {
+        LOGGER.info("Table unlocked successfully after table status updation" + tablePath);
+      } else {
+        LOGGER.error(
+            "Unable to unlock Table lock for table" + tablePath + " during table status updation");
+      }
+    }
+    return status;
+  }
+
   private static CarbonFile[] getSegmentFiles(String segmentPath) {
     CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
     if (carbonFile.exists()) {
@@ -213,7 +323,6 @@ public class SegmentFileStore {
           isRelative = true;
         }
         SegmentFile localSegmentFile = new SegmentFile();
-        Map<String, FolderDetails> locationMap = new HashMap<>();
         FolderDetails folderDetails = new FolderDetails();
         folderDetails.setRelative(isRelative);
         folderDetails.setPartitions(spec.getPartitions());
@@ -228,8 +337,7 @@ public class SegmentFileStore {
             folderDetails.getFiles().add(file.getName());
           }
         }
-        locationMap.put(location, folderDetails);
-        localSegmentFile.setLocationMap(locationMap);
+        localSegmentFile.addPath(location, folderDetails);
         if (segmentFile == null) {
           segmentFile = localSegmentFile;
         } else {
@@ -440,7 +548,8 @@ public class SegmentFileStore {
     if (updateSegment) {
       String writePath = CarbonTablePath.getSegmentFilesLocation(tablePath);
       writePath =
-          writePath + CarbonCommonConstants.FILE_SEPARATOR + segment.getSegmentNo() + "_" + uniqueId
+          writePath + CarbonCommonConstants.FILE_SEPARATOR +
+              SegmentFileStore.genSegmentFileName(segment.getSegmentNo(),  String.valueOf(uniqueId))
               + CarbonTablePath.SEGMENT_EXT;
       writeSegmentFile(segmentFile, writePath);
     }
@@ -564,9 +673,17 @@ public class SegmentFileStore {
       Map<String, List<String>> locationMap) {
     for (Map.Entry<String, List<String>> entry : locationMap.entrySet()) {
       Path location = new Path(entry.getKey()).getParent();
-      boolean exists = pathExistsInPartitionSpec(partitionSpecs, location);
-      if (!exists) {
-        FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString()));
+      if (partitionSpecs != null) {
+        boolean exists = pathExistsInPartitionSpec(partitionSpecs, location);
+        if (!exists) {
+          FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location.toString()));
+        }
+      } else {
+        // delete the segment folder if it is empty
+        CarbonFile file = FileFactory.getCarbonFile(location.toString());
+        if (file.listFiles().length == 0) {
+          file.delete();
+        }
       }
     }
   }
@@ -698,9 +815,16 @@ public class SegmentFileStore {
 
     private static final long serialVersionUID = 3582245668420401089L;
 
+    /**
+     * mapping of index file parent folder to the index file folder info
+     */
     private Map<String, FolderDetails> locationMap;
 
-    public SegmentFile merge(SegmentFile mapper) {
+    SegmentFile() {
+      locationMap = new HashMap<>();
+    }
+
+    SegmentFile merge(SegmentFile mapper) {
       if (this == mapper) {
         return this;
       }
@@ -724,9 +848,13 @@ public class SegmentFileStore {
       return locationMap;
     }
 
-    public void setLocationMap(Map<String, FolderDetails> locationMap) {
-      this.locationMap = locationMap;
+    /**
+     * Add index file parent folder and the index file folder info
+     */
+    void addPath(String path, FolderDetails details) {
+      locationMap.put(path, details);
     }
+
   }
 
   /**
@@ -736,14 +864,32 @@ public class SegmentFileStore {
 
     private static final long serialVersionUID = 501021868886928553L;
 
+    /**
+     * Based on isRelative variable:
+     * 1. if it is relative, it is relative path to the table path, for all index files
+     * 2. if it is not relative, it is the full path of all index files
+     */
     private Set<String> files = new HashSet<>();
 
+    /**
+     * all partition names
+     */
     private List<String> partitions = new ArrayList<>();
 
+    /**
+     * status for the partition, success or mark for delete
+     */
     private String status;
 
+    /**
+     * file name for merge index file in this folder
+     */
     private String mergeFileName;
 
+    /**
+     * true if it is relative path, for example, if user give partition location when
+     * adding the partition, it will be false
+     */
     private boolean isRelative;
 
     public FolderDetails merge(FolderDetails folderDetails) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index 4ff19cb..2b4dabe 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -34,6 +35,7 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.data.BlockMappingVO;
 import org.apache.carbondata.core.mutate.data.RowCountDetailsVO;
@@ -117,8 +119,7 @@ public class CarbonUpdateUtil {
   public static boolean updateSegmentStatus(List<SegmentUpdateDetails> updateDetailsList,
       CarbonTable table, String updateStatusFileIdentifier, boolean isCompaction) {
     boolean status = false;
-    SegmentUpdateStatusManager segmentUpdateStatusManager =
-            new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
+    SegmentUpdateStatusManager segmentUpdateStatusManager = new SegmentUpdateStatusManager(table);
     ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock();
     boolean lockStatus = false;
 
@@ -419,7 +420,7 @@ public class CarbonUpdateUtil {
    * @param table clean up will be handled on this table.
    * @param forceDelete if true then max query execution timeout will not be considered.
    */
-  public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) {
+  public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) throws IOException {
 
     SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier());
 
@@ -432,6 +433,8 @@ public class CarbonUpdateUtil {
 
     boolean isInvalidFile = false;
 
+    List<Segment> segmentFilesToBeUpdated = new ArrayList<>();
+
     // scan through each segment.
 
     for (LoadMetadataDetails segment : details) {
@@ -453,11 +456,13 @@ public class CarbonUpdateUtil {
         CarbonFile[] allSegmentFiles = segDir.listFiles();
 
         // scan through the segment and find the carbondatafiles and index files.
-        SegmentUpdateStatusManager updateStatusManager =
-                new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier());
+        SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(table);
 
+        boolean updateSegmentFile = false;
         // deleting of the aborted file scenario.
-        deleteStaleCarbonDataFiles(segment, allSegmentFiles, updateStatusManager);
+        if (deleteStaleCarbonDataFiles(segment, allSegmentFiles, updateStatusManager)) {
+          updateSegmentFile = true;
+        }
 
         // get Invalid update  delta files.
         CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager
@@ -467,12 +472,9 @@ public class CarbonUpdateUtil {
 
         // now for each invalid delta file need to check the query execution time out
         // and then delete.
-
         for (CarbonFile invalidFile : invalidUpdateDeltaFiles) {
-
           compareTimestampsAndDelete(invalidFile, forceDelete, false);
         }
-
         // do the same for the index files.
         CarbonFile[] invalidIndexFiles = updateStatusManager
             .getUpdateDeltaFilesList(segment.getLoadName(), false,
@@ -483,10 +485,10 @@ public class CarbonUpdateUtil {
         // and then delete.
 
         for (CarbonFile invalidFile : invalidIndexFiles) {
-
-          compareTimestampsAndDelete(invalidFile, forceDelete, false);
+          if (compareTimestampsAndDelete(invalidFile, forceDelete, false)) {
+            updateSegmentFile = true;
+          }
         }
-
         // now handle all the delete delta files which needs to be deleted.
         // there are 2 cases here .
         // 1. if the block is marked as compacted then the corresponding delta files
@@ -531,7 +533,11 @@ public class CarbonUpdateUtil {
 
             for (CarbonFile invalidFile : blockRelatedFiles) {
 
-              compareTimestampsAndDelete(invalidFile, forceDelete, false);
+              if (compareTimestampsAndDelete(invalidFile, forceDelete, false)) {
+                if (invalidFile.getName().endsWith(CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)) {
+                  updateSegmentFile = true;
+                }
+              }
             }
 
 
@@ -545,8 +551,27 @@ public class CarbonUpdateUtil {
             }
           }
         }
+        if (updateSegmentFile) {
+          segmentFilesToBeUpdated.add(Segment.toSegment(segment.getLoadName()));
+        }
       }
     }
+    String UUID = String.valueOf(System.currentTimeMillis());
+    List<Segment> segmentFilesToBeUpdatedLatest = new ArrayList<>();
+    for (Segment segment : segmentFilesToBeUpdated) {
+      String file =
+          SegmentFileStore.writeSegmentFile(table.getTablePath(), segment.getSegmentNo(), UUID);
+      segmentFilesToBeUpdatedLatest.add(new Segment(segment.getSegmentNo(), file));
+    }
+    if (segmentFilesToBeUpdated.size() > 0) {
+      updateTableMetadataStatus(
+          new HashSet<Segment>(segmentFilesToBeUpdated),
+          table,
+          UUID,
+          false,
+          new ArrayList<Segment>(),
+          segmentFilesToBeUpdatedLatest);
+    }
 
     // delete the update table status files which are old.
     if (null != validUpdateStatusFile && !validUpdateStatusFile.isEmpty()) {
@@ -589,7 +614,7 @@ public class CarbonUpdateUtil {
    * @param allSegmentFiles
    * @param updateStatusManager
    */
-  private static void deleteStaleCarbonDataFiles(LoadMetadataDetails segment,
+  private static boolean deleteStaleCarbonDataFiles(LoadMetadataDetails segment,
       CarbonFile[] allSegmentFiles, SegmentUpdateStatusManager updateStatusManager) {
     CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager
         .getUpdateDeltaFilesList(segment.getLoadName(), false,
@@ -607,9 +632,13 @@ public class CarbonUpdateUtil {
             true);
     // now for each invalid index file need to check the query execution time out
     // and then delete.
+    boolean updateSegmentFile = false;
     for (CarbonFile invalidFile : invalidIndexFiles) {
-      compareTimestampsAndDelete(invalidFile, true, false);
+      if (compareTimestampsAndDelete(invalidFile, true, false)) {
+        updateSegmentFile = true;
+      }
     }
+    return updateSegmentFile;
   }
 
   /**
@@ -642,8 +671,9 @@ public class CarbonUpdateUtil {
    * @param forceDelete
    * @param isUpdateStatusFile if true then the parsing of file name logic changes.
    */
-  private static void compareTimestampsAndDelete(CarbonFile invalidFile,
-                                                 boolean forceDelete, boolean isUpdateStatusFile) {
+  private static boolean compareTimestampsAndDelete(
+      CarbonFile invalidFile,
+      boolean forceDelete, boolean isUpdateStatusFile) {
     long fileTimestamp = 0L;
 
     if (isUpdateStatusFile) {
@@ -661,12 +691,14 @@ public class CarbonUpdateUtil {
       try {
         LOGGER.info("deleting the invalid file : " + invalidFile.getName());
         CarbonUtil.deleteFoldersAndFiles(invalidFile);
+        return true;
       } catch (IOException e) {
         LOGGER.error("error in clean up of compacted files." + e.getMessage());
       } catch (InterruptedException e) {
         LOGGER.error("error in clean up of compacted files." + e.getMessage());
       }
     }
+    return false;
   }
 
   public static boolean isBlockInvalid(SegmentStatus blockStatus) {
@@ -719,9 +751,9 @@ public class CarbonUpdateUtil {
    */
   public static long getRowCount(
       BlockMappingVO blockMappingVO,
-      AbsoluteTableIdentifier absoluteTableIdentifier) {
+      CarbonTable carbonTable) {
     SegmentUpdateStatusManager updateStatusManager =
-        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+        new SegmentUpdateStatusManager(carbonTable);
     long rowCount = 0;
     Map<String, Long> blockRowCountMap = blockMappingVO.getBlockRowCountMapping();
     for (Map.Entry<String, Long> blockRowEntry : blockRowCountMap.entrySet()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index a21873d..308fe30 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.locks.CarbonLockFactory;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.locks.LockUsage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
 import org.apache.carbondata.core.mutate.TupleIdEnum;
@@ -70,31 +71,24 @@ public class SegmentUpdateStatusManager {
   private Map<String, SegmentUpdateDetails> blockAndDetailsMap;
   private boolean isPartitionTable;
 
-  public SegmentUpdateStatusManager(AbsoluteTableIdentifier absoluteTableIdentifier,
+  public SegmentUpdateStatusManager(CarbonTable table,
       LoadMetadataDetails[] segmentDetails) {
-    this.identifier = absoluteTableIdentifier;
+    this.identifier = table.getAbsoluteTableIdentifier();
     // current it is used only for read function scenarios, as file update always requires to work
     // on latest file status.
     this.segmentDetails = segmentDetails;
-    if (segmentDetails.length > 0) {
-      isPartitionTable = segmentDetails[0].getSegmentFile() != null;
-    }
+    isPartitionTable = table.isHivePartitionTable();
     updateDetails = readLoadMetadata();
     populateMap();
   }
 
-  /**
-   * @param identifier
-   */
-  public SegmentUpdateStatusManager(AbsoluteTableIdentifier identifier) {
-    this.identifier = identifier;
+  public SegmentUpdateStatusManager(CarbonTable table) {
+    this.identifier = table.getAbsoluteTableIdentifier();
     // current it is used only for read function scenarios, as file update always requires to work
     // on latest file status.
     segmentDetails = SegmentStatusManager.readLoadMetadata(
         CarbonTablePath.getMetadataPath(identifier.getTablePath()));
-    if (segmentDetails.length > 0) {
-      isPartitionTable = segmentDetails[0].getSegmentFile() != null;
-    }
+    isPartitionTable = table.isHivePartitionTable();
     updateDetails = readLoadMetadata();
     populateMap();
   }
@@ -261,36 +255,30 @@ public class SegmentUpdateStatusManager {
    * Returns all delta file paths of specified block
    */
   private List<String> getDeltaFiles(String tupleId, String extension) throws Exception {
-    try {
-      String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID);
-      String completeBlockName = CarbonTablePath.addDataPartPrefix(
-          CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCK_ID)
-              + CarbonCommonConstants.FACT_FILE_EXT);
-      String blockPath;
-      if (isPartitionTable) {
-        blockPath = identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR
-            + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.PART_ID)
-            .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
-      } else {
-        String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(
-            identifier.getTablePath(), segment);
-        blockPath =
-            carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
-      }
-      CarbonFile file = FileFactory.getCarbonFile(blockPath, FileFactory.getFileType(blockPath));
-      if (!file.exists()) {
-        throw new Exception("Invalid tuple id " + tupleId);
-      }
-      String blockNameWithoutExtn = completeBlockName.substring(0, completeBlockName.indexOf('.'));
-      //blockName without timestamp
-      final String blockNameFromTuple =
-          blockNameWithoutExtn.substring(0, blockNameWithoutExtn.lastIndexOf("-"));
-      return getDeltaFiles(file, blockNameFromTuple, extension, segment);
-    } catch (Exception ex) {
-      String errorMsg = "Invalid tuple id " + tupleId;
-      LOG.error(errorMsg);
-      throw new Exception(errorMsg);
+    String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID);
+    String completeBlockName = CarbonTablePath.addDataPartPrefix(
+        CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCK_ID)
+            + CarbonCommonConstants.FACT_FILE_EXT);
+    String blockPath;
+    if (isPartitionTable) {
+      blockPath = identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR
+          + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.PART_ID)
+          .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
+    } else {
+      String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(
+          identifier.getTablePath(), segment);
+      blockPath =
+          carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
+    }
+    CarbonFile file = FileFactory.getCarbonFile(blockPath, FileFactory.getFileType(blockPath));
+    if (!file.exists()) {
+      throw new Exception("Invalid tuple id " + tupleId);
     }
+    String blockNameWithoutExtn = completeBlockName.substring(0, completeBlockName.indexOf('.'));
+    //blockName without timestamp
+    final String blockNameFromTuple =
+        blockNameWithoutExtn.substring(0, blockNameWithoutExtn.lastIndexOf("-"));
+    return getDeltaFiles(file, blockNameFromTuple, extension, segment);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index bc150e5..bcecdef 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -30,6 +30,7 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.MergedBlockIndex;
@@ -40,10 +41,19 @@ import org.apache.hadoop.fs.Path;
 public class CarbonIndexFileMergeWriter {
 
   /**
+   * table handle
+   */
+  private CarbonTable table;
+
+  /**
    * thrift writer object
    */
   private ThriftWriter thriftWriter;
 
+  public CarbonIndexFileMergeWriter(CarbonTable table) {
+    this.table = table;
+  }
+
   /**
    * Merge all the carbonindex files of segment to a  merged file
    * @param tablePath
@@ -54,7 +64,7 @@ public class CarbonIndexFileMergeWriter {
    *                                         which do not store the blocklet info to current version
    * @throws IOException
    */
-  private SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId,
+  private String mergeCarbonIndexFilesOfSegment(String segmentId,
       String tablePath, List<String> indexFileNamesTobeAdded,
       boolean readFileFooterFromCarbonDataFile) throws IOException {
     Segment segment = Segment.getSegment(segmentId, tablePath);
@@ -70,17 +80,18 @@ public class CarbonIndexFileMergeWriter {
     }
     if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) {
       if (sfs == null) {
-        return mergeNormalSegment(indexFileNamesTobeAdded, readFileFooterFromCarbonDataFile,
-            segmentPath, indexFiles);
+        return writeMergeIndexFileBasedOnSegmentFolder(
+            indexFileNamesTobeAdded, readFileFooterFromCarbonDataFile, segmentPath, indexFiles);
       } else {
-        return mergePartitionSegment(indexFileNamesTobeAdded, sfs, indexFiles);
+        return writeMergeIndexFileBasedOnSegmentFile(
+            segmentId, indexFileNamesTobeAdded, sfs, indexFiles);
       }
     }
     return null;
   }
 
 
-  private SegmentIndexFIleMergeStatus mergeNormalSegment(List<String> indexFileNamesTobeAdded,
+  private String writeMergeIndexFileBasedOnSegmentFolder(List<String> indexFileNamesTobeAdded,
       boolean readFileFooterFromCarbonDataFile, String segmentPath, CarbonFile[] indexFiles)
       throws IOException {
     SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
@@ -100,7 +111,9 @@ public class CarbonIndexFileMergeWriter {
     return null;
   }
 
-  private SegmentIndexFIleMergeStatus mergePartitionSegment(List<String> indexFileNamesTobeAdded,
+  private String writeMergeIndexFileBasedOnSegmentFile(
+      String segmentId,
+      List<String> indexFileNamesTobeAdded,
       SegmentFileStore sfs, CarbonFile[] indexFiles) throws IOException {
     SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
     fileStore
@@ -133,11 +146,20 @@ public class CarbonIndexFileMergeWriter {
       }
     }
 
-    List<String> filesTobeDeleted = new ArrayList<>();
+    String uniqueId = String.valueOf(System.currentTimeMillis());
+    String newSegmentFileName =
+        SegmentFileStore.genSegmentFileName(segmentId, String.valueOf(uniqueId))
+            + CarbonTablePath.SEGMENT_EXT;
+    String path = CarbonTablePath.getSegmentFilesLocation(table.getTablePath())
+        + CarbonCommonConstants.FILE_SEPARATOR + newSegmentFileName;
+    SegmentFileStore.writeSegmentFile(sfs.getSegmentFile(), path);
+    SegmentFileStore.updateSegmentFile(table.getTablePath(), segmentId, newSegmentFileName);
+
     for (CarbonFile file : indexFiles) {
-      filesTobeDeleted.add(file.getAbsolutePath());
+      file.delete();
     }
-    return new SegmentIndexFIleMergeStatus(sfs.getSegmentFile(), filesTobeDeleted);
+
+    return uniqueId;
   }
 
   private String writeMergeIndexFile(List<String> indexFileNamesTobeAdded, String segmentPath,
@@ -173,7 +195,7 @@ public class CarbonIndexFileMergeWriter {
    * @param indexFileNamesTobeAdded
    * @throws IOException
    */
-  public SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId,
+  public String mergeCarbonIndexFilesOfSegment(String segmentId,
       String tablePath, List<String> indexFileNamesTobeAdded) throws IOException {
     return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, indexFileNamesTobeAdded, false);
   }
@@ -184,7 +206,7 @@ public class CarbonIndexFileMergeWriter {
    * @param segmentId
    * @throws IOException
    */
-  public SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId,
+  public String mergeCarbonIndexFilesOfSegment(String segmentId,
       String tablePath) throws IOException {
     return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null, false);
   }
@@ -196,7 +218,7 @@ public class CarbonIndexFileMergeWriter {
    * @param readFileFooterFromCarbonDataFile
    * @throws IOException
    */
-  public SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId,
+  public String mergeCarbonIndexFilesOfSegment(String segmentId,
       String tablePath, boolean readFileFooterFromCarbonDataFile) throws IOException {
     return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null,
         readFileFooterFromCarbonDataFile);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index c057129..c352a95 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -163,16 +163,13 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
     UpdateVO invalidBlockVOForSegmentId = null;
     Boolean isIUDTable = false;
 
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
-    SegmentUpdateStatusManager updateStatusManager =
-        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+    SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(carbonTable);
 
     isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
 
     // for each segment fetch blocks matching filter in Driver BTree
     List<CarbonInputSplit> dataBlocksOfSegment =
-        getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions,
+        getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions,
             validSegments, partitionInfo, oldPartitionIdList);
     numBlocks = dataBlocksOfSegment.size();
     for (CarbonInputSplit inputSplit : dataBlocksOfSegment) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index 8d2318b..c757ba9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -319,7 +319,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
    * get data blocks of given segment
    */
   protected List<CarbonInputSplit> getDataBlocksOfSegment(JobContext job,
-      AbsoluteTableIdentifier absoluteTableIdentifier, FilterResolverIntf resolver,
+      CarbonTable carbonTable, FilterResolverIntf resolver,
       BitSet matchedPartitions, List<Segment> segmentIds, PartitionInfo partitionInfo,
       List<Integer> oldPartitionIdList) throws IOException {
 
@@ -328,7 +328,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
 
     // get tokens for all the required FileSystem for table path
     TokenCache.obtainTokensForNamenodes(job.getCredentials(),
-        new Path[] { new Path(absoluteTableIdentifier.getTablePath()) }, job.getConfiguration());
+        new Path[] { new Path(carbonTable.getTablePath()) }, job.getConfiguration());
     boolean distributedCG = Boolean.parseBoolean(CarbonProperties.getInstance()
         .getProperty(CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP,
             CarbonCommonConstants.USE_DISTRIBUTED_DATAMAP_DEFAULT));
@@ -339,7 +339,7 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     List<ExtendedBlocklet> prunedBlocklets;
     if (distributedCG || dataMapExprWrapper.getDataMapType() == DataMapLevel.FG) {
       DistributableDataMapFormat datamapDstr =
-          new DistributableDataMapFormat(absoluteTableIdentifier, dataMapExprWrapper, segmentIds,
+          new DistributableDataMapFormat(carbonTable, dataMapExprWrapper, segmentIds,
               partitionsToPrune, BlockletDataMapFactory.class.getName());
       prunedBlocklets = dataMapJob.execute(datamapDstr, resolver);
       // Apply expression on the blocklets.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 4634b06..f573acf 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -117,7 +117,8 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
         + CarbonCommonConstants.FILE_SEPARATOR
         + loadModel.getSegmentId() + "_" + loadModel.getFactTimeStamp() + ".tmp";
     // Merge all partition files into a single file.
-    String segmentFileName = loadModel.getSegmentId() + "_" + loadModel.getFactTimeStamp();
+    String segmentFileName = SegmentFileStore.genSegmentFileName(
+        loadModel.getSegmentId(), String.valueOf(loadModel.getFactTimeStamp()));
     SegmentFileStore.SegmentFile segmentFile = SegmentFileStore
         .mergeSegmentFiles(readPath, segmentFileName,
             CarbonTablePath.getSegmentFilesLocation(loadModel.getTablePath()));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index c94f777..beb51cb 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -130,12 +130,12 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
     LoadMetadataDetails[] loadMetadataDetails = SegmentStatusManager
         .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()));
-    SegmentUpdateStatusManager updateStatusManager =
-        new SegmentUpdateStatusManager(identifier, loadMetadataDetails);
     CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
     if (null == carbonTable) {
       throw new IOException("Missing/Corrupt schema file for table.");
     }
+    SegmentUpdateStatusManager updateStatusManager =
+        new SegmentUpdateStatusManager(carbonTable, loadMetadataDetails);
     List<Segment> invalidSegments = new ArrayList<>();
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
     List<Segment> streamSegments = null;
@@ -182,7 +182,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     for (SegmentUpdateDetails segmentUpdateDetail : updateStatusManager
         .getUpdateStatusDetails()) {
       boolean refreshNeeded =
-          DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+          DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
               .isRefreshNeeded(segmentUpdateDetail.getSegmentName(), updateStatusManager);
       if (refreshNeeded) {
         toBeCleanedSegments.add(new Segment(segmentUpdateDetail.getSegmentName(), null));
@@ -190,7 +190,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     }
     // Clean segments if refresh is needed
     for (Segment segment : filteredSegmentToAccess) {
-      if (DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+      if (DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
           .isRefreshNeeded(segment.getSegmentNo())) {
         toBeCleanedSegments.add(segment);
       }
@@ -370,12 +370,9 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
    * @param targetSegment
    * @param oldPartitionIdList  get old partitionId before partitionInfo was changed
    * @return
-   * @throws IOException
    */
   public List<InputSplit> getSplitsOfOneSegment(JobContext job, String targetSegment,
-      List<Integer> oldPartitionIdList, PartitionInfo partitionInfo)
-      throws IOException {
-    AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(job.getConfiguration());
+      List<Integer> oldPartitionIdList, PartitionInfo partitionInfo) {
     List<Segment> invalidSegments = new ArrayList<>();
     List<UpdateVO> invalidTimestampsList = new ArrayList<>();
 
@@ -383,7 +380,6 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     segmentList.add(new Segment(targetSegment, null));
     setSegmentsToAccess(job.getConfiguration(), segmentList);
     try {
-
       // process and resolve the expression
       Expression filter = getFilterPredicates(job.getConfiguration());
       CarbonTable carbonTable = getOrCreateCarbonTable(job.getConfiguration());
@@ -414,7 +410,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
       FilterResolverIntf filterInterface = carbonTable.resolveFilter(filter, tableProvider);
       // do block filtering and get split
       List<InputSplit> splits = getSplits(job, filterInterface, segmentList, matchedPartitions,
-          partitionInfo, oldPartitionIdList, new SegmentUpdateStatusManager(identifier));
+          partitionInfo, oldPartitionIdList, new SegmentUpdateStatusManager(carbonTable));
       // pass the invalid segment to task side in order to remove index entry in task side
       if (invalidSegments.size() > 0) {
         for (InputSplit split : splits) {
@@ -473,14 +469,11 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     UpdateVO invalidBlockVOForSegmentId = null;
     Boolean isIUDTable = false;
 
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        getOrCreateCarbonTable(job.getConfiguration()).getAbsoluteTableIdentifier();
-
     isIUDTable = (updateStatusManager.getUpdateStatusDetails().length != 0);
 
     // for each segment fetch blocks matching filter in Driver BTree
     List<org.apache.carbondata.hadoop.CarbonInputSplit> dataBlocksOfSegment =
-        getDataBlocksOfSegment(job, absoluteTableIdentifier, filterResolver, matchedPartitions,
+        getDataBlocksOfSegment(job, carbonTable, filterResolver, matchedPartitions,
             validSegments, partitionInfo, oldPartitionIdList);
     numBlocks = dataBlocksOfSegment.size();
     for (org.apache.carbondata.hadoop.CarbonInputSplit inputSplit : dataBlocksOfSegment) {
@@ -527,18 +520,15 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
 
   /**
    * Get the row count of the Block and mapping of segment and Block count.
-   *
-   * @param identifier
-   * @return
-   * @throws IOException
    */
-  public BlockMappingVO getBlockRowCount(Job job, AbsoluteTableIdentifier identifier,
+  public BlockMappingVO getBlockRowCount(Job job, CarbonTable table,
       List<PartitionSpec> partitions) throws IOException {
-    TableDataMap blockletMap = DataMapStoreManager.getInstance().getDefaultDataMap(identifier);
+    AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier();
+    TableDataMap blockletMap = DataMapStoreManager.getInstance().getDefaultDataMap(table);
     LoadMetadataDetails[] loadMetadataDetails = SegmentStatusManager
         .readTableStatusFile(CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()));
     SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(
-        identifier, loadMetadataDetails);
+        table, loadMetadataDetails);
     SegmentStatusManager.ValidAndInvalidSegmentsInfo allSegments =
         new SegmentStatusManager(identifier).getValidAndInvalidSegments(loadMetadataDetails);
     Map<String, Long> blockRowCountMapping = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
index 60c88dc..3f19a3f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/DistributableDataMapFormat.java
@@ -29,7 +29,7 @@ import org.apache.carbondata.core.datamap.dev.expr.DataMapDistributableWrapper;
 import org.apache.carbondata.core.datamap.dev.expr.DataMapExprWrapper;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
 import org.apache.carbondata.core.indexstore.PartitionSpec;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil;
 
@@ -48,7 +48,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
 
   private static final String FILTER_EXP = "mapreduce.input.distributed.datamap.filter";
 
-  private AbsoluteTableIdentifier identifier;
+  private CarbonTable table;
 
   private DataMapExprWrapper dataMapExprWrapper;
 
@@ -58,10 +58,10 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
 
   private List<PartitionSpec> partitions;
 
-  DistributableDataMapFormat(AbsoluteTableIdentifier identifier,
+  DistributableDataMapFormat(CarbonTable table,
       DataMapExprWrapper dataMapExprWrapper, List<Segment> validSegments,
       List<PartitionSpec> partitions, String className) {
-    this.identifier = identifier;
+    this.table = table;
     this.dataMapExprWrapper = dataMapExprWrapper;
     this.validSegments = validSegments;
     this.className = className;
@@ -106,7 +106,7 @@ public class DistributableDataMapFormat extends FileInputFormat<Void, ExtendedBl
           throws IOException, InterruptedException {
         DataMapDistributableWrapper distributable = (DataMapDistributableWrapper) inputSplit;
         TableDataMap dataMap = DataMapStoreManager.getInstance()
-            .getDataMap(identifier, distributable.getDistributable().getDataMapSchema());
+            .getDataMap(table, distributable.getDistributable().getDataMapSchema());
         List<ExtendedBlocklet> blocklets = dataMap.prune(
             distributable.getDistributable(),
             dataMapExprWrapper.getFilterResolverIntf(distributable.getUniqueId()), partitions);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
index ff421e1..fd97996 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
@@ -52,7 +52,7 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"""LOAD DATA INPATH '$resourcesPath/Data/VmaLL100' INTO TABLE carbon_automation_merge OPTIONS('DELIMITER'=',','QUOTECHAR'='"','FILEHEADER'='imei,deviceInformationId,MAC,deviceColor,device_backColor,modelId,marketName,AMSize,ROMSize,CUPAudit,CPIClocked,series,productionDate,bomCode,internalModels,deliveryTime,channelsId,channelsName,deliveryAreaId,deliveryCountry,deliveryProvince,deliveryCity,deliveryDistrict,deliveryStreet,oxSingleNumber,contractNumber,ActiveCheckTime,ActiveAreaId,ActiveCountry,ActiveProvince,Activecity,ActiveDistrict,ActiveStreet,ActiveOperatorId,Active_releaseId,Active_EMUIVersion,Active_operaSysVersion,Active_BacVerNumber,Active_BacFlashVer,Active_webUIVersion,Active_webUITypeCarrVer,Active_webTypeDataVerNumber,Active_operatorsVersion,Active_phonePADPartitionedVersions,Latest_YEAR,Latest_MONTH,Latest_DAY,Latest_HOUR,Latest_areaId,Latest_country,Latest_province,Latest_city,Latest_district,Latest_street,Latest_releaseId,Latest_EMUIVersion,Latest_operaSysVe
 rsion,Latest_BacVerNumber,Latest_BacFlashVer,Latest_webUIVersion,Latest_webUITypeCarrVer,Latest_webTypeDataVerNumber,Latest_operatorsVersion,Latest_phonePADPartitionedVersions,Latest_operatorId,gamePointId,gamePointDescription')""").collect
 
     val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_merge")
-    new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
+    new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
     assert(getIndexFileCount("default", "carbon_automation_merge", "0") == 0)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""),
       sql("""Select count(*) from carbon_automation_merge"""))
@@ -69,8 +69,8 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 2)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 2)
     val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge")
-    new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
-    new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false)
+    new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
+    new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 0)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 0)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), rows)
@@ -91,7 +91,7 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "2") == 2)
     sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'minor'").collect()
     val table = CarbonMetadata.getInstance().getCarbonTable("default","carbon_automation_nonmerge")
-    new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
+    new CarbonIndexFileMergeWriter(table).mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0.1") == 0)
     assert(getMergedIndexFileCount("default", "carbon_automation_nonmerge", "0.1") == 1)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), rows)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index aace3ea..c7912cf 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -62,7 +62,7 @@ class CarbonIndexFileMergeTestCase
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='100')")
     val table = CarbonMetadata.getInstance().getCarbonTable("default","indexmerge")
-    new CarbonIndexFileMergeWriter()
+    new CarbonIndexFileMergeWriter(table)
       .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
     assert(getIndexFileCount("default_indexmerge", "0") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""),
@@ -85,9 +85,9 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
-    new CarbonIndexFileMergeWriter()
+    new CarbonIndexFileMergeWriter(table)
       .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
-    new CarbonIndexFileMergeWriter()
+    new CarbonIndexFileMergeWriter(table)
       .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
@@ -110,9 +110,9 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
-    new CarbonIndexFileMergeWriter()
+    new CarbonIndexFileMergeWriter(table)
       .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
-    new CarbonIndexFileMergeWriter()
+    new CarbonIndexFileMergeWriter(table)
       .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
@@ -139,7 +139,7 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
     val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
-    new CarbonIndexFileMergeWriter()
+    new CarbonIndexFileMergeWriter(table)
       .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
@@ -168,7 +168,7 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
     val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
-    new CarbonIndexFileMergeWriter()
+    new CarbonIndexFileMergeWriter(table)
       .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index 510903a..1fbddb0 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -16,11 +16,15 @@
  */
 package org.apache.carbondata.spark.testsuite.iud
 
+import java.io.File
+
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.{CarbonEnv, Row, SaveMode}
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 
 class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
@@ -178,6 +182,8 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("clean files for table select_after_clean")
     sql("delete from select_after_clean where name='def'")
     sql("clean files for table select_after_clean")
+    assertResult(false)(new File(
+      CarbonTablePath.getSegmentPath(s"$storeLocation/iud_db.db/select_after_clean", "0")).exists())
     checkAnswer(sql("""select * from select_after_clean"""),
       Seq(Row(1, "abc"), Row(3, "uhj"), Row(4, "frg")))
   }
@@ -198,7 +204,9 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     val files = FileFactory.getCarbonFile(metaPath)
     val result = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.getClass
     if(result.getCanonicalName.contains("CarbonFileMetastore")) {
-      assert(files.listFiles().length == 2)
+      assert(files.listFiles(new CarbonFileFilter {
+        override def accept(file: CarbonFile): Boolean = !file.isDirectory
+      }).length == 2)
     }
     else
       assert(files.listFiles().length == 1)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 98c9a16..ec39f66 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -398,12 +398,12 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("""drop table if exists iud.show_segment""").show
     sql("""create table iud.show_segment (c1 string,c2 int,c3 string,c5 string) STORED BY 'org.apache.carbondata.format'""")
     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.show_segment""")
-    val before_update = sql("""show segments for table iud.show_segment""").toDF()
+    val before_update = sql("""show segments for table iud.show_segment""").collect()
     sql("""update iud.show_segment d set (d.c3, d.c5 ) = (select s.c33,s.c55 from iud.source2 s where d.c1 = s.c11) where 1 = 1""").show()
-    val after_update = sql("""show segments for table iud.show_segment""").toDF()
+    val after_update = sql("""show segments for table iud.show_segment""")
     checkAnswer(
-      before_update,
-      after_update
+      after_update,
+      before_update
     )
     sql("""drop table if exists iud.show_segment""").show
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index c9237d1..b4c5e4d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -274,8 +274,9 @@ class CarbonMergerRDD[K, V](
     val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
       tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
     )
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
-      absoluteTableIdentifier)
+      carbonTable)
     val jobConf: JobConf = new JobConf(new Configuration)
     SparkHadoopUtil.get.addCredentials(jobConf)
     val job: Job = new Job(jobConf)
@@ -383,7 +384,6 @@ class CarbonMergerRDD[K, V](
           dataFileFooter.getSegmentInfo.getColumnCardinality)
     }
     val updatedMaxSegmentColumnList = new util.ArrayList[ColumnSchema]()
-    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     // update cardinality and column schema list according to master schema
     val cardinality = CarbonCompactionUtil
       .updateColumnSchemaAndGetCardinality(columnToCardinalityMap,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 48ebdb4..37cd12e 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -28,11 +28,13 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark.sql.execution.command.AlterPartitionModel
 
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block.{SegmentProperties, TableBlockInfo}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.CarbonInputSplit
@@ -203,5 +205,22 @@ object PartitionUtils {
       files.add(file)
     }
     CarbonUtil.deleteFiles(files.asScala.toArray)
+    if (!files.isEmpty) {
+      val carbonTable = alterPartitionModel.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+      val file = SegmentFileStore.writeSegmentFile(
+        identifier.getTablePath,
+        alterPartitionModel.segmentId,
+        alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString)
+      val segmentFiles = Seq(new Segment(alterPartitionModel.segmentId, file)).asJava
+      if (!CarbonUpdateUtil.updateTableMetadataStatus(
+        new util.HashSet[Segment](Seq(new Segment(alterPartitionModel.segmentId, null)).asJava),
+        carbonTable,
+        alterPartitionModel.carbonLoadModel.getFactTimeStamp.toString,
+        true,
+        new util.ArrayList[Segment](0),
+        new util.ArrayList[Segment](segmentFiles))) {
+        throw new IOException("Data update failed due to failure in table status updation.")
+      }
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 0d00023..bbe7be0 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -51,7 +51,7 @@ import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion}
+import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion, SegmentFileStore}
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
@@ -291,7 +291,6 @@ object CarbonDataRDDFactory {
       dataFrame: Option[DataFrame] = None,
       updateModel: Option[UpdateTableModel] = None,
       operationContext: OperationContext): Unit = {
-    val storePath: String = carbonLoadModel.getTablePath
     LOGGER.audit(s"Data load request has been received for table" +
                  s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     // Check if any load need to be deleted before loading new data
@@ -428,6 +427,13 @@ object CarbonDataRDDFactory {
             segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName, null))
           }
         }
+        val segmentFiles = segmentDetails.asScala.map{seg =>
+          val file = SegmentFileStore.writeSegmentFile(
+            carbonTable.getTablePath,
+            seg.getSegmentNo,
+            updateModel.get.updatedTimeStamp.toString)
+          new Segment(seg.getSegmentNo, file)
+        }.filter(_.getSegmentFileName != null).asJava
 
         // this means that the update doesnt have any records to update so no need to do table
         // status file updation.
@@ -441,7 +447,8 @@ object CarbonDataRDDFactory {
           carbonTable,
           updateModel.get.updatedTimeStamp + "",
           true,
-          new util.ArrayList[Segment](0))) {
+          new util.ArrayList[Segment](0),
+          new util.ArrayList[Segment](segmentFiles))) {
           LOGGER.audit("Data update is successful for " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         } else {
@@ -499,6 +506,11 @@ object CarbonDataRDDFactory {
       }
 
       writeDictionary(carbonLoadModel, result, writeAll = false)
+
+      val segmentFileName =
+        SegmentFileStore.writeSegmentFile(carbonTable.getTablePath, carbonLoadModel.getSegmentId,
+          String.valueOf(carbonLoadModel.getFactTimeStamp))
+
       operationContext.setProperty(carbonTable.getTableUniqueName + "_Segment",
         carbonLoadModel.getSegmentId)
       val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
@@ -513,6 +525,7 @@ object CarbonDataRDDFactory {
           loadStatus,
           newEntryLoadStatus,
           overwriteTable,
+          segmentFileName,
           uniqueTableStatusId)
       val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
         new LoadTablePostStatusUpdateEvent(carbonLoadModel)
@@ -788,6 +801,7 @@ object CarbonDataRDDFactory {
       loadStatus: SegmentStatus,
       newEntryLoadStatus: SegmentStatus,
       overwriteTable: Boolean,
+      segmentFileName: String,
       uuid: String = ""): Boolean = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val metadataDetails = if (status != null && status.size > 0 && status(0) != null) {
@@ -795,11 +809,12 @@ object CarbonDataRDDFactory {
     } else {
       new LoadMetadataDetails
     }
+    metadataDetails.setSegmentFile(segmentFileName)
     CarbonLoaderUtil.populateNewLoadMetaEntry(
-      metadataDetails,
-      newEntryLoadStatus,
-      carbonLoadModel.getFactTimeStamp,
-      true)
+        metadataDetails,
+        newEntryLoadStatus,
+        carbonLoadModel.getFactTimeStamp,
+    true)
     CarbonLoaderUtil
       .addDataIndexSizeIntoMetaEntry(metadataDetails, carbonLoadModel.getSegmentId, carbonTable)
     val done = CarbonLoaderUtil.recordNewLoadMetadata(metadataDetails, carbonLoadModel, false,