Posted to commits@carbondata.apache.org by ku...@apache.org on 2021/02/04 09:26:49 UTC

[carbondata] branch master updated: [CARBONDATA-4082] Fix alter table add segment query on adding a segment having delete delta files

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new aa7efda  [CARBONDATA-4082] Fix alter table add segment query on adding a segment having delete delta files
aa7efda is described below

commit aa7efda9eec8485723be81455d96ba17851571e8
Author: Karan980 <ka...@gmail.com>
AuthorDate: Mon Jan 4 12:10:05 2021 +0530

    [CARBONDATA-4082] Fix alter table add segment query on adding a segment having delete delta files
    
    Why is this PR needed?
    When a segment that contains a delete delta file is added to a carbon table
    through the alter table add segment query, the deleted rows still appear in
    query results on that table.
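
    A minimal reproduction, abridged from the test added to AddSegmentTestCase.scala in
    this PR (it runs inside the existing test suite, after creating and loading the
    addsegment1 table; path variables come from that test):

        sql("delete from addsegment1 where empno = 12")   // writes a delete delta file into segment 1
        val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
        CarbonTestUtil.copy(path, newPath)                 // copy segment 1, including the .deletedelta file
        sql("delete from table addsegment1 where segment.id in (1)")
        sql("clean files for table addsegment1")
        sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='carbon')")
        // before this fix, the row deleted above still showed up here
        sql("select count(*) from addsegment1 where empno = 12")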
    
    What changes were proposed in this PR?
    Update the tableStatus and tableUpdateStatus files correctly for segments
    that contain delete delta files.
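
    In outline, CarbonAddLoadCommand now scans the added segment path for .deletedelta
    files, builds one SegmentUpdateDetails entry per block, and force-writes them into
    the tableUpdateStatus file (simplified sketch of the new logic; the full version is
    in the CarbonAddLoadCommand.scala hunk below):

        val deltaFiles = FileFactory.getCarbonFile(segmentPath).listFiles(new CarbonFileFilter() {
          override def accept(file: CarbonFile): Boolean =
            file.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
        })
        if (deltaFiles.nonEmpty) {
          val updateTimestamp = System.currentTimeMillis().toString
          val segmentUpdateDetails = new util.ArrayList[SegmentUpdateDetails]()
          // ... one entry per block, filled from its delete delta file(s) ...
          CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, carbonTable,
            updateTimestamp, false, true)  // isCompaction = false, isForceWrite = true
          newLoadMetaEntry.setUpdateDeltaStartTimestamp(updateTimestamp)
          newLoadMetaEntry.setUpdateDeltaEndTimestamp(updateTimestamp)
        }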
    
    This closes #4070
---
 .../carbondata/core/mutate/CarbonUpdateUtil.java   |  21 +++-
 .../carbondata/core/util/path/CarbonTablePath.java |   2 +-
 .../hadoop/api/CarbonFileInputFormat.java          |   5 +-
 .../hadoop/api/CarbonOutputCommitter.java          |   6 +-
 .../hadoop/api/CarbonTableOutputFormat.java        |   7 +-
 .../spark/rdd/CarbonDataRDDFactory.scala           |   2 +-
 .../command/management/CarbonAddLoadCommand.scala  | 126 ++++++++++++++++++++-
 .../command/mutation/DeleteExecution.scala         |   2 +-
 .../mutation/merge/CarbonMergeDataSetCommand.scala |   2 +-
 .../testsuite/addsegment/AddSegmentTestCase.scala  | 103 ++++++++++++++++-
 .../processing/merger/CarbonDataMergerUtil.java    |   2 +-
 .../processing/util/CarbonLoaderUtil.java          |  23 ++--
 12 files changed, 275 insertions(+), 26 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index c5296dc..e78b630 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -47,6 +47,7 @@ import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.Logger;
 
@@ -145,9 +146,17 @@ public class CarbonUpdateUtil {
    * @return
    */
   public static boolean updateSegmentStatus(List<SegmentUpdateDetails> updateDetailsList,
-      CarbonTable table, String updateStatusFileIdentifier, boolean isCompaction) {
+      CarbonTable table, String updateStatusFileIdentifier, boolean isCompaction,
+      boolean isForceWrite) {
     boolean status = false;
     SegmentUpdateStatusManager segmentUpdateStatusManager = new SegmentUpdateStatusManager(table);
+    if (isForceWrite && !CollectionUtils.isEmpty(updateDetailsList)) {
+      String segId = String.valueOf(SegmentStatusManager
+          .createNewSegmentId(segmentUpdateStatusManager.getLoadMetadataDetails()));
+      for (SegmentUpdateDetails detail : updateDetailsList) {
+        detail.setSegmentName(segId);
+      }
+    }
     ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock();
     boolean lockStatus = false;
 
@@ -178,8 +187,14 @@ public class CarbonUpdateUtil {
             updateDetailsValidSeg.add(updateDetails);
           }
         }
-        segmentUpdateStatusManager
-            .writeLoadDetailsIntoFile(updateDetailsValidSeg, updateStatusFileIdentifier);
+        // In case of ForceWrite, write the segmentUpdateDetails to the tableUpdateStatus file
+        // without any validation of segments.
+        if (isForceWrite) {
+          segmentUpdateStatusManager.writeLoadDetailsIntoFile(oldList, updateStatusFileIdentifier);
+        } else {
+          segmentUpdateStatusManager
+              .writeLoadDetailsIntoFile(updateDetailsValidSeg, updateStatusFileIdentifier);
+        }
         status = true;
       } else {
         LOGGER.error("Not able to acquire the segment update lock.");
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 458ebdf..9967723 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -43,7 +43,7 @@ public class CarbonTablePath {
   private static final String FACT_DIR = "Fact";
   public static final String SEGMENT_PREFIX = "Segment_";
   private static final String PARTITION_PREFIX = "Part";
-  private static final String DATA_PART_PREFIX = "part-";
+  public static final String DATA_PART_PREFIX = "part-";
   public static final String BATCH_PREFIX = "_batchno";
   public static final String TRASH_DIR = ".Trash";
   private static final String LOCK_DIR = "LockFiles";
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
index 91116b4..13e1ef1 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonFileInputFormat.java
@@ -296,10 +296,9 @@ public class CarbonFileInputFormat<T> extends CarbonInputFormat<T> implements Se
 
     String expectedDeleteDeltaFileName = null;
     if (segmentFileName != null && !segmentFileName.isEmpty()) {
-      int startIndex = segmentFileName.indexOf(CarbonCommonConstants.HYPHEN);
       int endIndex = segmentFileName.indexOf(CarbonCommonConstants.UNDERSCORE);
-      if (startIndex != -1 && endIndex != -1) {
-        expectedDeleteDeltaFileName = segmentFileName.substring(startIndex + 1, endIndex);
+      if (endIndex != -1) {
+        expectedDeleteDeltaFileName = segmentFileName.substring(0, endIndex);
       }
     }
     String deleteDeltaFullFileName = null;
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 6ab6adc..98f5502 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -199,7 +199,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
           uniqueId = overwritePartitions(loadModel, newMetaEntry, uuid);
         }
       } else {
-        CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid);
+        CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid, false);
       }
       commitJobFinal(context, loadModel, operationContext, carbonTable, uniqueId);
     } else {
@@ -305,7 +305,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
     if (overwriteSet) {
       uniqueId = overwritePartitions(loadModel, newMetaEntry, uuid);
     } else {
-      CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid);
+      CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid, false);
     }
     if (operationContext != null) {
       operationContext
@@ -342,7 +342,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
       // Commit the removed partitions in carbon store.
       CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid,
           Segment.toSegmentList(toBeDeletedSegments, null),
-          Segment.toSegmentList(toBeUpdatedSegments, null));
+          Segment.toSegmentList(toBeUpdatedSegments, null), false);
       return uniqueId;
     }
     return null;
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index c7ba7a6..ed447a5 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -47,6 +47,7 @@ import org.apache.carbondata.core.util.DataLoadMetrics;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.core.util.ObjectSerializationUtil;
 import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable;
 import org.apache.carbondata.processing.loading.ComplexDelimitersEnum;
@@ -578,7 +579,11 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
           blockName = CarbonUpdateUtil.getBlockName(
               (tuple.split(CarbonCommonConstants.FILE_SEPARATOR)
                       [TupleIdEnum.BLOCK_ID.getTupleIdIndex()]));
-
+          // format the blockName so that the delete delta file gets the same name as the one
+          // created for transactional tables
+          String[] blockNameSplits = blockName.split(CarbonCommonConstants.UNDERSCORE);
+          blockName = CarbonTablePath.DATA_PART_PREFIX + blockNameSplits[0] +
+                  CarbonTablePath.BATCH_PREFIX + blockNameSplits[1];
           if (!blockToDeleteDeltaBlockMapping.containsKey(blockName)) {
             blockDetails = new DeleteDeltaBlockDetails(blockName);
             blockToDeleteDeltaBlockMapping.put(blockName, blockDetails);
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index f62aa64..754fc63 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -741,7 +741,7 @@ object CarbonDataRDDFactory {
         updateModel.get.deletedSegments.asJava)
     }
     done = done && CarbonLoaderUtil.recordNewLoadMetadata(metadataDetails, carbonLoadModel, false,
-      overwriteTable, uuid)
+      overwriteTable, uuid, false)
     if (!done) {
       val errorMessage = s"Dataload failed due to failure in table status updation for" +
                          s" ${carbonLoadModel.getTableName}"
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index 8f153a4..44fe54c 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.util
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 
 import org.apache.hadoop.fs.FileStatus
 import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
@@ -35,6 +35,8 @@ import org.apache.spark.sql.types.StructType
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.compression.CompressorFactory
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.exception.ConcurrentOperationException
 import org.apache.carbondata.core.index.{IndexStoreManager, Segment}
@@ -42,7 +44,8 @@ import org.apache.carbondata.core.indexstore.{PartitionSpec => CarbonPartitionSp
 import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.datatype.Field
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, SegmentUpdateDetails}
+import org.apache.carbondata.core.reader.CarbonDeleteDeltaFileReaderImpl
 import org.apache.carbondata.core.statusmanager.{FileFormat, LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.core.view.{MVSchema, MVStatus}
@@ -261,8 +264,68 @@ case class CarbonAddLoadCommand(
     if (!isCarbonFormat) {
       newLoadMetaEntry.setFileFormat(new FileFormat(format))
     }
+    val deltaFiles = FileFactory.getCarbonFile(segmentPath).listFiles(new CarbonFileFilter() {
+      override def accept(file: CarbonFile): Boolean = file.getName
+        .endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+    })
+    val updateTimestamp = System.currentTimeMillis().toString
+    var isUpdateStatusRequired = false
+    if (deltaFiles.nonEmpty) {
+      LOGGER.warn("Adding a modified load to the table. If there is any updated segment " +
+        "for this load, please add the updated segment as well.")
+      val blockNameToDeltaFilesMap =
+        collection.mutable.Map[String, collection.mutable.ListBuffer[(CarbonFile, String)]]()
+      deltaFiles.foreach { deltaFile =>
+        val tmpDeltaFilePath = deltaFile.getAbsolutePath
+          .replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
+            CarbonCommonConstants.FILE_SEPARATOR)
+        val deltaFilePathElements = tmpDeltaFilePath.split(CarbonCommonConstants.FILE_SEPARATOR)
+        if (deltaFilePathElements != null && deltaFilePathElements.nonEmpty) {
+          val deltaFileName = deltaFilePathElements(deltaFilePathElements.length - 1)
+          val blockName = CarbonTablePath.DataFileUtil
+            .getBlockNameFromDeleteDeltaFile(deltaFileName)
+          if (blockNameToDeltaFilesMap.contains(blockName)) {
+            blockNameToDeltaFilesMap(blockName) += ((deltaFile, deltaFileName))
+          } else {
+            val deltaFileList = new ListBuffer[(CarbonFile, String)]()
+            deltaFileList += ((deltaFile, deltaFileName))
+            blockNameToDeltaFilesMap.put(blockName, deltaFileList)
+          }
+        }
+      }
+      val segmentUpdateDetails = new util.ArrayList[SegmentUpdateDetails]()
+      val columnCompressor = CompressorFactory.getInstance.getCompressor.getName
+      blockNameToDeltaFilesMap.foreach { entry =>
+        val segmentUpdateDetail = new SegmentUpdateDetails()
+        segmentUpdateDetail.setBlockName(entry._1)
+        segmentUpdateDetail.setActualBlockName(
+          entry._1 + CarbonCommonConstants.POINT + columnCompressor +
+            CarbonCommonConstants.FACT_FILE_EXT)
+        segmentUpdateDetail.setSegmentName(model.getSegmentId)
+        val blockNameElements = entry._1.split(CarbonCommonConstants.HYPHEN)
+        if (blockNameElements != null && blockNameElements.nonEmpty) {
+          val segmentId = blockNameElements(blockNameElements.length - 1)
+          // Segment ID is null in case of SDK segments
+          if (segmentId.equals("null")) {
+            readAllDeltaFiles(entry._2, segmentUpdateDetail)
+          } else {
+            setValidDeltaFileAndDeletedRowCount(entry._2, segmentUpdateDetail)
+          }
+        }
+        segmentUpdateDetails.add(segmentUpdateDetail)
+      }
+      CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails,
+        carbonTable,
+        updateTimestamp,
+        false,
+        true)
+      isUpdateStatusRequired = true
+      newLoadMetaEntry.setUpdateDeltaStartTimestamp(updateTimestamp)
+      newLoadMetaEntry.setUpdateDeltaEndTimestamp(updateTimestamp)
+    }
 
-    CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, false)
+    CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, false, updateTimestamp,
+      isUpdateStatusRequired)
     val segment = new Segment(
       model.getSegmentId,
       SegmentFileStore.genSegmentFileName(
@@ -369,5 +432,62 @@ case class CarbonAddLoadCommand(
     }
   }
 
+  /**
+   * If more than one delete delta file is present for a block, this method picks the
+   * delta file with the highest timestamp, because the default threshold for horizontal
+   * compaction is 1. It is assumed that the threshold for horizontal compaction has not
+   * been changed from the default value, so there is always only one valid delete delta
+   * file per block. It also sets the number of deleted rows for the segment.
+   */
+  def setValidDeltaFileAndDeletedRowCount(
+      deleteDeltaFiles : ListBuffer[(CarbonFile, String)],
+      segmentUpdateDetails : SegmentUpdateDetails) : Unit = {
+    var maxDeltaStamp : Long = -1
+    var deletedRowsCount : Long = 0
+    var validDeltaFile : CarbonFile = null
+    deleteDeltaFiles.foreach { deltaFile =>
+      val currentFileTimestamp = CarbonTablePath.DataFileUtil
+        .getTimeStampFromDeleteDeltaFile(deltaFile._2)
+      if (currentFileTimestamp.toLong > maxDeltaStamp) {
+        maxDeltaStamp = currentFileTimestamp.toLong
+        validDeltaFile = deltaFile._1
+      }
+    }
+    val blockDetails =
+      new CarbonDeleteDeltaFileReaderImpl(validDeltaFile.getAbsolutePath).readJson()
+    blockDetails.getBlockletDetails.asScala.foreach { blocklet =>
+      deletedRowsCount = deletedRowsCount + blocklet.getDeletedRows.size()
+    }
+    segmentUpdateDetails.setDeleteDeltaStartTimestamp(maxDeltaStamp.toString)
+    segmentUpdateDetails.setDeleteDeltaEndTimestamp(maxDeltaStamp.toString)
+    segmentUpdateDetails.setDeletedRowsInBlock(deletedRowsCount.toString)
+  }
+
+  /**
+   * As horizontal compaction is not supported for SDK segments, all delta files are valid.
+   */
+  def readAllDeltaFiles(
+      deleteDeltaFiles : ListBuffer[(CarbonFile, String)],
+      segmentUpdateDetails : SegmentUpdateDetails) : Unit = {
+    var minDeltaStamp : Long = System.currentTimeMillis()
+    var maxDeltaStamp : Long = -1
+    var deletedRowsCount : Long = 0
+    deleteDeltaFiles.foreach { deltaFile =>
+      val currentFileTimestamp = CarbonTablePath.DataFileUtil
+        .getTimeStampFromDeleteDeltaFile(deltaFile._2)
+      minDeltaStamp = Math.min(minDeltaStamp, currentFileTimestamp.toLong)
+      maxDeltaStamp = Math.max(maxDeltaStamp, currentFileTimestamp.toLong)
+      segmentUpdateDetails.addDeltaFileStamp(currentFileTimestamp)
+      val blockDetails =
+        new CarbonDeleteDeltaFileReaderImpl(deltaFile._1.getAbsolutePath).readJson()
+      blockDetails.getBlockletDetails.asScala.foreach { blocklet =>
+        deletedRowsCount = deletedRowsCount + blocklet.getDeletedRows.size()
+      }
+    }
+    segmentUpdateDetails.setDeleteDeltaStartTimestamp(minDeltaStamp.toString)
+    segmentUpdateDetails.setDeleteDeltaEndTimestamp(maxDeltaStamp.toString)
+    segmentUpdateDetails.setDeletedRowsInBlock(deletedRowsCount.toString)
+  }
+
   override protected def opName: String = "ADD SEGMENT WITH PATH"
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 574a651..d4f98b0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -399,7 +399,7 @@ object DeleteExecution {
 
     // this is delete flow so no need of putting timestamp in the status file.
     if (CarbonUpdateUtil
-          .updateSegmentStatus(blockUpdateDetailsList, carbonTable, timestamp, false) &&
+          .updateSegmentStatus(blockUpdateDetailsList, carbonTable, timestamp, false, false) &&
         CarbonUpdateUtil
           .updateTableMetadataStatus(segmentDetails,
             carbonTable,
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
index b026e4d..895f22a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/merge/CarbonMergeDataSetCommand.scala
@@ -200,7 +200,7 @@ case class CarbonMergeDataSetCommand(
       FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(deltaPath))
       if (!CarbonUpdateUtil.updateSegmentStatus(tuple._1.asScala.asJava,
         carbonTable,
-        trxMgr.getLatestTrx.toString, false)) {
+        trxMgr.getLatestTrx.toString, false, false)) {
         LOGGER.error("writing of update status file failed")
         throw new CarbonMergeDataSetException("writing of update status file failed")
       }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index 1c8700b..555579b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.Strings
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -37,7 +38,7 @@ import org.apache.carbondata.core.metadata.datatype.{DataTypes, Field}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTestUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.readsupport.impl.CarbonRowReadSupport
-import org.apache.carbondata.sdk.file.{CarbonReader, CarbonWriter}
+import org.apache.carbondata.sdk.file.{CarbonIUD, CarbonReader, CarbonWriter}
 
 class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
 
@@ -82,6 +83,106 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     FileFactory.deleteAllFilesOfDir(new File(newPath))
   }
 
+  test("Test add segment for the segment having delete delta files") {
+    createCarbonTable()
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1
+         | OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""".stripMargin)
+    sql(s"delete from addsegment1 where empno = 12").collect()
+    sql(s"delete from addsegment1 where empno = 13").collect()
+    val table = CarbonEnv.getCarbonTable(None, "addsegment1") (sqlContext.sparkSession)
+    val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
+    val newPath = storeLocation + "/" + "addsegtest"
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+    CarbonTestUtil.copy(path, newPath)
+    sql("delete from table addsegment1 where segment.id in (1)")
+    sql("clean files for table addsegment1")
+    sql(s"alter table addsegment1 add segment options('path'='$newPath', 'format'='carbon')")
+      .collect()
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(16)))
+    checkAnswer(sql("select count(*) from addsegment1 where empno = 12"), Seq(Row(0)))
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+    sql("drop table if exists addsegment1")
+  }
+
+  test("Test add segment for the segment having multiple blocks and delete delta files") {
+    sql("drop table if exists addsegment1")
+    sql(
+      """
+        | CREATE TABLE addsegment1 (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int, empno int)
+        | STORED AS carbondata TBLPROPERTIES('SORT_COLUMNS'='empno', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"', 'GLOBAL_SORT_PARTITIONS'='5')""".stripMargin)
+    sql(
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE addsegment1 OPTIONS
+         |('DELIMITER'= ',', 'QUOTECHAR'= '"', 'GLOBAL_SORT_PARTITIONS'='5')""".stripMargin)
+    sql(s"delete from addsegment1 where empno = 12").collect()
+    sql(s"delete from addsegment1 where empno = 17").collect()
+    val table = CarbonEnv.getCarbonTable(None, "addsegment1") (sqlContext.sparkSession)
+    val path = CarbonTablePath.getSegmentPath(table.getTablePath, "1")
+    val newPath = storeLocation + "/" + "addsegtest"
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+    CarbonTestUtil.copy(path, newPath)
+    sql("delete from table addsegment1 where segment.id in (1)")
+    sql("clean files for table addsegment1")
+    sql(s"alter table addsegment1 add segment" +
+      s" options('path'='$newPath', 'format'='carbon')").collect()
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(16)))
+    checkAnswer(sql("select count(*) from addsegment1 where empno = 12"), Seq(Row(0)))
+    checkAnswer(sql("select count(*) from addsegment1 where empno = 17"), Seq(Row(0)))
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+    sql("drop table if exists addsegment1")
+  }
+
+  test("Test add segment by carbon written by sdk having delete delta files") {
+    val tableName = "add_segment_test"
+    sql(s"drop table if exists $tableName")
+    sql(
+      s"""
+         | CREATE TABLE $tableName (empno int, empname string, designation String, doj Timestamp,
+         | workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+         | projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+         | utilization int,salary int)
+         | STORED AS carbondata
+         |""".stripMargin)
+
+    val externalSegmentPath = storeLocation + "/" + "external_segment"
+    FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
+    // write into external segment folder
+    val schemaFilePath = s"$storeLocation/$tableName/Metadata/schema"
+    val writer = CarbonWriter.builder
+      .outputPath(externalSegmentPath)
+      .withSchemaFile(schemaFilePath)
+      .writtenBy("AddSegmentTestCase")
+      .withCsvInput()
+      .build()
+    val source = Source.fromFile(s"$resourcesPath/data.csv")
+    var count = 0
+    for (line <- source.getLines()) {
+      if (count != 0) {
+        writer.write(line.split(","))
+      }
+      count = count + 1
+    }
+    writer.close()
+    CarbonIUD.getInstance().delete(externalSegmentPath, "empno", "12").commit()
+    CarbonIUD.getInstance().delete(externalSegmentPath, "empno", "13").commit()
+    sql(s"alter table $tableName add segment " +
+      s"options('path'='$externalSegmentPath', 'format'='carbon')").collect()
+    // To clear the indexes from cache, which were cached after delete operation on SDK segments.
+    CacheProvider.getInstance().getCarbonCache.getCacheMap.clear()
+    checkAnswer(sql(s"select count(*) from $tableName"), Seq(Row(8)))
+    checkAnswer(sql(s"select count(*) from $tableName where empno = 12"), Seq(Row(0)))
+    checkAnswer(sql(s"select count(*) from $tableName where empno = 13"), Seq(Row(0)))
+    FileFactory.deleteAllFilesOfDir(new File(externalSegmentPath))
+    sql(s"drop table $tableName")
+  }
+
   test("Test added segment drop") {
 
     createCarbonTable()
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 29f629e..5576133 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -1072,7 +1072,7 @@ public final class CarbonDataMergerUtil {
       } else return false;
     }
 
-    CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, table, timestamp, true);
+    CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, table, timestamp, true, false);
 
     // Update the Table Status.
     String metaDataFilepath = table.getMetadataPath();
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index e41b615..32c5ee5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -152,7 +152,8 @@ public final class CarbonLoaderUtil {
   public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
       CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite)
       throws IOException {
-    return recordNewLoadMetadata(newMetaEntry, loadModel, loadStartEntry, insertOverwrite, "");
+    return recordNewLoadMetadata(newMetaEntry, loadModel, loadStartEntry, insertOverwrite, "",
+        false);
   }
 
   /**
@@ -209,15 +210,15 @@ public final class CarbonLoaderUtil {
    * @throws IOException
    */
   public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
-      final CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid)
-      throws IOException {
+      final CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid,
+      boolean isUpdateStatusRequired) throws IOException {
     // For Non Transactional tables no need to update the the Table Status file.
     if (!loadModel.isCarbonTransactionalTable()) {
       return true;
     }
 
     return recordNewLoadMetadata(newMetaEntry, loadModel, loadStartEntry, insertOverwrite, uuid,
-        new ArrayList<Segment>(), new ArrayList<Segment>());
+        new ArrayList<Segment>(), new ArrayList<Segment>(), isUpdateStatusRequired);
   }
 
   /**
@@ -232,7 +233,8 @@ public final class CarbonLoaderUtil {
    */
   public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
       CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid,
-      List<Segment> segmentsToBeDeleted, List<Segment> segmentFilesTobeUpdated) throws IOException {
+      List<Segment> segmentsToBeDeleted, List<Segment> segmentFilesTobeUpdated,
+      boolean isUpdateStatusRequired) throws IOException {
     boolean status = false;
     AbsoluteTableIdentifier identifier =
         loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
@@ -278,6 +280,10 @@ public final class CarbonLoaderUtil {
               }
             }
           } else {
+            if (isUpdateStatusRequired && segmentId.equalsIgnoreCase("0") && !StringUtils
+                .isBlank(uuid)) {
+              newMetaEntry.setUpdateStatusFileName(CarbonUpdateUtil.getUpdateStatusFileName(uuid));
+            }
             newMetaEntry.setLoadName(segmentId);
             loadModel.setSegmentId(segmentId);
           }
@@ -339,6 +345,9 @@ public final class CarbonLoaderUtil {
             detail.setSegmentFile(
                 detail.getLoadName() + "_" + newMetaEntry.getUpdateStatusFileName()
                     + CarbonTablePath.SEGMENT_EXT);
+          } else if (isUpdateStatusRequired && detail.getLoadName().equalsIgnoreCase("0")
+              && !StringUtils.isBlank(uuid)) {
+            detail.setUpdateStatusFileName(CarbonUpdateUtil.getUpdateStatusFileName(uuid));
           }
         }
         SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
@@ -425,7 +434,7 @@ public final class CarbonLoaderUtil {
         .populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp(), false);
 
     boolean entryAdded = CarbonLoaderUtil
-        .recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite, uuid);
+        .recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite, uuid, false);
     if (!entryAdded) {
       throw new IOException("Dataload failed due to failure in table status updation for "
           + model.getTableName());
@@ -453,7 +462,7 @@ public final class CarbonLoaderUtil {
     CarbonLoaderUtil
         .populateNewLoadMetaEntry(loadMetaEntry, loadStatus, model.getFactTimeStamp(), true);
     boolean entryAdded = CarbonLoaderUtil.recordNewLoadMetadata(
-        loadMetaEntry, model, false, false, uuid);
+        loadMetaEntry, model, false, false, uuid, false);
     if (!entryAdded) {
       throw new IOException(
           "Failed to update failure entry in table status for " + model.getTableName());