You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/02/27 08:19:09 UTC

[12/16] carbondata git commit: [CARBONDATA-2187][PARTITION] Partition restructure for new folder structure and supporting partition location feature

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index bc84e04..20d3032 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -29,8 +29,10 @@ import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier
-import org.apache.carbondata.core.metadata.PartitionMapFileStore.PartitionMapper
+import org.apache.carbondata.core.metadata.SegmentFileStore.SegmentFile
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema._
@@ -109,15 +111,14 @@ case class CarbonMergerMapping(
     var mergedLoadName: String,
     databaseName: String,
     factTableName: String,
-    validSegments: Array[String],
+    validSegments: Array[Segment],
     tableId: String,
     campactionType: CompactionType,
     // maxSegmentColCardinality is Cardinality of last segment of compaction
     var maxSegmentColCardinality: Array[Int],
     // maxSegmentColumnSchemaList is list of column schema of last segment of compaction
     var maxSegmentColumnSchemaList: List[ColumnSchema],
-    currentPartitions: Seq[String],
-    @transient partitionMapper: PartitionMapper)
+    currentPartitions: Option[Seq[PartitionSpec]])
 
 case class NodeInfo(TaskId: String, noOfBlocks: Int)
 
@@ -133,20 +134,20 @@ case class UpdateTableModel(
     isUpdate: Boolean,
     updatedTimeStamp: Long,
     var executorErrors: ExecutionErrors,
-    deletedSegments: Seq[String])
+    deletedSegments: Seq[Segment])
 
 case class CompactionModel(compactionSize: Long,
     compactionType: CompactionType,
     carbonTable: CarbonTable,
     isDDLTrigger: Boolean,
-    currentPartitions: Seq[String])
+    currentPartitions: Option[Seq[PartitionSpec]])
 
 case class CompactionCallableModel(carbonLoadModel: CarbonLoadModel,
     carbonTable: CarbonTable,
     loadsToMerge: util.List[LoadMetadataDetails],
     sqlContext: SQLContext,
     compactionType: CompactionType,
-    currentPartitions: Seq[String])
+    currentPartitions: Option[Seq[PartitionSpec]])
 
 case class AlterPartitionModel(carbonLoadModel: CarbonLoadModel,
     segmentId: String,
@@ -161,7 +162,7 @@ case class SplitPartitionCallableModel(carbonLoadModel: CarbonLoadModel,
     sqlContext: SQLContext)
 
 case class DropPartitionCallableModel(carbonLoadModel: CarbonLoadModel,
-    segmentId: String,
+    segmentId: Segment,
     partitionId: String,
     oldPartitionIds: List[Int],
     dropWithData: Boolean,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8ed7623..1695a13 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -38,8 +38,8 @@ import org.apache.spark.{SparkEnv, SparkException, TaskContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD}
 import org.apache.spark.sql.{AnalysisException, CarbonEnv, DataFrame, Row, SQLContext}
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel}
-import org.apache.spark.sql.execution.command.preaaggregate.PreAggregateUtil
 import org.apache.spark.sql.hive.DistributionUtil
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.CarbonException
@@ -47,6 +47,7 @@ import org.apache.spark.sql.util.CarbonException
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo}
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
@@ -207,7 +208,9 @@ object CarbonDataRDDFactory {
                 compactionType,
                 table,
                 compactionModel.isDDLTrigger,
-                CarbonFilters.getCurrentPartitions(sqlContext.sparkSession, table))
+                CarbonFilters.getCurrentPartitions(sqlContext.sparkSession,
+                  TableIdentifier(table.getTableName,
+                  Some(table.getDatabaseName))))
               // proceed for compaction
               try {
                 CompactionFactory.getCompactor(
@@ -395,26 +398,9 @@ object CarbonDataRDDFactory {
     } catch {
       case ex: Throwable =>
         loadStatus = SegmentStatus.LOAD_FAILURE
-        ex match {
-          case sparkException: SparkException =>
-            if (sparkException.getCause.isInstanceOf[DataLoadingException] ||
-                sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) {
-              executorMessage = sparkException.getCause.getMessage
-              errorMessage = errorMessage + ": " + executorMessage
-            } else if (sparkException.getCause.isInstanceOf[TextParsingException]) {
-              executorMessage = CarbonDataProcessorUtil
-                .trimErrorMessage(sparkException.getCause.getMessage)
-              errorMessage = errorMessage + " : " + executorMessage
-            }
-          case aex: AnalysisException =>
-            LOGGER.error(aex.getMessage())
-            throw aex
-          case _ =>
-            if (ex.getCause != null) {
-              executorMessage = ex.getCause.getMessage
-              errorMessage = errorMessage + ": " + executorMessage
-            }
-        }
+        val (extrMsgLocal, errorMsgLocal) = CarbonScalaUtil.retrieveAndLogErrorMsg(ex, LOGGER)
+        executorMessage = extrMsgLocal
+        errorMessage = errorMsgLocal
         LOGGER.info(errorMessage)
         LOGGER.error(ex)
     } finally {
@@ -423,14 +409,7 @@ object CarbonDataRDDFactory {
     // handle the status file updation for the update cmd.
     if (updateModel.isDefined) {
       if (loadStatus == SegmentStatus.LOAD_FAILURE) {
-        if (updateModel.get.executorErrors.failureCauses == FailureCauses.NONE) {
-          updateModel.get.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
-          if (null != executorMessage && !executorMessage.isEmpty) {
-            updateModel.get.executorErrors.errorMsg = executorMessage
-          } else {
-            updateModel.get.executorErrors.errorMsg = "Update failed as the data load has failed."
-          }
-        }
+        CarbonScalaUtil.updateErrorInUpdateModel(updateModel.get, executorMessage)
         return
       } else if (loadStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS &&
                  updateModel.get.executorErrors.failureCauses == FailureCauses.BAD_RECORDS &&
@@ -441,12 +420,12 @@ object CarbonDataRDDFactory {
         // success case.
         // write the dictionary file in case of single_pass true
         writeDictionary(carbonLoadModel, result, false)
-        val segmentDetails = new util.HashSet[String]()
+        val segmentDetails = new util.HashSet[Segment]()
         var resultSize = 0
         res.foreach { resultOfSeg =>
           resultSize = resultSize + resultOfSeg.size
           resultOfSeg.foreach { resultOfBlock =>
-            segmentDetails.add(resultOfBlock._2._1.getLoadName)
+            segmentDetails.add(new Segment(resultOfBlock._2._1.getLoadName, null))
           }
         }
 
@@ -462,7 +441,7 @@ object CarbonDataRDDFactory {
           carbonTable,
           updateModel.get.updatedTimeStamp + "",
           true,
-          new util.ArrayList[String](0))) {
+          new util.ArrayList[Segment](0))) {
           LOGGER.audit("Data update is successful for " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         } else {
@@ -744,7 +723,9 @@ object CarbonDataRDDFactory {
         CompactionType.MINOR,
         carbonTable,
         isCompactionTriggerByDDl,
-        CarbonFilters.getCurrentPartitions(sqlContext.sparkSession, carbonTable))
+        CarbonFilters.getCurrentPartitions(sqlContext.sparkSession,
+          TableIdentifier(carbonTable.getTableName,
+          Some(carbonTable.getDatabaseName))))
       var storeLocation = ""
       val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
       if (null != configuredStore && configuredStore.nonEmpty) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index e7bdff8..07acaa5 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -18,26 +18,23 @@
 package org.apache.carbondata.spark.rdd
 
 import java.util
-import java.util.{List, Map}
+import java.util.List
 import java.util.concurrent.ExecutorService
 
 import scala.collection.JavaConverters._
-import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Future}
 
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
 
-import org.apache.carbondata.core.metadata.PartitionMapFileStore
-import org.apache.carbondata.core.metadata.PartitionMapFileStore.PartitionMapper
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events._
-import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.MergeResultImpl
-import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory.LOGGER
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**
@@ -136,39 +133,19 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
     var finalMergeStatus = false
     val databaseName: String = carbonLoadModel.getDatabaseName
     val factTableName = carbonLoadModel.getTableName
-    val validSegments: Array[String] = CarbonDataMergerUtil
-      .getValidSegments(loadsToMerge).split(',')
-    val partitionMapper = if (carbonTable.isHivePartitionTable) {
-      var partitionMap: util.Map[String, util.List[String]] = null
-      validSegments.foreach { segmentId =>
-        val localMapper = new PartitionMapFileStore()
-        localMapper.readAllPartitionsOfSegment(
-          CarbonTablePath.getSegmentPath(carbonLoadModel.getTablePath, segmentId))
-        if (partitionMap == null) {
-          partitionMap = localMapper.getPartitionMap
-        } else {
-          partitionMap.putAll(localMapper.getPartitionMap)
-        }
-      }
-      val mapper = new PartitionMapper()
-      mapper.setPartitionMap(partitionMap)
-      mapper
-    } else {
-      null
-    }
+    val validSegments: List[Segment] = CarbonDataMergerUtil.getValidSegments(loadsToMerge)
     val carbonMergerMapping = CarbonMergerMapping(
       tablePath,
       carbonTable.getMetaDataFilepath,
       mergedLoadName,
       databaseName,
       factTableName,
-      validSegments,
+      validSegments.asScala.toArray,
       carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
       compactionType,
       maxSegmentColCardinality = null,
       maxSegmentColumnSchemaList = null,
-      currentPartitions = partitions,
-      partitionMapper)
+      currentPartitions = partitions)
     carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation)
     carbonLoadModel.setLoadMetadataDetails(
       SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
@@ -221,11 +198,28 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
 
     if (finalMergeStatus) {
       val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
-      new PartitionMapFileStore().mergePartitionMapFiles(
-        CarbonTablePath.getSegmentPath(tablePath, mergedLoadNumber),
-        carbonLoadModel.getFactTimeStamp + "")
+      var segmentFileName: String = null
+      if (carbonTable.isHivePartitionTable) {
+        val readPath =
+          CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath) +
+          CarbonCommonConstants.FILE_SEPARATOR + carbonLoadModel.getFactTimeStamp + ".tmp"
+        // Merge all partition files into a single file.
+        segmentFileName =
+          mergedLoadNumber + "_" + carbonLoadModel.getFactTimeStamp
+        val segmentFile = SegmentFileStore
+          .mergeSegmentFiles(readPath,
+            segmentFileName,
+            CarbonTablePath.getSegmentFilesLocation(carbonLoadModel.getTablePath))
+        if (segmentFile != null) {
+          SegmentFileStore
+            .moveFromTempFolder(segmentFile,
+              carbonLoadModel.getFactTimeStamp + ".tmp",
+              carbonLoadModel.getTablePath)
+        }
+        segmentFileName = segmentFileName + CarbonTablePath.SEGMENT_EXT
+      }
       // trigger event for compaction
-      val alterTableCompactionPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent =
+      val alterTableCompactionPreStatusUpdateEvent =
       AlterTableCompactionPreStatusUpdateEvent(sc.sparkSession,
         carbonTable,
         carbonMergerMapping,
@@ -242,9 +236,13 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
            .updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge,
              carbonTable.getMetaDataFilepath,
              carbonLoadModel)) ||
-        CarbonDataMergerUtil
-          .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath,
-            mergedLoadNumber, carbonLoadModel, compactionType)
+        CarbonDataMergerUtil.updateLoadMetadataWithMergeStatus(
+          loadsToMerge,
+            carbonTable.getMetaDataFilepath,
+            mergedLoadNumber,
+          carbonLoadModel,
+          compactionType,
+          segmentFileName)
       val compactionLoadStatusPostEvent = AlterTableCompactionPostStatusUpdateEvent(carbonTable,
         carbonMergerMapping,
         carbonLoadModel,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index 833c6fe..c286c50 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -55,7 +55,9 @@ case class CarbonCountStar(
         CarbonFilters.getPartitions(
           Seq.empty,
           sparkSession,
-          TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName))).asJava),
+          TableIdentifier(
+            carbonTable.getTableName,
+            Some(carbonTable.getDatabaseName))).map(_.asJava).orNull),
       absoluteTableIdentifier)
     val value = new GenericInternalRow(Seq(Long.box(rowCount)).toArray.asInstanceOf[Array[Any]])
     val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 0978fab..46905b8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.expression.Expression
@@ -67,7 +68,7 @@ case class CarbonDatasourceHadoopRelation(
 
   def buildScan(requiredColumns: Array[String],
       filters: Array[Filter],
-      partitions: Seq[String]): RDD[InternalRow] = {
+      partitions: Seq[PartitionSpec]): RDD[InternalRow] = {
     val filterExpression: Option[Expression] = filters.flatMap { filter =>
       CarbonFilters.createCarbonFilter(schema, filter)
     }.reduceOption(new AndExpression(_, _))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 667d550..7e3b699 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -206,7 +206,9 @@ case class CarbonAlterTableCompactionCommand(
       compactionType,
       carbonTable,
       isCompactionTriggerByDDl,
-      CarbonFilters.getCurrentPartitions(sqlContext.sparkSession, carbonTable)
+      CarbonFilters.getCurrentPartitions(sqlContext.sparkSession,
+        TableIdentifier(carbonTable.getTableName,
+        Some(carbonTable.getDatabaseName)))
     )
 
     val isConcurrentCompactionAllowed = CarbonProperties.getInstance()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index 4f90fb5..d2adc57 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.spark.exception.ConcurrentOperationException
@@ -89,14 +90,10 @@ case class CarbonCleanFilesCommand(
   private def cleanGarbageData(sparkSession: SparkSession,
       databaseNameOp: Option[String], tableName: String): Unit = {
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-    val partitions: Option[Seq[String]] = if (carbonTable.isHivePartitionTable) {
-      Some(CarbonFilters.getPartitions(
-        Seq.empty[Expression],
-        sparkSession,
-        TableIdentifier(tableName, databaseNameOp)))
-    } else {
-      None
-    }
+    val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
+      Seq.empty[Expression],
+      sparkSession,
+      TableIdentifier(tableName, databaseNameOp))
     CarbonStore.cleanFiles(
       dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
       tableName = tableName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 9bdaddb..7800d3e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -42,6 +42,7 @@ import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFile
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.types._
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
 
@@ -51,11 +52,13 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer}
 import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider
-import org.apache.carbondata.core.metadata.PartitionMapFileStore
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.SegmentStatus
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
@@ -72,7 +75,7 @@ import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServicePr
 import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark
-import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD}
+import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl}
 
 case class CarbonLoadDataCommand(
@@ -97,6 +100,8 @@ case class CarbonLoadDataCommand(
 
   var sizeInBytes: Long = _
 
+  var currPartitions: util.List[PartitionSpec] = _
+
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
@@ -123,6 +128,12 @@ case class CarbonLoadDataCommand(
           case l: LogicalRelation => l
         }.head
       sizeInBytes = logicalPartitionRelation.relation.sizeInBytes
+      currPartitions = CarbonFilters.getCurrentPartitions(
+        sparkSession,
+        TableIdentifier(tableName, databaseNameOp)) match {
+        case Some(parts) => new util.ArrayList(parts.toList.asJava)
+        case _ => null
+      }
     }
     operationContext.setProperty("isOverwrite", isOverwriteTable)
     if(CarbonUtil.hasAggregationDataMap(table)) {
@@ -182,8 +193,9 @@ case class CarbonLoadDataCommand(
         options,
         optionsFinal,
         carbonLoadModel,
-        hadoopConf
-      )
+        hadoopConf,
+        partition,
+        dataFrame.isDefined)
       // Delete stale segment folders that are not in table status but are physically present in
       // the Fact folder
       LOGGER.info(s"Deleting stale folders if present for table $dbName.$tableName")
@@ -215,7 +227,10 @@ case class CarbonLoadDataCommand(
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
         // Clean up the old invalid segment data before creating a new entry for new load.
-        DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table)
+        DataLoadingUtil.deleteLoadsAndUpdateMetadata(
+          isForceDeletion = false,
+          table,
+          currPartitions)
         // add the start entry for the new load in the table status file
         if (updateModel.isEmpty && !table.isHivePartitionTable) {
           CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
@@ -259,7 +274,8 @@ case class CarbonLoadDataCommand(
             columnar,
             partitionStatus,
             hadoopConf,
-            operationContext)
+            operationContext,
+            LOGGER)
         } else {
           loadData(
             sparkSession,
@@ -267,7 +283,8 @@ case class CarbonLoadDataCommand(
             columnar,
             partitionStatus,
             hadoopConf,
-            operationContext)
+            operationContext,
+            LOGGER)
         }
         val loadTablePostExecutionEvent: LoadTablePostExecutionEvent =
           new LoadTablePostExecutionEvent(
@@ -331,7 +348,9 @@ case class CarbonLoadDataCommand(
       columnar: Boolean,
       partitionStatus: SegmentStatus,
       hadoopConf: Configuration,
-      operationContext: OperationContext): Unit = {
+      operationContext: OperationContext,
+      LOGGER: LogService): Seq[Row] = {
+    var rows = Seq.empty[Row]
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       .getCarbonTableIdentifier
@@ -418,12 +437,13 @@ case class CarbonLoadDataCommand(
 
     if (carbonTable.isHivePartitionTable) {
       try {
-        loadDataWithPartition(
+        rows = loadDataWithPartition(
           sparkSession,
           carbonLoadModel,
           hadoopConf,
           loadDataFrame,
-          operationContext)
+          operationContext,
+          LOGGER)
       } finally {
         server match {
           case Some(dictServer) =>
@@ -450,6 +470,7 @@ case class CarbonLoadDataCommand(
         updateModel,
         operationContext)
     }
+    rows
   }
 
   private def loadData(
@@ -458,7 +479,9 @@ case class CarbonLoadDataCommand(
       columnar: Boolean,
       partitionStatus: SegmentStatus,
       hadoopConf: Configuration,
-      operationContext: OperationContext): Unit = {
+      operationContext: OperationContext,
+      LOGGER: LogService): Seq[Row] = {
+    var rows = Seq.empty[Row]
     val (dictionaryDataFrame, loadDataFrame) = if (updateModel.isDefined) {
       val dataFrameWithTupleId: DataFrame = getDataFrameWithTupleID()
       // getting all fields except tupleId field as it is not required in the value
@@ -478,12 +501,12 @@ case class CarbonLoadDataCommand(
         dictionaryDataFrame)
     }
     if (table.isHivePartitionTable) {
-      loadDataWithPartition(
+      rows = loadDataWithPartition(
         sparkSession,
         carbonLoadModel,
         hadoopConf,
         loadDataFrame,
-        operationContext)
+        operationContext, LOGGER)
     } else {
       CarbonDataRDDFactory.loadCarbonData(
         sparkSession.sqlContext,
@@ -497,6 +520,7 @@ case class CarbonLoadDataCommand(
         updateModel,
         operationContext)
     }
+    rows
   }
 
   /**
@@ -504,24 +528,16 @@ case class CarbonLoadDataCommand(
    * into partitoned data. The table relation would be converted to HadoopFSRelation to let spark
    * handling the partitioning.
    */
-  private def loadDataWithPartition(sparkSession: SparkSession,
+  private def loadDataWithPartition(
+      sparkSession: SparkSession,
       carbonLoadModel: CarbonLoadModel,
       hadoopConf: Configuration,
       dataFrame: Option[DataFrame],
-      operationContext: OperationContext): Unit = {
+      operationContext: OperationContext,
+      LOGGER: LogService): Seq[Row] = {
     val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName))
     val catalogTable: CatalogTable = logicalPartitionRelation.catalogTable.get
-    val currentPartitions =
-      CarbonFilters.getPartitions(Seq.empty[Expression], sparkSession, identifier)
-    // Clean up the alreday dropped partitioned data
-    new PartitionMapFileStore().cleanSegments(table, currentPartitions.asJava, false)
-    // Converts the data to carbon understandable format. The timestamp/date format data needs to
-    // converted to hive standard fomat to let spark understand the data to partition.
-    val serializationNullFormat =
-      carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
-    val badRecordAction =
-      carbonLoadModel.getBadRecordsAction.split(",")(1)
     var timeStampformatString = carbonLoadModel.getTimestampformat
     if (timeStampformatString.isEmpty) {
       timeStampformatString = carbonLoadModel.getDefaultTimestampFormat
@@ -532,127 +548,114 @@ case class CarbonLoadDataCommand(
       dateFormatString = carbonLoadModel.getDefaultDateFormat
     }
     val dateFormat = new SimpleDateFormat(dateFormatString)
-    CarbonSession.threadSet(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT, dateFormatString)
-    CarbonSession.threadSet(
-      CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
-      timeStampformatString)
-    CarbonSession.threadSet(
-      CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT,
-      serializationNullFormat)
-    CarbonSession.threadSet(
-      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
-      badRecordAction)
-    val isEmptyBadRecord = carbonLoadModel.getIsEmptyDataBadRecord.split(",")(1)
-    CarbonSession.threadSet(
-      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
-      isEmptyBadRecord)
+    // Clean up the alreday dropped partitioned data
+    SegmentFileStore.cleanSegments(table, null, false)
     CarbonSession.threadSet("partition.operationcontext", operationContext)
     // input data from csv files. Convert to logical plan
     val allCols = new ArrayBuffer[String]()
     allCols ++= table.getAllDimensions.asScala.map(_.getColName)
     allCols ++= table.getAllMeasures.asScala.map(_.getColName)
     var attributes =
-      StructType(allCols.map(StructField(_, StringType))).toAttributes
+      StructType(
+        allCols.filterNot(_.equals(CarbonCommonConstants.DEFAULT_INVISIBLE_DUMMY_MEASURE)).map(
+          StructField(_, StringType))).toAttributes
 
     var partitionsLen = 0
     val sortScope = CarbonDataProcessorUtil.getSortScope(carbonLoadModel.getSortScope)
-    def transformQuery(rdd: RDD[Row], isDataFrame: Boolean) = {
-      val updatedRdd = convertData(rdd, sparkSession, carbonLoadModel, isDataFrame)
-      val catalogAttributes = catalogTable.schema.toAttributes
-      attributes = attributes.map(a => {
-        catalogAttributes.find(_.name.equalsIgnoreCase(a.name)).get
-      })
-      attributes = attributes.map { attr =>
-        val column = table.getColumnByName(table.getTableName, attr.name)
-        if (column.hasEncoding(Encoding.DICTIONARY)) {
-          AttributeReference(
-            attr.name,
-            IntegerType,
-            attr.nullable,
-            attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
-        } else if (attr.dataType == TimestampType || attr.dataType == DateType) {
-          AttributeReference(
-            attr.name,
-            LongType,
-            attr.nullable,
-            attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
-        } else {
-          attr
-        }
-      }
-      // Only select the required columns
-      val output = if (partition.nonEmpty) {
-        val lowerCasePartition = partition.map { case (key, value) => (key.toLowerCase, value) }
-        catalogTable.schema.map { attr =>
-          attributes.find(_.name.equalsIgnoreCase(attr.name)).get
-        }.filter(attr => lowerCasePartition.getOrElse(attr.name.toLowerCase, None).isEmpty)
-      } else {
-        catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
-      }
-      partitionsLen = rdd.partitions.length
-      val child = Project(output, LogicalRDD(attributes, updatedRdd)(sparkSession))
-      if (sortScope == SortScopeOptions.SortScope.GLOBAL_SORT) {
-        val sortColumns = table.getSortColumns(table.getTableName)
-        Sort(output.filter(f => sortColumns.contains(f.name)).map(SortOrder(_, Ascending)),
-          true,
-          child)
-      } else {
-        child
-      }
+    val partitionValues = if (partition.nonEmpty) {
+      partition.filter(_._2.nonEmpty).map{ case(col, value) =>
+        val field = catalogTable.schema.find(_.name.equalsIgnoreCase(col)).get
+        CarbonScalaUtil.convertToDateAndTimeFormats(
+          value.get,
+          field.dataType,
+          timeStampFormat,
+          dateFormat)
+      }.toArray
+    } else {
+      Array[String]()
     }
-
+    var persistedRDD: Option[RDD[InternalRow]] = None
     try {
       val query: LogicalPlan = if (dataFrame.isDefined) {
-        val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
-        val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
-        val dfAttributes =
-          StructType(dataFrame.get.schema.fields.map(_.copy(dataType = StringType))).toAttributes
-        val partitionValues = if (partition.nonEmpty) {
-          partition.values.filter(_.nonEmpty).map(_.get).toArray
+        val (rdd, dfAttributes) = if (updateModel.isDefined) {
+          // Get the updated query plan in case of update scenario
+          val updatedFrame = Dataset.ofRows(
+            sparkSession,
+            getLogicalQueryForUpdate(
+              sparkSession,
+              catalogTable,
+              dataFrame.get,
+              carbonLoadModel))
+          (updatedFrame.rdd, updatedFrame.schema)
         } else {
-          Array[String]()
+          if (partition.nonEmpty) {
+            val headers = carbonLoadModel.getCsvHeaderColumns.dropRight(partition.size)
+            val updatedHeader = headers ++ partition.keys.map(_.toLowerCase)
+            carbonLoadModel.setCsvHeader(updatedHeader.mkString(","))
+            carbonLoadModel.setCsvHeaderColumns(carbonLoadModel.getCsvHeader.split(","))
+          }
+          (dataFrame.get.rdd, dataFrame.get.schema)
+        }
+
+        val expectedColumns = {
+          val staticPartCols = partition.filter(_._2.isDefined).keySet
+          attributes.filterNot(a => staticPartCols.contains(a.name))
+        }
+        if (expectedColumns.length != dfAttributes.length) {
+          throw new AnalysisException(
+            s"Cannot insert into table $tableName because the number of columns are different: " +
+            s"need ${expectedColumns.length} columns, " +
+            s"but query has ${dfAttributes.length} columns.")
+        }
+        val nonPartitionBounds = expectedColumns.zipWithIndex.map(_._2).toArray
+        val partitionBounds = new Array[Int](partitionValues.length)
+        if (partition.nonEmpty) {
+          val nonPartitionSchemaLen = attributes.length - partition.size
+          var i = nonPartitionSchemaLen
+          var index = 0
+          var partIndex = 0
+          partition.values.foreach { p =>
+            if (p.isDefined) {
+              partitionBounds(partIndex) = nonPartitionSchemaLen + index
+              partIndex = partIndex + 1
+            } else {
+              nonPartitionBounds(i) = nonPartitionSchemaLen + index
+              i = i + 1
+            }
+            index = index + 1
+          }
         }
-        val len = dfAttributes.length
-        val rdd = dataFrame.get.rdd.map { f =>
+
+        val len = dfAttributes.length + partitionValues.length
+        val transRdd = rdd.map { f =>
           val data = new Array[Any](len)
           var i = 0
           while (i < f.length) {
-            data(i) =
-              UTF8String.fromString(
-                CarbonScalaUtil.getString(f.get(i),
-                  serializationNullFormat,
-                  delimiterLevel1,
-                  delimiterLevel2,
-                  timeStampFormat,
-                  dateFormat))
+            data(nonPartitionBounds(i)) = f.get(i)
             i = i + 1
           }
-          if (partitionValues.length > 0) {
-            var j = 0
-            while (i < len) {
-              data(i) = UTF8String.fromString(partitionValues(j))
-              j = j + 1
-              i = i + 1
-            }
+          var j = 0
+          while (j < partitionBounds.length) {
+            data(partitionBounds(j)) = UTF8String.fromString(partitionValues(j))
+            j = j + 1
           }
           Row.fromSeq(data)
         }
-        val transRdd = if (updateModel.isDefined) {
-          // Get the updated query plan in case of update scenario
-          Dataset.ofRows(
+
+        val (transformedPlan, partitions, persistedRDDLocal) =
+          transformQuery(
+            transRdd,
             sparkSession,
-            getLogicalQueryForUpdate(
-              sparkSession,
-              catalogTable,
-              dfAttributes,
-              rdd.map(row => InternalRow.fromSeq(row.toSeq)),
-              carbonLoadModel)).rdd
-        } else {
-          rdd
-        }
-        transformQuery(transRdd, true)
+            carbonLoadModel,
+            partitionValues,
+            catalogTable,
+            attributes,
+            sortScope,
+            isDataFrame = true)
+        partitionsLen = partitions
+        persistedRDD = persistedRDDLocal
+        transformedPlan
       } else {
-
         val rowDataTypes = attributes.map { attribute =>
           catalogTable.schema.find(_.name.equalsIgnoreCase(attribute.name)) match {
             case Some(attr) => attr.dataType
@@ -667,12 +670,29 @@ case class CarbonLoadDataCommand(
           }
         }
         val columnCount = carbonLoadModel.getCsvHeaderColumns.length
-        var rdd = DataLoadingUtil.csvFileScanRDD(
+        val rdd = DataLoadingUtil.csvFileScanRDD(
           sparkSession,
           model = carbonLoadModel,
           hadoopConf).map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
-        transformQuery(rdd.asInstanceOf[RDD[Row]], false)
+        val (transformedPlan, partitions, persistedRDDLocal) =
+          transformQuery(
+            rdd.asInstanceOf[RDD[Row]],
+            sparkSession,
+            carbonLoadModel,
+            partitionValues,
+            catalogTable,
+            attributes,
+            sortScope,
+            isDataFrame = false)
+        partitionsLen = partitions
+        persistedRDD = persistedRDDLocal
+        transformedPlan
+      }
+      if (updateModel.isDefined) {
+        carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
       }
+      // Create and ddd the segment to the tablestatus.
+      CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, isOverwriteTable)
       val convertRelation = convertToLogicalRelation(
         catalogTable,
         sizeInBytes,
@@ -703,23 +723,37 @@ case class CarbonLoadDataCommand(
           overwrite = false,
           ifPartitionNotExists = false)
       Dataset.ofRows(sparkSession, convertedPlan)
+    } catch {
+      case ex: Throwable =>
+        val (executorMessage, errorMessage) = CarbonScalaUtil.retrieveAndLogErrorMsg(ex, LOGGER)
+        if (updateModel.isDefined) {
+          CarbonScalaUtil.updateErrorInUpdateModel(updateModel.get, executorMessage)
+        }
+        LOGGER.info(errorMessage)
+        LOGGER.error(ex)
+        throw new Exception(errorMessage)
     } finally {
-      CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT)
-      CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT)
-      CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT)
-      CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION)
-      CarbonSession.threadUnset(CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD)
       CarbonSession.threadUnset("partition.operationcontext")
       if (isOverwriteTable) {
         DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier)
         // Clean the overwriting segments if any.
-        new PartitionMapFileStore().cleanSegments(
+        SegmentFileStore.cleanSegments(
           table,
-          CarbonFilters.getPartitions(Seq.empty, sparkSession, identifier).asJava,
+          null,
           false)
       }
+      if (partitionsLen > 1) {
+        // clean cache only if persisted and keeping unpersist non-blocking as non-blocking call
+        // will not have any functional impact as spark automatically monitors the cache usage on
+        // each node and drops out old data partitions in a least-recently used (LRU) fashion.
+        persistedRDD match {
+          case Some(rdd) => rdd.unpersist(false)
+          case _ =>
+        }
+      }
     }
     try {
+      carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
       // Trigger auto compaction
       CarbonDataRDDFactory.handleSegmentMerging(
         sparkSession.sqlContext,
@@ -732,37 +766,140 @@ case class CarbonLoadDataCommand(
           "Dataload is success. Auto-Compaction has failed. Please check logs.",
           e)
     }
+    val specs =
+      SegmentFileStore.getPartitionSpecs(carbonLoadModel.getSegmentId, carbonLoadModel.getTablePath)
+    if (specs != null) {
+      specs.asScala.map{ spec =>
+        Row(spec.getPartitions.asScala.mkString("/"), spec.getLocation.toString, spec.getUuid)
+      }
+    } else {
+      Seq.empty[Row]
+    }
   }
 
+  /**
+   * Transform the rdd to logical plan as per the sortscope. If it is global sort scope then it
+   * will convert to sort logical plan otherwise project plan.
+   */
+  private def transformQuery(rdd: RDD[Row],
+      sparkSession: SparkSession,
+      loadModel: CarbonLoadModel,
+      partitionValues: Array[String],
+      catalogTable: CatalogTable,
+      curAttributes: Seq[AttributeReference],
+      sortScope: SortScopeOptions.SortScope,
+      isDataFrame: Boolean): (LogicalPlan, Int, Option[RDD[InternalRow]]) = {
+    // Converts the data as per the loading steps before give it to writer or sorter
+    val updatedRdd = convertData(
+      rdd,
+      sparkSession,
+      loadModel,
+      isDataFrame,
+      partitionValues)
+    val catalogAttributes = catalogTable.schema.toAttributes
+    var attributes = curAttributes.map(a => {
+      catalogAttributes.find(_.name.equalsIgnoreCase(a.name)).get
+    })
+    attributes = attributes.map { attr =>
+      // Update attribute datatypes in case of dictionary columns, in case of dictionary columns
+      // datatype is always int
+      val column = table.getColumnByName(table.getTableName, attr.name)
+      if (column.hasEncoding(Encoding.DICTIONARY)) {
+        AttributeReference(
+          attr.name,
+          IntegerType,
+          attr.nullable,
+          attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
+      } else if (attr.dataType == TimestampType || attr.dataType == DateType) {
+        AttributeReference(
+          attr.name,
+          LongType,
+          attr.nullable,
+          attr.metadata)(attr.exprId, attr.qualifier, attr.isGenerated)
+      } else {
+        attr
+      }
+    }
+    // Only select the required columns
+    val output = if (partition.nonEmpty) {
+      val lowerCasePartition = partition.map { case (key, value) => (key.toLowerCase, value) }
+      catalogTable.schema.map { attr =>
+        attributes.find(_.name.equalsIgnoreCase(attr.name)).get
+      }.filter(attr => lowerCasePartition.getOrElse(attr.name.toLowerCase, None).isEmpty)
+    } else {
+      catalogTable.schema.map(f => attributes.find(_.name.equalsIgnoreCase(f.name)).get)
+    }
+    val partitionsLen = rdd.partitions.length
+
+    // If it is global sort scope then appl sort logical plan on the sort columns
+    if (sortScope == SortScopeOptions.SortScope.GLOBAL_SORT) {
+      // Because if the number of partitions greater than 1, there will be action operator(sample)
+      // in sortBy operator. So here we cache the rdd to avoid do input and convert again.
+      if (partitionsLen > 1) {
+        updatedRdd.persist(StorageLevel.fromString(
+          CarbonProperties.getInstance().getGlobalSortRddStorageLevel))
+      }
+      val child = Project(output, LogicalRDD(attributes, updatedRdd)(sparkSession))
+      val sortColumns = table.getSortColumns(table.getTableName)
+      val sortPlan =
+        Sort(
+          output.filter(f => sortColumns.contains(f.name)).map(SortOrder(_, Ascending)),
+          global = true,
+          child)
+      (sortPlan, partitionsLen, Some(updatedRdd))
+    } else {
+      (Project(output, LogicalRDD(attributes, updatedRdd)(sparkSession)), partitionsLen, None)
+    }
+  }
+
+  /**
+   * Convert the rdd as per steps of data loading inputprocessor step and coverter step
+   * @param originRDD
+   * @param sparkSession
+   * @param model
+   * @param isDataFrame
+   * @param partitionValues
+   * @return
+   */
   private def convertData(
       originRDD: RDD[Row],
       sparkSession: SparkSession,
       model: CarbonLoadModel,
-      isDataFrame: Boolean): RDD[InternalRow] = {
+      isDataFrame: Boolean,
+      partitionValues: Array[String]): RDD[InternalRow] = {
     model.setPartitionId("0")
     val sc = sparkSession.sparkContext
+    val info =
+      model.getCarbonDataLoadSchema.getCarbonTable.getTableInfo.getFactTable.getPartitionInfo
+    info.setColumnSchemaList(new util.ArrayList[ColumnSchema](info.getColumnSchemaList))
     val modelBroadcast = sc.broadcast(model)
     val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
 
     val inputStepRowCounter = sc.accumulator(0, "Input Processor Accumulator")
     // 1. Input
-    var convertRDD =
+    val convertRDD =
       if (isDataFrame) {
         originRDD.mapPartitions{rows =>
           DataLoadProcessorStepOnSpark.toRDDIterator(rows, modelBroadcast)
         }
       } else {
-        originRDD.map{row =>
-          val array = new Array[AnyRef](row.length)
+        // Append the partition columns in case of static partition scenario
+        val partitionLen = partitionValues.length
+        val len = model.getCsvHeaderColumns.length - partitionLen
+        originRDD.map{ row =>
+          val array = new Array[AnyRef](len + partitionLen)
           var i = 0
-          while (i < array.length) {
+          while (i < len) {
             array(i) = row.get(i).asInstanceOf[AnyRef]
             i = i + 1
           }
+          if (partitionLen > 0) {
+            System.arraycopy(partitionValues, 0, array, i, partitionLen)
+          }
           array
         }
       }
-    val finalRDD = convertRDD.mapPartitionsWithIndex { case (index, rows) =>
+    val finalRDD = convertRDD.mapPartitionsWithIndex {case(index, rows) =>
         DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
         DataLoadProcessorStepOnSpark.inputAndconvertFunc(
           rows,
@@ -783,12 +920,11 @@ case class CarbonLoadDataCommand(
   private def getLogicalQueryForUpdate(
       sparkSession: SparkSession,
       catalogTable: CatalogTable,
-      attributes: Seq[AttributeReference],
-      rdd: RDD[InternalRow],
+      df: DataFrame,
       carbonLoadModel: CarbonLoadModel): LogicalPlan = {
     sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
     // In case of update, we don't need the segmrntid column in case of partitioning
-    val dropAttributes = attributes.dropRight(1)
+    val dropAttributes = df.logicalPlan.output.dropRight(1)
     val finalOutput = catalogTable.schema.map { attr =>
       dropAttributes.find { d =>
         val index = d.name.lastIndexOf("-updatedColumn")
@@ -801,7 +937,7 @@ case class CarbonLoadDataCommand(
     }
     carbonLoadModel.setCsvHeader(catalogTable.schema.map(_.name.toLowerCase).mkString(","))
     carbonLoadModel.setCsvHeaderColumns(carbonLoadModel.getCsvHeader.split(","))
-    Project(finalOutput, LogicalRDD(attributes, rdd)(sparkSession))
+    Project(finalOutput, df.logicalPlan)
   }
 
   private def convertToLogicalRelation(
@@ -855,9 +991,19 @@ case class CarbonLoadDataCommand(
     if (updateModel.isDefined) {
       options += (("updatetimestamp", updateModel.get.updatedTimeStamp.toString))
       if (updateModel.get.deletedSegments.nonEmpty) {
-        options += (("segmentsToBeDeleted", updateModel.get.deletedSegments.mkString(",")))
+        options += (("segmentsToBeDeleted",
+          updateModel.get.deletedSegments.map(_.getSegmentNo).mkString(",")))
       }
     }
+    if (currPartitions != null) {
+      val currPartStr = ObjectSerializationUtil.convertObjectToString(currPartitions)
+      options += (("currentpartition", currPartStr))
+    }
+    if (loadModel.getSegmentId != null) {
+      val currLoadEntry =
+        ObjectSerializationUtil.convertObjectToString(loadModel.getCurrentLoadMetadataDetail)
+      options += (("currentloadentry", currLoadEntry))
+    }
     val hdfsRelation = HadoopFsRelation(
       location = catalog,
       partitionSchema = partitionSchema,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index 72ed051..e3e4c7a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -28,8 +28,10 @@ import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, Me
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, PartitionMapFileStore}
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.{DataMapSchema, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
@@ -228,10 +230,17 @@ case class RefreshCarbonTableCommand(
     val allpartitions = metadataDetails.map{ metadata =>
       if (metadata.getSegmentStatus == SegmentStatus.SUCCESS ||
           metadata.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
-        val mapper = new PartitionMapFileStore()
-        mapper.readAllPartitionsOfSegment(
-          CarbonTablePath.getSegmentPath(absIdentifier.getTablePath, metadata.getLoadName))
-        Some(mapper.getPartitionMap.values().asScala)
+        val mapper = new SegmentFileStore(absIdentifier.getTablePath, metadata.getSegmentFile)
+        val specs = mapper.getLocationMap.asScala.map { case(location, fd) =>
+          var updatedLoc =
+            if (fd.isRelative) {
+              absIdentifier.getTablePath + CarbonCommonConstants.FILE_SEPARATOR + location
+            } else {
+              location
+            }
+          new PartitionSpec(fd.getPartitions, updatedLoc)
+        }
+        Some(specs)
       } else {
         None
       }
@@ -240,14 +249,14 @@ case class RefreshCarbonTableCommand(
       TableIdentifier(absIdentifier.getTableName, Some(absIdentifier.getDatabaseName))
     // Register the partition information to the hive metastore
     allpartitions.foreach { segPartitions =>
-      val specs: Seq[TablePartitionSpec] = segPartitions.map { indexPartitions =>
-        indexPartitions.asScala.map{ p =>
+      val specs: Seq[(TablePartitionSpec, Option[String])] = segPartitions.map { indexPartitions =>
+        (indexPartitions.getPartitions.asScala.map{ p =>
           val spec = p.split("=")
           (spec(0), spec(1))
-        }.toMap
+        }.toMap, Some(indexPartitions.getLocation.toString))
       }.toSeq
       // Add partition information
-      AlterTableAddPartitionCommand(identifier, specs.map((_, None)), true).run(sparkSession)
+      AlterTableAddPartitionCommand(identifier, specs, true).run(sparkSession)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 756d120..4886676 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.storage.StorageLevel
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
@@ -173,7 +174,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
       sparkSession: SparkSession,
       currentTime: Long,
       executorErrors: ExecutionErrors,
-      deletedSegments: Seq[String]): Unit = {
+      deletedSegments: Seq[Segment]): Unit = {
 
     def isDestinationRelation(relation: CarbonDatasourceHadoopRelation): Boolean = {
       val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 1ac0b34..25d5e91 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, DeleteDeltaBlockDetails, SegmentUpdateDetails, TupleIdEnum}
@@ -61,15 +62,20 @@ object DeleteExecution {
       dataRdd: RDD[Row],
       timestamp: String,
       isUpdateOperation: Boolean,
-      executorErrors: ExecutionErrors): Seq[String] = {
+      executorErrors: ExecutionErrors): Seq[Segment] = {
 
     var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]] = null
     val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
-    val factPath = carbonTablePath.getFactDir
-    var segmentsTobeDeleted = Seq.empty[String]
+    val isPartitionTable = carbonTable.isHivePartitionTable
+    val factPath = if (isPartitionTable) {
+      carbonTablePath.getPath
+    } else {
+      carbonTablePath.getFactDir
+    }
+    var segmentsTobeDeleted = Seq.empty[Segment]
 
     val deleteRdd = if (isUpdateOperation) {
       val schema =
@@ -104,7 +110,7 @@ object DeleteExecution {
         CarbonFilters.getPartitions(
           Seq.empty,
           sparkSession,
-          TableIdentifier(tableName, databaseNameOp)).asJava)
+          TableIdentifier(tableName, databaseNameOp)).map(_.asJava).orNull)
     val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(absoluteTableIdentifier)
     CarbonUpdateUtil
       .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
@@ -144,12 +150,12 @@ object DeleteExecution {
     // all or none : update status file, only if complete delete opeartion is successfull.
     def checkAndUpdateStatusFiles(): Unit = {
       val blockUpdateDetailsList = new util.ArrayList[SegmentUpdateDetails]()
-      val segmentDetails = new util.HashSet[String]()
+      val segmentDetails = new util.HashSet[Segment]()
       res.foreach(resultOfSeg => resultOfSeg.foreach(
         resultOfBlock => {
           if (resultOfBlock._1 == SegmentStatus.SUCCESS) {
             blockUpdateDetailsList.add(resultOfBlock._2._1)
-            segmentDetails.add(resultOfBlock._2._1.getSegmentName)
+            segmentDetails.add(new Segment(resultOfBlock._2._1.getSegmentName, null))
             // if this block is invalid then decrement block count in map.
             if (CarbonUpdateUtil.isBlockInvalid(resultOfBlock._2._1.getSegmentStatus)) {
               CarbonUpdateUtil.decrementDeletedBlockCount(resultOfBlock._2._1,
@@ -250,7 +256,7 @@ object DeleteExecution {
             countOfRows = countOfRows + 1
           }
 
-          val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, factPath)
+          val blockPath = CarbonUpdateUtil.getTableBlockPath(TID, factPath, isPartitionTable)
           val completeBlockName = CarbonTablePath
             .addDataPartPrefix(CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.BLOCK_ID) +
                                CarbonCommonConstants.FACT_FILE_EXT)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index bdecac1..f88e767 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.execution.command.management.CarbonAlterTableCompact
 import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
@@ -113,7 +114,7 @@ object HorizontalCompaction {
       absTableIdentifier: AbsoluteTableIdentifier,
       segmentUpdateStatusManager: SegmentUpdateStatusManager,
       factTimeStamp: Long,
-      segLists: util.List[String]): Unit = {
+      segLists: util.List[Segment]): Unit = {
     val db = carbonTable.getDatabaseName
     val table = carbonTable.getTableName
     // get the valid segments qualified for update compaction.
@@ -163,7 +164,7 @@ object HorizontalCompaction {
       absTableIdentifier: AbsoluteTableIdentifier,
       segmentUpdateStatusManager: SegmentUpdateStatusManager,
       factTimeStamp: Long,
-      segLists: util.List[String]): Unit = {
+      segLists: util.List[Segment]): Unit = {
 
     val db = carbonTable.getDatabaseName
     val table = carbonTable.getTableName

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
new file mode 100644
index 0000000..2aaecc7
--- /dev/null
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.partition
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand}
+import org.apache.spark.sql.optimizer.CarbonFilters
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatus
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+import org.apache.carbondata.processing.util.CarbonLoaderUtil
+
+/**
+ * Adding the partition to the hive and create a new segment if the location has data.
+ *
+ */
+case class CarbonAlterTableAddHivePartitionCommand(
+    tableName: TableIdentifier,
+    partitionSpecsAndLocs: Seq[(TablePartitionSpec, Option[String])],
+    ifNotExists: Boolean)
+  extends AtomicRunnableCommand {
+
+  var partitionSpecsAndLocsTobeAdded : util.List[PartitionSpec] = _
+  var table: CarbonTable = _
+
+  override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
+    table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+    if (table.isHivePartitionTable) {
+      val partitionWithLoc = partitionSpecsAndLocs.filter(_._2.isDefined)
+      if (partitionWithLoc.nonEmpty) {
+        val partitionSpecs = partitionWithLoc.map{ case (part, location) =>
+          new PartitionSpec(
+            new util.ArrayList(part.map(p => p._1 + "=" + p._2).toList.asJava),
+            location.get)
+        }
+        // Get all the partitions which are not already present in hive.
+        val currParts = CarbonFilters.getCurrentPartitions(sparkSession, tableName).get
+        partitionSpecsAndLocsTobeAdded =
+          new util.ArrayList(partitionSpecs.filterNot { part =>
+          currParts.exists(p => part.equals(p))
+        }.asJava)
+      }
+      AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, ifNotExists).run(sparkSession)
+    }
+    Seq.empty[Row]
+  }
+
+
+  override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
+    AlterTableDropPartitionCommand(tableName, partitionSpecsAndLocs.map(_._1), true, false, true)
+    val msg = s"Got exception $exception when processing data of add partition." +
+              "Dropping partitions to the metadata"
+    LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg)
+    Seq.empty[Row]
+  }
+
+  override def processData(sparkSession: SparkSession): Seq[Row] = {
+    // Partitions with physical data should be registered to as a new segment.
+    if (partitionSpecsAndLocsTobeAdded != null && partitionSpecsAndLocsTobeAdded.size() > 0) {
+      val segmentFile = SegmentFileStore.getSegmentFileForPhysicalDataPartitions(table.getTablePath,
+        partitionSpecsAndLocsTobeAdded)
+      if (segmentFile != null) {
+        val loadModel = new CarbonLoadModel
+        loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
+        // Create new entry in tablestatus file
+        CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, false)
+        val newMetaEntry = loadModel.getCurrentLoadMetadataDetail
+        val segmentFileName =
+          loadModel.getSegmentId + "_" + loadModel.getFactTimeStamp + CarbonTablePath.SEGMENT_EXT
+        newMetaEntry.setSegmentFile(segmentFileName)
+        val segmentsLoc = CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
+        CarbonUtil.checkAndCreateFolder(segmentsLoc)
+        val segmentPath = segmentsLoc + CarbonCommonConstants.FILE_SEPARATOR + segmentFileName
+        SegmentFileStore.writeSegmentFile(segmentFile, segmentPath)
+        CarbonLoaderUtil.populateNewLoadMetaEntry(
+          newMetaEntry,
+          SegmentStatus.SUCCESS,
+          loadModel.getFactTimeStamp,
+          true)
+        // Add size to the entry
+        CarbonLoaderUtil.addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId, table)
+        // Make the load as success in table status
+        CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false)
+      }
+    }
+    Seq.empty[Row]
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index cb4dece..407057e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -25,17 +25,17 @@ import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand}
-import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.util.AlterTableUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.PartitionMapFileStore
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.spark.rdd.{CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD}
+import org.apache.carbondata.spark.rdd.CarbonDropPartitionRDD
 
 /**
  * Drop the partitions from hive and carbon store. It drops the partitions in following steps
@@ -59,16 +59,43 @@ case class CarbonAlterTableDropHivePartitionCommand(
     retainData: Boolean)
   extends AtomicRunnableCommand {
 
+  var carbonPartitionsTobeDropped : util.List[PartitionSpec] = _
+  var table: CarbonTable = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+    table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
     if (CarbonUtil.hasAggregationDataMap(table)) {
       throw new AnalysisException(
         "Partition can not be dropped as it is mapped to Pre Aggregate table")
     }
     if (table.isHivePartitionTable) {
+      var locks = List.empty[ICarbonLock]
       try {
-        specs.flatMap(f => sparkSession.sessionState.catalog.listPartitions(tableName, Some(f)))
+        val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+          LockUsage.COMPACTION_LOCK,
+          LockUsage.DELETE_SEGMENT_LOCK,
+          LockUsage.DROP_TABLE_LOCK,
+          LockUsage.CLEAN_FILES_LOCK,
+          LockUsage.ALTER_PARTITION_LOCK)
+        locks = AlterTableUtil.validateTableAndAcquireLock(
+          table.getDatabaseName,
+          table.getTableName,
+          locksToBeAcquired)(sparkSession)
+        val partitions =
+          specs.flatMap(f => sparkSession.sessionState.catalog.listPartitions(tableName, Some(f)))
+        val carbonPartitions = partitions.map { partition =>
+          new PartitionSpec(new util.ArrayList[String](
+            partition.spec.seq.map { case (column, value) => column + "=" + value }.toList.asJava),
+            partition.location)
+        }
+        carbonPartitionsTobeDropped = new util.ArrayList[PartitionSpec](carbonPartitions.asJava)
+        // Drop the partitions from hive.
+        AlterTableDropPartitionCommand(
+          tableName,
+          specs,
+          ifExists,
+          purge,
+          retainData).run(sparkSession)
       } catch {
         case e: Exception =>
           if (!ifExists) {
@@ -77,15 +104,10 @@ case class CarbonAlterTableDropHivePartitionCommand(
             log.warn(e.getMessage)
             return Seq.empty[Row]
           }
+      } finally {
+        AlterTableUtil.releaseLocks(locks)
       }
 
-      // Drop the partitions from hive.
-      AlterTableDropPartitionCommand(
-        tableName,
-        specs,
-        ifExists,
-        purge,
-        retainData).run(sparkSession)
     }
     Seq.empty[Row]
   }
@@ -100,7 +122,6 @@ case class CarbonAlterTableDropHivePartitionCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
     var locks = List.empty[ICarbonLock]
     val uniqueId = System.currentTimeMillis().toString
     try {
@@ -119,48 +140,27 @@ case class CarbonAlterTableDropHivePartitionCommand(
       }.toSet
       val segments = new SegmentStatusManager(table.getAbsoluteTableIdentifier)
         .getValidAndInvalidSegments.getValidSegments
-      try {
-        // First drop the partitions from partition mapper files of each segment
-        new CarbonDropPartitionRDD(sparkSession.sparkContext,
-          table.getTablePath,
-          segments.asScala,
-          partitionNames.toSeq,
-          uniqueId,
-          partialMatch = true).collect()
-      } catch {
-        case e: Exception =>
-          // roll back the drop partitions from carbon store
-          new CarbonDropPartitionCommitRDD(sparkSession.sparkContext,
-            table.getTablePath,
-            segments.asScala,
-            false,
-            uniqueId,
-            partitionNames.toSeq).collect()
-          throw e
-      }
-      // commit the drop partitions from carbon store
-      new CarbonDropPartitionCommitRDD(sparkSession.sparkContext,
+      // First drop the partitions from partition mapper files of each segment
+      val tuples = new CarbonDropPartitionRDD(sparkSession.sparkContext,
         table.getTablePath,
         segments.asScala,
-        true,
-        uniqueId,
-        partitionNames.toSeq).collect()
-      // Update the loadstatus with update time to clear cache from driver.
-      val segmentSet = new util.HashSet[String](new SegmentStatusManager(table
-        .getAbsoluteTableIdentifier).getValidAndInvalidSegments.getValidSegments)
-      CarbonUpdateUtil.updateTableMetadataStatus(
-        segmentSet,
-        table,
-        uniqueId,
-        true,
-        new util.ArrayList[String])
+        carbonPartitionsTobeDropped,
+        uniqueId).collect()
+      val tobeUpdatedSegs = new util.ArrayList[String]
+      val tobeDeletedSegs = new util.ArrayList[String]
+      tuples.foreach{case (tobeUpdated, tobeDeleted) =>
+        if (tobeUpdated.split(",").length > 0) {
+          tobeUpdatedSegs.add(tobeUpdated.split(",")(0))
+        }
+        if (tobeDeleted.split(",").length > 0) {
+          tobeDeletedSegs.add(tobeDeleted.split(",")(0))
+        }
+      }
+      SegmentFileStore.commitDropPartitions(table, uniqueId, tobeUpdatedSegs, tobeDeletedSegs)
       DataMapStoreManager.getInstance().clearDataMaps(table.getAbsoluteTableIdentifier)
     } finally {
       AlterTableUtil.releaseLocks(locks)
-      new PartitionMapFileStore().cleanSegments(
-        table,
-        new util.ArrayList(CarbonFilters.getPartitions(Seq.empty, sparkSession, tableName).asJava),
-        false)
+      SegmentFileStore.cleanSegments(table, null, false)
     }
     Seq.empty[Row]
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index 7fe2658..38ac58e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -31,7 +31,7 @@ import org.apache.spark.util.AlterTableUtil
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.cache.CacheProvider
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.datamap.{DataMapStoreManager, Segment}
 import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
@@ -204,7 +204,7 @@ case class CarbonAlterTableDropPartitionCommand(
       val validSegments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala
       val threadArray: Array[Thread] = new Array[Thread](validSegments.size)
       var i = 0
-      for (segmentId: String <- validSegments) {
+      for (segmentId: Segment <- validSegments) {
         threadArray(i) = dropPartitionThread(sqlContext, carbonLoadModel, executor,
           segmentId, partitionId, dropWithData, oldPartitionIds)
         threadArray(i).start()
@@ -216,7 +216,7 @@ case class CarbonAlterTableDropPartitionCommand(
       val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
         carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
       val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
-      refresher.refreshSegments(validSegments.asJava)
+      refresher.refreshSegments(validSegments.map(_.getSegmentNo).asJava)
     } catch {
       case e: Exception =>
         LOGGER.error(s"Exception when dropping partition: ${ e.getMessage }")
@@ -238,7 +238,7 @@ case class CarbonAlterTableDropPartitionCommand(
 case class dropPartitionThread(sqlContext: SQLContext,
     carbonLoadModel: CarbonLoadModel,
     executor: ExecutorService,
-    segmentId: String,
+    segmentId: Segment,
     partitionId: String,
     dropWithData: Boolean,
     oldPartitionIds: List[Int]) extends Thread {
@@ -259,7 +259,7 @@ case class dropPartitionThread(sqlContext: SQLContext,
   private def executeDroppingPartition(sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       executor: ExecutorService,
-      segmentId: String,
+      segmentId: Segment,
       partitionId: String,
       dropWithData: Boolean,
       oldPartitionIds: List[Int]): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 020a72c..7aefbbe 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -215,7 +215,7 @@ case class CarbonAlterTableSplitPartitionCommand(
       var i = 0
       validSegments.foreach { segmentId =>
         threadArray(i) = SplitThread(sqlContext, carbonLoadModel, executor,
-          segmentId, partitionId, oldPartitionIdList)
+          segmentId.getSegmentNo, partitionId, oldPartitionIdList)
         threadArray(i).start()
         i += 1
       }
@@ -225,7 +225,7 @@ case class CarbonAlterTableSplitPartitionCommand(
       val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
         carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
       val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
-      refresher.refreshSegments(validSegments.asJava)
+      refresher.refreshSegments(validSegments.map(_.getSegmentNo).asJava)
     } catch {
       case e: Exception =>
         LOGGER.error(s"Exception when split partition: ${ e.getMessage }")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index d2acb00..59c43aa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.execution.command.datamap.CarbonDropDataMapCommand
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
+import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -178,7 +179,11 @@ case class CreatePreAggregateTableCommand(
     // This will be used to check if the parent table has any segments or not. If not then no
     // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
     // table.
-    DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, parentTable)
+    DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false,
+      parentTable,
+      CarbonFilters.getCurrentPartitions(sparkSession,
+      TableIdentifier(parentTable.getTableName,
+        Some(parentTable.getDatabaseName))).map(_.asJava).orNull)
     val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetaDataFilepath)
     if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
       load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
index ed6be97..657e0c5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateListeners.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompac
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.schema.table.{AggregationDataMapSchema, CarbonTable}
@@ -227,11 +228,17 @@ object LoadPostAggregateListener extends OperationEventListener {
             operationContext.getProperty(
               s"${parentTableDatabase}_${parentTableName}_Segment").toString)
         } else {
-            (TableIdentifier(table.getTableName, Some(table.getDatabaseName)),
-              carbonLoadModel.getSegmentId)
+            val currentSegmentFile = operationContext.getProperty("current.segmentfile")
+            val segment = if (currentSegmentFile != null) {
+              new Segment(carbonLoadModel.getSegmentId, currentSegmentFile.toString)
+            } else {
+              Segment.toSegment(carbonLoadModel.getSegmentId)
+            }
+            (TableIdentifier(table.getTableName, Some(table.getDatabaseName)), segment.toString)
         }
+
         PreAggregateUtil.startDataLoadForDataMap(
-            parentTableIdentifier,
+        parentTableIdentifier,
             segmentToLoad,
             validateSegments = false,
             childLoadCommand,