You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/02/27 08:19:08 UTC

[11/16] carbondata git commit: [CARBONDATA-2187][PARTITION] Partition restructure for new folder structure and supporting partition location feature

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index fc7c13a..22dab27 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -28,6 +28,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.util.CarbonUtil
@@ -98,6 +99,18 @@ case class CarbonCreateTableCommand(
           val partitionString =
             if (partitionInfo != null &&
                 partitionInfo.getPartitionType == PartitionType.NATIVE_HIVE) {
+              // Restrict dictionary encoding on partition columns.
+              // TODO Need to decide whether it is required
+              val dictionaryOnPartitionColumn =
+              partitionInfo.getColumnSchemaList.asScala.exists{p =>
+                p.hasEncoding(Encoding.DICTIONARY) && !p.hasEncoding(Encoding.DIRECT_DICTIONARY)
+              }
+              if (dictionaryOnPartitionColumn) {
+                throwMetadataException(
+                  dbName,
+                  tableName,
+                  s"Dictionary include cannot be applied on partition columns")
+              }
               s" PARTITIONED BY (${partitionInfo.getColumnSchemaList.asScala.map(
                 _.getColumnName).mkString(",")})"
             } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index d2c691b..bff65be 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -17,7 +17,6 @@
 package org.apache.spark.sql.execution.datasources
 
 import java.io.File
-import java.text.SimpleDateFormat
 import java.util
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicLong
@@ -39,16 +38,18 @@ import org.apache.spark.sql.sources.DataSourceRegister
 import org.apache.spark.sql.types._
 
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.metadata.PartitionMapFileStore
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore
+import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.util.{CarbonProperties, DataTypeUtil}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, DataTypeConverterImpl, DataTypeUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutputFormat}
 import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter
 import org.apache.carbondata.hadoop.internal.ObjectArrayWritable
 import org.apache.carbondata.hadoop.util.ObjectSerializationUtil
-import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, Util}
 
@@ -66,6 +67,10 @@ with Serializable {
     None
   }
 
+  SparkSession.getActiveSession.get.sessionState.conf.setConfString(
+    "spark.sql.sources.commitProtocolClass",
+    "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
+
   override def prepareWrite(
       sparkSession: SparkSession,
       job: Job,
@@ -77,9 +82,6 @@ with Serializable {
       classOf[CarbonOutputCommitter],
       classOf[CarbonOutputCommitter])
     conf.set("carbon.commit.protocol", "carbon.commit.protocol")
-    sparkSession.sessionState.conf.setConfString(
-      "spark.sql.sources.commitProtocolClass",
-      "org.apache.spark.sql.execution.datasources.CarbonSQLHadoopMapReduceCommitProtocol")
     job.setOutputFormatClass(classOf[CarbonTableOutputFormat])
     val table = CarbonEnv.getCarbonTable(
       TableIdentifier(options("tableName"), options.get("dbName")))(sparkSession)
@@ -114,13 +116,7 @@ with Serializable {
     model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)
     CarbonTableOutputFormat.setOverwrite(conf, options("overwrite").toBoolean)
     model.setPartitionLoad(true)
-    // Set the update timestamp if user sets in case of update query. It needs to be updated
-    // in load status update time
-    val updateTimeStamp = options.get("updatetimestamp")
-    if (updateTimeStamp.isDefined) {
-      conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get)
-      model.setFactTimeStamp(updateTimeStamp.get.toLong)
-    }
+
     val staticPartition = options.getOrElse("staticpartition", null)
     if (staticPartition != null) {
       conf.set("carbon.staticpartition", staticPartition)
@@ -131,6 +127,30 @@ with Serializable {
     if (segemntsTobeDeleted.isDefined) {
       conf.set(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, segemntsTobeDeleted.get)
     }
+
+    val currPartition = options.getOrElse("currentpartition", null)
+    if (currPartition != null) {
+      conf.set("carbon.currentpartition", currPartition)
+    }
+    // Update with the current in progress load.
+    val currEntry = options.getOrElse("currentloadentry", null)
+    if (currEntry != null) {
+      val loadEntry =
+        ObjectSerializationUtil.convertStringToObject(currEntry).asInstanceOf[LoadMetadataDetails]
+      val details =
+        SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(model.getTablePath))
+      model.setSegmentId(loadEntry.getLoadName)
+      model.setFactTimeStamp(loadEntry.getLoadStartTime)
+      val list = new util.ArrayList[LoadMetadataDetails](details.toList.asJava)
+      list.add(loadEntry)
+      model.setLoadMetadataDetails(list)
+    }
+    // Set the update timestamp if user sets in case of update query. It needs to be updated
+    // in load status update time
+    val updateTimeStamp = options.get("updatetimestamp")
+    if (updateTimeStamp.isDefined) {
+      conf.set(CarbonTableOutputFormat.UPADTE_TIMESTAMP, updateTimeStamp.get)
+    }
     CarbonTableOutputFormat.setLoadModel(conf, model)
 
     new OutputWriterFactory {
@@ -146,13 +166,14 @@ with Serializable {
           path: String,
           dataSchema: StructType,
           context: TaskAttemptContext): OutputWriter = {
+        val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
         val isCarbonUseMultiDir = CarbonProperties.getInstance().isUseMultiTempDir
         var storeLocation: Array[String] = Array[String]()
         val isCarbonUseLocalDir = CarbonProperties.getInstance()
           .getProperty("carbon.use.local.dir", "false").equalsIgnoreCase("true")
 
 
-        val taskNumber = generateTaskNumber(path, context)
+        val taskNumber = generateTaskNumber(path, context, model.getSegmentId)
         val tmpLocationSuffix =
           File.separator + "carbon" + System.nanoTime() + File.separator + taskNumber
         if (isCarbonUseLocalDir) {
@@ -174,7 +195,7 @@ with Serializable {
             storeLocation :+ (System.getProperty("java.io.tmpdir") + tmpLocationSuffix)
         }
         CarbonTableOutputFormat.setTempStoreLocations(context.getConfiguration, storeLocation)
-        new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), taskNumber)
+        new CarbonOutputWriter(path, context, dataSchema.map(_.dataType), taskNumber, model)
       }
 
       /**
@@ -182,7 +203,7 @@ with Serializable {
        * of partition tables.
        */
       private def generateTaskNumber(path: String,
-          context: TaskAttemptContext): String = {
+          context: TaskAttemptContext, segmentId: String): String = {
         var partitionNumber: java.lang.Long = taskIdMap.get(path)
         if (partitionNumber == null) {
           partitionNumber = counter.incrementAndGet()
@@ -190,8 +211,7 @@ with Serializable {
           taskIdMap.put(path, partitionNumber)
         }
         val taskID = context.getTaskAttemptID.getTaskID.getId
-        String.valueOf(Math.pow(10, 5).toInt + taskID) +
-          String.valueOf(partitionNumber + Math.pow(10, 5).toInt)
+        CarbonScalaUtil.generateUniqueNumber(taskID, segmentId, partitionNumber)
       }
 
       override def getFileExtension(context: TaskAttemptContext): String = {
@@ -233,9 +253,12 @@ private trait AbstractCarbonOutputWriter {
 private class CarbonOutputWriter(path: String,
     context: TaskAttemptContext,
     fieldTypes: Seq[DataType],
-    taskNo : String)
+    taskNo : String,
+    model: CarbonLoadModel)
   extends OutputWriter with AbstractCarbonOutputWriter {
-  val model = CarbonTableOutputFormat.getLoadModel(context.getConfiguration)
+
+  val converter = new DataTypeConverterImpl
+
   val partitions =
     getPartitionsFromPath(path, context, model).map(ExternalCatalogUtils.unescapePathName)
   val staticPartition: util.HashMap[String, Boolean] = {
@@ -247,47 +270,52 @@ private class CarbonOutputWriter(path: String,
       null
     }
   }
-  lazy val (updatedPartitions, partitionData) = if (partitions.nonEmpty) {
-    val updatedPartitions = partitions.map{ p =>
-      val value = p.substring(p.indexOf("=") + 1, p.length)
-      val col = p.substring(0, p.indexOf("="))
-      // NUll handling case. For null hive creates with this special name
-      if (value.equals("__HIVE_DEFAULT_PARTITION__")) {
-        (col, null)
-        // we should replace back the special string with empty value.
-      } else if (value.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
-        (col, "")
-      } else {
-        (col, value)
-      }
-    }
-
-    if (staticPartition != null) {
-      val loadModel = recordWriter.getLoadModel
-      val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
-      var timeStampformatString = loadModel.getTimestampformat
-      if (timeStampformatString.isEmpty) {
-        timeStampformatString = loadModel.getDefaultTimestampFormat
-      }
-      val timeFormat = new SimpleDateFormat(timeStampformatString)
-      var dateFormatString = loadModel.getDateFormat
-      if (dateFormatString.isEmpty) {
-        dateFormatString = loadModel.getDefaultDateFormat
-      }
-      val dateFormat = new SimpleDateFormat(dateFormatString)
-      val formattedPartitions = updatedPartitions.map {case (col, value) =>
-        // Only convert the static partitions to the carbon format and use it while loading data
-        // to carbon.
-        (col, value)
-      }
-      (formattedPartitions, updatePartitions(formattedPartitions.map(_._2)))
+  lazy val currPartitions: util.List[indexstore.PartitionSpec] = {
+    val currParts = context.getConfiguration.get("carbon.currentpartition")
+    if (currParts != null) {
+      ObjectSerializationUtil.convertStringToObject(
+        currParts).asInstanceOf[util.List[indexstore.PartitionSpec]]
     } else {
-      (updatedPartitions, updatePartitions(updatedPartitions.map(_._2)))
+      new util.ArrayList[indexstore.PartitionSpec]()
     }
+  }
+  var (updatedPartitions, partitionData) = if (partitions.nonEmpty) {
+    val updatedPartitions = partitions.map(splitPartition)
+    (updatedPartitions, updatePartitions(updatedPartitions.map(_._2)))
   } else {
     (Map.empty[String, String].toArray, Array.empty)
   }
 
+  private def splitPartition(p: String) = {
+    val value = p.substring(p.indexOf("=") + 1, p.length)
+    val col = p.substring(0, p.indexOf("="))
+    // Null handling case. For null values Hive creates the partition with this special name
+    if (value.equals("__HIVE_DEFAULT_PARTITION__")) {
+      (col, null)
+      // we should replace back the special string with empty value.
+    } else if (value.equals(CarbonCommonConstants.MEMBER_DEFAULT_VAL)) {
+      (col, "")
+    } else {
+      (col, value)
+    }
+  }
+
+  lazy val writePath = {
+    val updatedPath = getPartitionPath(path, context, model)
+    // When the user has specified a partition location, look up the write path in the current
+    // partitions to find the corresponding partition values.
+    if (partitions.isEmpty) {
+      val writeSpec = new indexstore.PartitionSpec(null, updatedPath)
+      val index = currPartitions.indexOf(writeSpec)
+      if (index > -1) {
+        val spec = currPartitions.get(index)
+        updatedPartitions = spec.getPartitions.asScala.map(splitPartition).toArray
+        partitionData = updatePartitions(updatedPartitions.map(_._2))
+      }
+    }
+    updatedPath
+  }
+
   val writable = new ObjectArrayWritable
 
   private def updatePartitions(partitionData: Seq[String]): Array[AnyRef] = {
@@ -302,21 +330,30 @@ private class CarbonOutputWriter(path: String,
       } else {
         col.getDataType
       }
-      if (staticPartition != null) {
-        DataTypeUtil.getDataBasedOnDataType(
+      if (staticPartition != null && staticPartition.get(col.getColumnName.toLowerCase)) {
+        val converetedVal =
           CarbonScalaUtil.convertStaticPartitions(
             partitionData(index),
             col,
-            model.getCarbonDataLoadSchema.getCarbonTable),
-          dataType)
+            model.getCarbonDataLoadSchema.getCarbonTable)
+        if (col.hasEncoding(Encoding.DICTIONARY)) {
+          converetedVal.toInt.asInstanceOf[AnyRef]
+        } else {
+          DataTypeUtil.getDataBasedOnDataType(
+            converetedVal,
+            dataType,
+            converter)
+        }
       } else {
-        DataTypeUtil.getDataBasedOnDataType(partitionData(index), dataType)
+        DataTypeUtil.getDataBasedOnDataType(partitionData(index), dataType, converter)
       }
     }.toArray
   }
 
   private val recordWriter: CarbonRecordWriter = {
     context.getConfiguration.set("carbon.outputformat.taskno", taskNo)
+    context.getConfiguration.set("carbon.outputformat.writepath",
+      writePath + "/" + model.getSegmentId + "_" + model.getFactTimeStamp + ".tmp")
     new CarbonTableOutputFormat() {
       override def getDefaultWorkFile(context: TaskAttemptContext, extension: String): Path = {
         new Path(path)
@@ -355,51 +392,54 @@ private class CarbonOutputWriter(path: String,
 
   override def close(): Unit = {
     recordWriter.close(context)
-    val loadModel = recordWriter.getLoadModel
-    val segmentPath = CarbonTablePath.getSegmentPath(loadModel.getTablePath, loadModel.getSegmentId)
-    val table = loadModel.getCarbonDataLoadSchema.getCarbonTable
-    var timeStampformatString = loadModel.getTimestampformat
-    if (timeStampformatString.isEmpty) {
-      timeStampformatString = loadModel.getDefaultTimestampFormat
-    }
-    val timeFormat = new SimpleDateFormat(timeStampformatString)
-    var dateFormatString = loadModel.getDateFormat
-    if (dateFormatString.isEmpty) {
-      dateFormatString = loadModel.getDefaultDateFormat
-    }
-    val dateFormat = new SimpleDateFormat(dateFormatString)
-    val serializeFormat =
-      loadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
-    val badRecordAction = loadModel.getBadRecordsAction.split(",")(1)
-    val isEmptyBadRecord = loadModel.getIsEmptyDataBadRecord.split(",")(1).toBoolean
     // write partition info to new file.
     val partitonList = new util.ArrayList[String]()
     val formattedPartitions =
     // All dynamic partitions need to be converted to proper format
       CarbonScalaUtil.updatePartitions(
         updatedPartitions.toMap,
-        table,
-        timeFormat,
-        dateFormat)
+        model.getCarbonDataLoadSchema.getCarbonTable)
     formattedPartitions.foreach(p => partitonList.add(p._1 + "=" + p._2))
-    new PartitionMapFileStore().writePartitionMapFile(
-      segmentPath,
-      loadModel.getTaskNo,
+    SegmentFileStore.writeSegmentFile(
+      model.getTablePath,
+      taskNo,
+      writePath,
+      model.getSegmentId + "_" + model.getFactTimeStamp + "",
       partitonList)
   }
 
+  def getPartitionPath(path: String,
+      attemptContext: TaskAttemptContext,
+      model: CarbonLoadModel): String = {
+    if (updatedPartitions.nonEmpty) {
+      val formattedPartitions =
+      // All dynamic partitions need to be converted to proper format
+        CarbonScalaUtil.updatePartitions(
+          updatedPartitions.toMap,
+          model.getCarbonDataLoadSchema.getCarbonTable)
+      val partitionstr = formattedPartitions.map{p =>
+        ExternalCatalogUtils.escapePathName(p._1) + "=" + ExternalCatalogUtils.escapePathName(p._2)
+      }.mkString(CarbonCommonConstants.FILE_SEPARATOR)
+      model.getCarbonDataLoadSchema.getCarbonTable.getTablePath +
+        CarbonCommonConstants.FILE_SEPARATOR + partitionstr
+    } else {
+      var updatedPath = FileFactory.getUpdatedFilePath(path)
+      updatedPath.substring(0, updatedPath.lastIndexOf("/"))
+    }
+  }
+
   def getPartitionsFromPath(
       path: String,
       attemptContext: TaskAttemptContext,
       model: CarbonLoadModel): Array[String] = {
     var attemptId = attemptContext.getTaskAttemptID.toString + "/"
-    if (path.indexOf(attemptId) <= 0) {
-
-      attemptId = model.getTableName + "/"
-    }
-    val str = path.substring(path.indexOf(attemptId) + attemptId.length, path.lastIndexOf("/"))
-    if (str.length > 0) {
-      str.split("/")
+    if (path.indexOf(attemptId) > -1) {
+      val str = path.substring(path.indexOf(attemptId) + attemptId.length, path.lastIndexOf("/"))
+      if (str.length > 0) {
+        str.split("/")
+      } else {
+        Array.empty
+      }
     } else {
       Array.empty
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 544c494..48679b1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.schema.BucketingInfo
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager
@@ -143,10 +144,10 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       projects: Seq[NamedExpression],
       filterPredicates: Seq[Expression],
       scanBuilder: (Seq[Attribute], Array[Filter],
-        ArrayBuffer[AttributeReference], Seq[String]) => RDD[InternalRow]) = {
+        ArrayBuffer[AttributeReference], Seq[PartitionSpec]) => RDD[InternalRow]) = {
     val names = relation.catalogTable.get.partitionColumnNames
     // Get the current partitions from table.
-    var partitions: Seq[String] = null
+    var partitions: Seq[PartitionSpec] = null
     if (names.nonEmpty) {
       val partitionSet = AttributeSet(names
         .map(p => relation.output.find(_.name.equalsIgnoreCase(p)).get))
@@ -167,7 +168,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
         CarbonFilters.getPartitions(
           updatedPartitionFilters.toSeq,
           SparkSession.getActiveSession.get,
-          relation.catalogTable.get.identifier)
+          relation.catalogTable.get.identifier).orNull
     }
     pruneFilterProjectRaw(
       relation,
@@ -199,9 +200,9 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
       relation: LogicalRelation,
       rawProjects: Seq[NamedExpression],
       filterPredicates: Seq[Expression],
-      partitions: Seq[String],
+      partitions: Seq[PartitionSpec],
       scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
-        ArrayBuffer[AttributeReference], Seq[String]) => RDD[InternalRow]) = {
+        ArrayBuffer[AttributeReference], Seq[PartitionSpec]) => RDD[InternalRow]) = {
     val projects = rawProjects.map {p =>
       p.transform {
         case CustomDeterministicExpression(exp) => exp
@@ -350,9 +351,9 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
 
   private def getDataSourceScan(relation: LogicalRelation,
       output: Seq[Attribute],
-      partitions: Seq[String],
+      partitions: Seq[PartitionSpec],
       scanBuilder: (Seq[Attribute], Seq[Expression], Seq[Filter],
-        ArrayBuffer[AttributeReference], Seq[String]) => RDD[InternalRow],
+        ArrayBuffer[AttributeReference], Seq[PartitionSpec]) => RDD[InternalRow],
       candidatePredicates: Seq[Expression],
       pushedFilters: Seq[Filter],
       metadata: Map[String, String],

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 0178716..f69ccc1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.management.{CarbonAlterTableCompactionCommand, CarbonInsertIntoCommand, CarbonLoadDataCommand, RefreshCarbonTableCommand}
-import org.apache.spark.sql.execution.command.partition.{CarbonAlterTableDropHivePartitionCommand, CarbonShowCarbonPartitionsCommand}
+import org.apache.spark.sql.execution.command.partition.{CarbonAlterTableAddHivePartitionCommand, CarbonAlterTableDropHivePartitionCommand, CarbonShowCarbonPartitionsCommand}
 import org.apache.spark.sql.execution.command.schema._
 import org.apache.spark.sql.execution.command.table.{CarbonDescribeFormattedCommand, CarbonDropTableCommand}
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
@@ -247,16 +247,19 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         } else {
           ExecutedCommandExec(rename) :: Nil
         }
-      case addPartition@AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, _) =>
+      case addPart@AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, ifNotExists) =>
         val dbOption = tableName.database.map(_.toLowerCase)
         val tableIdentifier = TableIdentifier(tableName.table.toLowerCase(), dbOption)
         val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
           .tableExists(tableIdentifier)(sparkSession)
-        if (isCarbonTable && partitionSpecsAndLocs.exists(_._2.isDefined)) {
-          throw new UnsupportedOperationException(
-            "add partition with location is not supported")
+        if (isCarbonTable) {
+          ExecutedCommandExec(
+            CarbonAlterTableAddHivePartitionCommand(
+              tableName,
+              partitionSpecsAndLocs,
+              ifNotExists)) :: Nil
         } else {
-          ExecutedCommandExec(addPartition) :: Nil
+          ExecutedCommandExec(addPart) :: Nil
         }
       case RefreshTable(tableIdentifier) =>
         RefreshCarbonTableCommand(tableIdentifier.database,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index b8608f4..7bf8536 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -29,11 +29,14 @@ import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Stati
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CarbonException
 
+import org.apache.carbondata.core.datamap.Segment
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, CarbonDimension}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 
 /**
@@ -209,9 +212,10 @@ case class CarbonRelation(
         .getValidAndInvalidSegments.getValidSegments.isEmpty) {
         sizeInBytesLocalValue = 0L
       } else {
-        val tablePath = CarbonStorePath.getCarbonTablePath(
+        val carbonTablePath = CarbonStorePath.getCarbonTablePath(
           carbonTable.getTablePath,
-          carbonTable.getCarbonTableIdentifier).getPath
+          carbonTable.getCarbonTableIdentifier)
+        val tablePath = carbonTablePath.getPath
         val fileType = FileFactory.getFileType(tablePath)
         if (FileFactory.isFileExist(tablePath, fileType)) {
           // get the valid segments
@@ -220,8 +224,14 @@ case class CarbonRelation(
           var size = 0L
           // for each segment calculate the size
           segments.foreach {validSeg =>
-            size = size + FileFactory.getDirectorySize(
-              CarbonTablePath.getSegmentPath(tablePath, validSeg))
+            if (validSeg.getSegmentFileName != null) {
+              val fileStore = new SegmentFileStore(tablePath, validSeg.getSegmentFileName)
+              size = size + CarbonUtil.getSizeOfSegment(
+                carbonTablePath, new Segment(validSeg.getSegmentNo, validSeg.getSegmentFileName))
+            } else {
+              size = size + FileFactory.getDirectorySize(
+                CarbonTablePath.getSegmentPath(tablePath, validSeg.getSegmentNo))
+            }
           }
           // update the new table status time
           tableStatusLastUpdateTime = tableStatusNewLastUpdatedTime

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index c7767ce..e5eb53c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.optimizer
 
+import java.util
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql._
@@ -30,14 +32,15 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.CarbonSessionCatalog
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.PartitionMapFileStore
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.SegmentFileStore
 import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
 import org.apache.carbondata.core.scan.expression.conditional._
 import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
 import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
-import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
@@ -432,15 +435,11 @@ object CarbonFilters {
   }
 
   def getCurrentPartitions(sparkSession: SparkSession,
-      carbonTable: CarbonTable): Seq[String] = {
-    if (carbonTable.isHivePartitionTable) {
-      CarbonFilters.getPartitions(
-        Seq.empty,
-        sparkSession,
-        TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName)))
-    } else {
-      Seq.empty
-    }
+      tableIdentifier: TableIdentifier): Option[Seq[PartitionSpec]] = {
+    CarbonFilters.getPartitions(
+      Seq.empty,
+      sparkSession,
+      tableIdentifier)
   }
 
   /**
@@ -452,11 +451,15 @@ object CarbonFilters {
    */
   def getPartitions(partitionFilters: Seq[Expression],
       sparkSession: SparkSession,
-      identifier: TableIdentifier): Seq[String] = {
+      identifier: TableIdentifier): Option[Seq[PartitionSpec]] = {
+    val table = CarbonEnv.getCarbonTable(identifier)(sparkSession)
     // first try to read partitions in case if the trigger comes from the aggregation table load.
-    val partitionsForAggTable = getPartitionsForAggTable(sparkSession, identifier)
+    val partitionsForAggTable = getPartitionsForAggTable(sparkSession, table)
     if (partitionsForAggTable.isDefined) {
-      return partitionsForAggTable.get
+      return partitionsForAggTable
+    }
+    if (!table.isHivePartitionTable) {
+      return None
     }
     val partitions = {
       try {
@@ -483,35 +486,33 @@ object CarbonFilters {
             identifier)
       }
     }
-    partitions.toList.flatMap { partition =>
-      partition.spec.seq.map{case (column, value) => column + "=" + value}
-    }.toSet.toSeq
+    Some(partitions.map { partition =>
+      new PartitionSpec(
+        new util.ArrayList[String]( partition.spec.seq.map{case (column, value) =>
+          column + "=" + value}.toList.asJava), partition.location)
+    })
   }
 
   /**
    * In case of loading aggregate tables it needs to be get only from the main table load in
-   * progress segment. So we should read from the partition map file of that segment.
+   * progress segment. So we should read from the segment file of that segment.
    */
   def getPartitionsForAggTable(sparkSession: SparkSession,
-      identifier: TableIdentifier): Option[Seq[String]] = {
+      table: CarbonTable): Option[Seq[PartitionSpec]] = {
     // when validate segments is disabled then only read from partitionmap
     val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
     if (carbonSessionInfo != null) {
       val validateSegments = carbonSessionInfo.getSessionParams
         .getProperty(CarbonCommonConstants.VALIDATE_CARBON_INPUT_SEGMENTS +
-                     CarbonEnv.getDatabaseName(identifier.database)(sparkSession) + "." +
-                     identifier.table, "true").toBoolean
+                     table.getDatabaseName + "." +
+                     table.getTableName, "true").toBoolean
       if (!validateSegments) {
         val segmentNumbersFromProperty = CarbonProperties.getInstance
           .getProperty(CarbonCommonConstants.CARBON_INPUT_SEGMENTS +
-                       CarbonEnv.getDatabaseName(identifier.database)(sparkSession)
-                       + "." + identifier.table)
-        val carbonTable = CarbonEnv.getCarbonTable(identifier)(sparkSession)
-        val segmentPath =
-          CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentNumbersFromProperty)
-        val partitionMapper = new PartitionMapFileStore()
-        partitionMapper.readAllPartitionsOfSegment(segmentPath)
-        Some(partitionMapper.getPartitionMap.asScala.map(_._2).flatMap(_.asScala).toSet.toSeq)
+                       table.getDatabaseName + "." + table.getTableName)
+        val segment = Segment.toSegment(segmentNumbersFromProperty)
+        val segmentFile = new SegmentFileStore(table.getTablePath, segment.getSegmentFileName)
+        Some(segmentFile.getPartitionSpecs.asScala)
       } else {
         None
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index 0b62e10..d45020b 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -38,7 +38,7 @@ import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Experime
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
 /**
@@ -146,14 +146,8 @@ class CarbonSessionCatalog(
       ignoreIfExists: Boolean): Unit = {
     try {
       val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
-      // Get the properties from thread local
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
-      if (carbonSessionInfo != null) {
-        val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table)
-        super.createPartitions(tableName, updatedParts, ignoreIfExists)
-      } else {
-        super.createPartitions(tableName, parts, ignoreIfExists)
-      }
+      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
+      super.createPartitions(tableName, updatedParts, ignoreIfExists)
     } catch {
       case e: Exception =>
         super.createPartitions(tableName, parts, ignoreIfExists)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index baadd04..b9425d6 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -46,7 +46,7 @@ import org.apache.spark.util.CarbonReflectionUtils
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.util.{CarbonProperties, ThreadLocalSessionInfo}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
@@ -144,14 +144,8 @@ class CarbonSessionCatalog(
       ignoreIfExists: Boolean): Unit = {
     try {
       val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
-      // Get the properties from thread local
-      val carbonSessionInfo = ThreadLocalSessionInfo.getCarbonSessionInfo
-      if (carbonSessionInfo != null) {
-        val updatedParts = CarbonScalaUtil.updatePartitions(carbonSessionInfo, parts, table)
-        super.createPartitions(tableName, updatedParts, ignoreIfExists)
-      } else {
-        super.createPartitions(tableName, parts, ignoreIfExists)
-      }
+      val updatedParts = CarbonScalaUtil.updatePartitions(parts, table)
+      super.createPartitions(tableName, updatedParts, ignoreIfExists)
     } catch {
       case e: Exception =>
         super.createPartitions(tableName, parts, ignoreIfExists)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 4b0113c..d739f8c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datamap.DataMapMeta;
 import org.apache.carbondata.core.datamap.DataMapStoreManager;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.TableDataMap;
 import org.apache.carbondata.core.datamap.dev.DataMapFactory;
 import org.apache.carbondata.core.datamap.dev.DataMapWriter;
@@ -71,7 +72,7 @@ public class DataMapWriterListener {
     }
     List<String> columns = factory.getMeta().getIndexedColumns();
     List<DataMapWriter> writers = registry.get(columns);
-    DataMapWriter writer = factory.createWriter(segmentId);
+    DataMapWriter writer = factory.createWriter(new Segment(segmentId, null));
     if (writers != null) {
       writers.add(writer);
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 7b1ab9d..b7270b9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -109,6 +109,11 @@ public class CarbonDataLoadConfiguration {
    */
   private short writingCoresCount;
 
+  /**
+   * Folder path to where data should be written for this load.
+   */
+  private String dataWritePath;
+
   public CarbonDataLoadConfiguration() {
   }
 
@@ -363,4 +368,12 @@ public class CarbonDataLoadConfiguration {
   public void setWritingCoresCount(short writingCoresCount) {
     this.writingCoresCount = writingCoresCount;
   }
+
+  public String getDataWritePath() {
+    return dataWritePath;
+  }
+
+  public void setDataWritePath(String dataWritePath) {
+    this.dataWritePath = dataWritePath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index ba24d41..f5b29e7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -57,7 +57,8 @@ public final class DataLoadProcessBuilder {
       CarbonIterator[] inputIterators) throws Exception {
     CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation);
     SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
-    if (!configuration.isSortTable() || sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
+    if ((!configuration.isSortTable() || sortScope.equals(SortScopeOptions.SortScope.NO_SORT))
+        && !loadModel.isPartitionLoad()) {
       return buildInternalForNoSort(inputIterators, configuration);
     } else if (configuration.getBucketingInfo() != null) {
       return buildInternalForBucketing(inputIterators, configuration);
@@ -251,6 +252,7 @@ public final class DataLoadProcessBuilder {
     configuration.setPreFetch(loadModel.isPreFetch());
     configuration.setNumberOfSortColumns(carbonTable.getNumberOfSortColumns());
     configuration.setNumberOfNoDictSortColumns(carbonTable.getNumberOfNoDictSortColumns());
+    configuration.setDataWritePath(loadModel.getDataWritePath());
     // For partition loading always use single core as it already runs in multiple
     // threads per partition
     if (carbonTable.isHivePartitionTable()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
index 85eb19b..6664a2c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/MeasureFieldConverterImpl.java
@@ -89,7 +89,7 @@ public class MeasureFieldConverterImpl implements FieldConverter {
     } else {
       try {
         if (dataField.isUseActualData()) {
-          output = DataTypeUtil.getConvertedMeasureValueBasedOnDataType(value, dataType, measure);
+          output = DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure, true);
         } else {
           output = DataTypeUtil.getMeasureValueBasedOnDataType(value, dataType, measure);
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 4c536ea..a17178a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -193,6 +193,11 @@ public class CarbonLoadModel implements Serializable {
    */
   private boolean isPartitionLoad;
 
+  /**
+   * Folder path to where data should be written for this load.
+   */
+  private String dataWritePath;
+
   public boolean isAggLoadRequest() {
     return isAggLoadRequest;
   }
@@ -870,4 +875,12 @@ public class CarbonLoadModel implements Serializable {
   public void setPartitionLoad(boolean partitionLoad) {
     isPartitionLoad = partitionLoad;
   }
+
+  public String getDataWritePath() {
+    return dataWritePath;
+  }
+
+  public void setDataWritePath(String dataWritePath) {
+    this.dataWritePath = dataWritePath;
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index be27866..3f1430d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -26,6 +26,7 @@ import java.util.*;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
@@ -42,6 +43,7 @@ import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;
@@ -280,7 +282,7 @@ public final class CarbonDataMergerUtil {
    */
   public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge,
       String metaDataFilepath, String mergedLoadNumber, CarbonLoadModel carbonLoadModel,
-      CompactionType compactionType) throws IOException {
+      CompactionType compactionType, String segmentFile) throws IOException {
     boolean tableStatusUpdationStatus = false;
     AbsoluteTableIdentifier absoluteTableIdentifier =
         carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
@@ -325,6 +327,7 @@ public final class CarbonDataMergerUtil {
         loadMetadataDetails.setLoadEndTime(loadEnddate);
         CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
         loadMetadataDetails.setLoadName(mergedLoadNumber);
+        loadMetadataDetails.setSegmentFile(segmentFile);
         CarbonLoaderUtil
             .addDataIndexSizeIntoMetaEntry(loadMetadataDetails, mergedLoadNumber, carbonTable);
         loadMetadataDetails.setLoadStartTime(carbonLoadModel.getFactTimeStamp());
@@ -385,7 +388,7 @@ public final class CarbonDataMergerUtil {
    */
   public static List<LoadMetadataDetails> identifySegmentsToBeMerged(
           CarbonLoadModel carbonLoadModel, long compactionSize,
-          List<LoadMetadataDetails> segments, CompactionType compactionType) {
+          List<LoadMetadataDetails> segments, CompactionType compactionType) throws IOException {
     String tablePath = carbonLoadModel.getTablePath();
     Map<String, String> tableLevelProperties = carbonLoadModel.getCarbonDataLoadSchema()
             .getCarbonTable().getTableInfo().getFactTable().getTableProperties();
@@ -590,13 +593,13 @@ public final class CarbonDataMergerUtil {
    */
   private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize(
       long compactionSize, List<LoadMetadataDetails> listOfSegmentsAfterPreserve,
-      CarbonLoadModel carbonLoadModel, String tablePath) {
+      CarbonLoadModel carbonLoadModel, String tablePath) throws IOException {
 
     List<LoadMetadataDetails> segmentsToBeMerged =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
-    CarbonTableIdentifier tableIdentifier =
-        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier();
+    CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
+    CarbonTableIdentifier tableIdentifier = carbonTable.getCarbonTableIdentifier();
 
 
     // total length
@@ -612,8 +615,15 @@ public final class CarbonDataMergerUtil {
 
       String segId = segment.getLoadName();
       // variable to store one  segment size across partition.
-      long sizeOfOneSegmentAcrossPartition =
-          getSizeOfSegment(tablePath, tableIdentifier, segId);
+      long sizeOfOneSegmentAcrossPartition;
+      if (segment.getSegmentFile() != null) {
+        CarbonTablePath carbonTablePath = CarbonStorePath
+            .getCarbonTablePath(carbonTable.getTablePath(), carbonTable.getCarbonTableIdentifier());
+        sizeOfOneSegmentAcrossPartition = CarbonUtil
+            .getSizeOfSegment(carbonTablePath, new Segment(segId, segment.getSegmentFile()));
+      } else {
+        sizeOfOneSegmentAcrossPartition = getSizeOfSegment(tablePath, tableIdentifier, segId);
+      }
 
       // if size of a segment is greater than the Major compaction size. then ignore it.
       if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) {
@@ -863,18 +873,18 @@ public final class CarbonDataMergerUtil {
    * @param loadMetadataDetails
    * @return
    */
-  public static String getValidSegments(List<LoadMetadataDetails> loadMetadataDetails) {
-    StringBuilder builder = new StringBuilder();
+  public static List<Segment> getValidSegments(List<LoadMetadataDetails> loadMetadataDetails) {
+    List<Segment> segments = new ArrayList<>();
     for (LoadMetadataDetails segment : loadMetadataDetails) {
       //check if this load is an already merged load.
       if (null != segment.getMergedLoadName()) {
-        builder.append(segment.getMergedLoadName()).append(",");
+
+        segments.add(Segment.toSegment(segment.getMergedLoadName()));
       } else {
-        builder.append(segment.getLoadName()).append(",");
+        segments.add(Segment.toSegment(segment.getLoadName()));
       }
     }
-    builder.deleteCharAt(builder.length() - 1);
-    return builder.toString();
+    return segments;
   }
 
   /**
@@ -883,7 +893,7 @@ public final class CarbonDataMergerUtil {
    * @param absoluteTableIdentifier
    * @return
    */
-  public static List<String> getValidSegmentList(AbsoluteTableIdentifier absoluteTableIdentifier)
+  public static List<Segment> getValidSegmentList(AbsoluteTableIdentifier absoluteTableIdentifier)
           throws IOException {
 
     SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = null;
@@ -933,7 +943,8 @@ public final class CarbonDataMergerUtil {
     int numberUpdateDeltaFilesThreshold =
         CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction();
     for (LoadMetadataDetails seg : segments) {
-      if ((isSegmentValid(seg)) && checkUpdateDeltaFilesInSeg(seg.getLoadName(),
+      if ((isSegmentValid(seg)) && checkUpdateDeltaFilesInSeg(
+          new Segment(seg.getLoadName(), seg.getSegmentFile()),
           absoluteTableIdentifier, carbonLoadModel.getSegmentUpdateStatusManager(),
           numberUpdateDeltaFilesThreshold)) {
         validSegments.add(seg);
@@ -950,12 +961,12 @@ public final class CarbonDataMergerUtil {
 
   /**
    * method gets the segments list which get qualified for IUD compaction.
-   * @param Segments
+   * @param segments
    * @param absoluteTableIdentifier
    * @param compactionTypeIUD
    * @return
    */
-  public static List<String> getSegListIUDCompactionQualified(List<String> Segments,
+  public static List<String> getSegListIUDCompactionQualified(List<Segment> segments,
       AbsoluteTableIdentifier absoluteTableIdentifier,
       SegmentUpdateStatusManager segmentUpdateStatusManager, CompactionType compactionTypeIUD) {
 
@@ -964,8 +975,8 @@ public final class CarbonDataMergerUtil {
     if (CompactionType.IUD_DELETE_DELTA == compactionTypeIUD) {
       int numberDeleteDeltaFilesThreshold =
           CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction();
-      List<String> deleteSegments = new ArrayList<>();
-      for (String seg : Segments) {
+      List<Segment> deleteSegments = new ArrayList<>();
+      for (Segment seg : segments) {
         if (checkDeleteDeltaFilesInSeg(seg, segmentUpdateStatusManager,
             numberDeleteDeltaFilesThreshold)) {
           deleteSegments.add(seg);
@@ -975,7 +986,7 @@ public final class CarbonDataMergerUtil {
         // This Code Block Append the Segname along with the Blocks selected for Merge instead of
         // only taking the segment name. This will help to parallelize better for each block
         // in case of Delete Horizontal Compaction.
-        for (String segName : deleteSegments) {
+        for (Segment segName : deleteSegments) {
           List<String> tempSegments = getDeleteDeltaFilesInSeg(segName, segmentUpdateStatusManager,
               numberDeleteDeltaFilesThreshold);
           validSegments.addAll(tempSegments);
@@ -984,10 +995,10 @@ public final class CarbonDataMergerUtil {
     } else if (CompactionType.IUD_UPDDEL_DELTA == compactionTypeIUD) {
       int numberUpdateDeltaFilesThreshold =
           CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction();
-      for (String seg : Segments) {
+      for (Segment seg : segments) {
         if (checkUpdateDeltaFilesInSeg(seg, absoluteTableIdentifier, segmentUpdateStatusManager,
             numberUpdateDeltaFilesThreshold)) {
-          validSegments.add(seg);
+          validSegments.add(seg.getSegmentNo());
         }
       }
     }
@@ -1027,7 +1038,7 @@ public final class CarbonDataMergerUtil {
    * @param numberDeltaFilesThreshold
    * @return
    */
-  public static Boolean checkUpdateDeltaFilesInSeg(String seg,
+  public static Boolean checkUpdateDeltaFilesInSeg(Segment seg,
       AbsoluteTableIdentifier absoluteTableIdentifier,
       SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
 
@@ -1036,14 +1047,14 @@ public final class CarbonDataMergerUtil {
 
     CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
 
-    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", seg);
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", seg.getSegmentNo());
     CarbonFile segDir =
         FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
     CarbonFile[] allSegmentFiles = segDir.listFiles();
 
     updateDeltaFiles = segmentUpdateStatusManager
-        .getUpdateDeltaFilesForSegment(seg, true, CarbonCommonConstants.UPDATE_DELTA_FILE_EXT,
-            false, allSegmentFiles);
+        .getUpdateDeltaFilesForSegment(seg.getSegmentNo(), true,
+            CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, false, allSegmentFiles);
 
     if (updateDeltaFiles == null) {
       return false;
@@ -1079,11 +1090,12 @@ public final class CarbonDataMergerUtil {
    * @param numberDeltaFilesThreshold
    * @return
    */
-  private static boolean checkDeleteDeltaFilesInSeg(String seg,
+  private static boolean checkDeleteDeltaFilesInSeg(Segment seg,
       SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
 
     Set<String> uniqueBlocks = new HashSet<String>();
-    List<String> blockNameList = segmentUpdateStatusManager.getBlockNameFromSegment(seg);
+    List<String> blockNameList =
+        segmentUpdateStatusManager.getBlockNameFromSegment(seg.getSegmentNo());
 
     for (final String blockName : blockNameList) {
 
@@ -1121,11 +1133,12 @@ public final class CarbonDataMergerUtil {
    * @return
    */
 
-  private static List<String> getDeleteDeltaFilesInSeg(String seg,
+  private static List<String> getDeleteDeltaFilesInSeg(Segment seg,
       SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
 
     List<String> blockLists = new ArrayList<>();
-    List<String> blockNameList = segmentUpdateStatusManager.getBlockNameFromSegment(seg);
+    List<String> blockNameList =
+        segmentUpdateStatusManager.getBlockNameFromSegment(seg.getSegmentNo());
 
     for (final String blockName : blockNameList) {
 
@@ -1133,7 +1146,7 @@ public final class CarbonDataMergerUtil {
           segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
 
       if (deleteDeltaFiles.length > numberDeltaFilesThreshold) {
-        blockLists.add(seg + "/" + blockName);
+        blockLists.add(seg.getSegmentNo() + "/" + blockName);
       }
     }
     return blockLists;
@@ -1177,7 +1190,7 @@ public final class CarbonDataMergerUtil {
     segmentUpdateStatusManager.setUpdateStatusDetails(segmentUpdateDetails);
 
     CarbonFile[] deleteDeltaFiles =
-        segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
+        segmentUpdateStatusManager.getDeleteDeltaFilesList(new Segment(seg, null), blockName);
 
     String destFileName =
         blockName + "-" + timestamp.toString() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 2480a39..2fbdf4f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -26,7 +26,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.metadata.PartitionMapFileStore;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -35,7 +36,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sort.sortdata.SingleThreadFinalSortFilesMerger;
@@ -129,19 +129,20 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    */
   private SortIntermediateFileMerger intermediateFileMerger;
 
-  private List<String> partitionNames;
+  private PartitionSpec partitionSpec;
+
   private SortParameters sortParameters;
 
   public CompactionResultSortProcessor(CarbonLoadModel carbonLoadModel, CarbonTable carbonTable,
       SegmentProperties segmentProperties, CompactionType compactionType, String tableName,
-      List<String> partitionNames) {
+      PartitionSpec partitionSpec) {
     this.carbonLoadModel = carbonLoadModel;
     this.carbonTable = carbonTable;
     this.segmentProperties = segmentProperties;
     this.segmentId = carbonLoadModel.getSegmentId();
     this.compactionType = compactionType;
     this.tableName = tableName;
-    this.partitionNames = partitionNames;
+    this.partitionSpec = partitionSpec;
   }
 
   /**
@@ -168,14 +169,12 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
     } catch (Exception e) {
       LOGGER.error(e, "Compaction failed: " + e.getMessage());
     } finally {
-      if (partitionNames != null) {
+      if (partitionSpec != null) {
         try {
-          new PartitionMapFileStore().writePartitionMapFile(
-              CarbonTablePath.getSegmentPath(
-                  carbonLoadModel.getTablePath(),
-                  carbonLoadModel.getSegmentId()),
-              carbonLoadModel.getTaskNo(),
-              partitionNames);
+          SegmentFileStore
+              .writeSegmentFile(carbonLoadModel.getTablePath(), carbonLoadModel.getTaskNo(),
+                  partitionSpec.getLocation().toString(), carbonLoadModel.getFactTimeStamp() + "",
+                  partitionSpec.getPartitions());
         } catch (IOException e) {
           LOGGER.error(e, "Compaction failed: " + e.getMessage());
           isCompactionSuccess = false;
@@ -401,9 +400,19 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    * initialise carbon data writer instance
    */
   private void initDataHandler() throws Exception {
+    String carbonStoreLocation;
+    if (partitionSpec != null) {
+      carbonStoreLocation =
+          partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR
+              + carbonLoadModel.getFactTimeStamp() + ".tmp";
+    } else {
+      carbonStoreLocation = CarbonDataProcessorUtil
+          .createCarbonStoreLocation(carbonTable.getTablePath(), carbonLoadModel.getDatabaseName(),
+              tableName, carbonLoadModel.getPartitionId(), carbonLoadModel.getSegmentId());
+    }
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
         .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName,
-            tempStoreLocation);
+            tempStoreLocation, carbonStoreLocation);
     setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonTable,
         carbonFactDataHandlerModel);
     dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index 3d0700b..b41829f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -24,18 +24,19 @@ import java.util.PriorityQueue;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.PartitionMapFileStore;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
 import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.exception.SliceMergerException;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
@@ -51,7 +52,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
   private CarbonFactHandler dataHandler;
   private SegmentProperties segprop;
   private CarbonLoadModel loadModel;
-  private List<String> partitionNames;
+  private PartitionSpec partitionSpec;
   /**
    * record holder heap
    */
@@ -62,16 +63,26 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
 
   public RowResultMergerProcessor(String databaseName,
       String tableName, SegmentProperties segProp, String[] tempStoreLocation,
-      CarbonLoadModel loadModel, CompactionType compactionType, List<String> partitionNames) {
+      CarbonLoadModel loadModel, CompactionType compactionType, PartitionSpec partitionSpec) {
     this.segprop = segProp;
-    this.partitionNames = partitionNames;
+    this.partitionSpec = partitionSpec;
     this.loadModel = loadModel;
     CarbonDataProcessorUtil.createLocations(tempStoreLocation);
 
     CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
+    String carbonStoreLocation;
+    if (partitionSpec != null) {
+      carbonStoreLocation =
+          partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR + loadModel
+              .getFactTimeStamp() + ".tmp";
+    } else {
+      carbonStoreLocation = CarbonDataProcessorUtil
+          .createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
+              tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
+    }
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
         .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName,
-            tempStoreLocation);
+            tempStoreLocation, carbonStoreLocation);
     setDataFileAttributesInModel(loadModel, compactionType, carbonTable,
         carbonFactDataHandlerModel);
     carbonFactDataHandlerModel.setCompactionFlow(true);
@@ -157,14 +168,13 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
         if (isDataPresent) {
           this.dataHandler.closeHandler();
         }
-        if (partitionNames != null) {
-          new PartitionMapFileStore().writePartitionMapFile(
-              CarbonTablePath.getSegmentPath(loadModel.getTablePath(), loadModel.getSegmentId()),
-              loadModel.getTaskNo(),
-              partitionNames);
+        if (partitionSpec != null) {
+          SegmentFileStore.writeSegmentFile(loadModel.getTablePath(), loadModel.getTaskNo(),
+              partitionSpec.getLocation().toString(), loadModel.getFactTimeStamp() + "",
+              partitionSpec.getPartitions());
         }
       } catch (CarbonDataWriterException | IOException e) {
-        LOGGER.error(e,"Exception in compaction merger");
+        LOGGER.error(e, "Exception in compaction merger");
         mergeStatus = false;
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index 68a212e..ff6ca93 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -47,9 +47,12 @@ public class RowResultProcessor {
     CarbonDataProcessorUtil.createLocations(tempStoreLocation);
     this.segmentProperties = segProp;
     String tableName = carbonTable.getTableName();
+    String carbonStoreLocation = CarbonDataProcessorUtil
+        .createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
+            tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
     CarbonFactDataHandlerModel carbonFactDataHandlerModel =
         CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
-            segProp, tableName, tempStoreLocation);
+            segProp, tableName, tempStoreLocation, carbonStoreLocation);
     CarbonDataFileAttributes carbonDataFileAttributes =
         new CarbonDataFileAttributes(Long.parseLong(loadModel.getTaskNo()),
             loadModel.getFactTimeStamp());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index d15152c..d77fcab 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -275,7 +275,7 @@ public class CarbonFactDataHandlerModel {
    */
   public static CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoadModel loadModel,
       CarbonTable carbonTable, SegmentProperties segmentProperties, String tableName,
-      String[] tempStoreLocation) {
+      String[] tempStoreLocation, String carbonDataDirectoryPath) {
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel();
     carbonFactDataHandlerModel.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
     carbonFactDataHandlerModel.setDatabaseName(loadModel.getDatabaseName());
@@ -307,9 +307,7 @@ public class CarbonFactDataHandlerModel {
       measureDataTypes[i++] = msr.getDataType();
     }
     carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes);
-    String carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .checkAndCreateCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
-            tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
+    CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
     carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
     List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName);
     boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()];
@@ -334,6 +332,10 @@ public class CarbonFactDataHandlerModel {
    * @return data directory path
    */
   private static String getCarbonDataFolderLocation(CarbonDataLoadConfiguration configuration) {
+    if (configuration.getDataWritePath() != null) {
+      CarbonUtil.checkAndCreateFolder(configuration.getDataWritePath());
+      return configuration.getDataWritePath();
+    }
     AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
     CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
     String carbonDataDirectoryPath = carbonTablePath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 376a546..1e648e1 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -302,7 +302,7 @@ public final class CarbonDataProcessorUtil {
   }
 
   public static boolean isHeaderValid(String tableName, String[] csvHeader,
-      CarbonDataLoadSchema schema) {
+      CarbonDataLoadSchema schema, List<String> ignoreColumns) {
     Iterator<String> columnIterator =
         CarbonDataProcessorUtil.getSchemaColumnNames(schema, tableName).iterator();
     Set<String> csvColumns = new HashSet<String>(csvHeader.length);
@@ -311,7 +311,8 @@ public final class CarbonDataProcessorUtil {
     // file header should contain all columns of carbon table.
     // So csvColumns should contain all elements of columnIterator.
     while (columnIterator.hasNext()) {
-      if (!csvColumns.contains(columnIterator.next().toLowerCase())) {
+      String column = columnIterator.next().toLowerCase();
+      if (!csvColumns.contains(column) && !ignoreColumns.contains(column)) {
         return false;
       }
     }
@@ -377,7 +378,7 @@ public final class CarbonDataProcessorUtil {
    *
    * @return data directory path
    */
-  public static String checkAndCreateCarbonStoreLocation(String factStoreLocation,
+  public static String createCarbonStoreLocation(String factStoreLocation,
       String databaseName, String tableName, String partitionId, String segmentId) {
     CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
     CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
@@ -385,7 +386,6 @@ public final class CarbonDataProcessorUtil {
         CarbonStorePath.getCarbonTablePath(factStoreLocation, carbonTableIdentifier);
     String carbonDataDirectoryPath =
         carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId);
-    CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
     return carbonDataDirectoryPath;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 00f13a5..32c72da 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.block.Distributable;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
@@ -166,6 +167,23 @@ public final class CarbonLoaderUtil {
   public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
       CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid)
       throws IOException {
+    return recordNewLoadMetadata(newMetaEntry, loadModel, loadStartEntry, insertOverwrite, uuid,
+        new ArrayList<Segment>(), new ArrayList<Segment>());
+  }
+
+  /**
+   * This API will write the load level metadata for the load management module in order to
+   * manage load and query execution smoothly.
+   *
+   * @param newMetaEntry
+   * @param loadModel
+   * @param uuid
+   * @return boolean which determines whether status update is done or not.
+   * @throws IOException
+   */
+  public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
+      CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid,
+      List<Segment> segmentsToBeDeleted, List<Segment> segmentFilesTobeUpdated) throws IOException {
     boolean status = false;
     AbsoluteTableIdentifier absoluteTableIdentifier =
         loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
@@ -237,9 +255,11 @@ public final class CarbonLoaderUtil {
           // existing entry needs to be overwritten as the entry will exist with some
           // intermediate status
           int indexToOverwriteNewMetaEntry = 0;
+          boolean found = false;
           for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
             if (entry.getLoadName().equals(newMetaEntry.getLoadName())
                 && entry.getLoadStartTime() == newMetaEntry.getLoadStartTime()) {
+              found = true;
               break;
             }
             indexToOverwriteNewMetaEntry++;
@@ -254,6 +274,10 @@ public final class CarbonLoaderUtil {
               }
             }
           }
+          if (!found) {
+            LOGGER.error("Entry not found to update " + newMetaEntry + " From list :: "
+                + listOfLoadFolderDetails);
+          }
           listOfLoadFolderDetails.set(indexToOverwriteNewMetaEntry, newMetaEntry);
         }
         // when no records are inserted then newSegmentEntry will be SegmentStatus.MARKED_FOR_DELETE
@@ -262,6 +286,17 @@ public final class CarbonLoaderUtil {
           addToStaleFolders(carbonTablePath, staleFolders, newMetaEntry);
         }
 
+        for (LoadMetadataDetails detail: listOfLoadFolderDetails) {
+          // if the segment is in the list of segments to be deleted then update its status.
+          if (segmentsToBeDeleted.contains(new Segment(detail.getLoadName(), null))) {
+            detail.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
+          } else if (segmentFilesTobeUpdated.contains(Segment.toSegment(detail.getLoadName()))) {
+            detail.setSegmentFile(
+                detail.getLoadName() + "_" + newMetaEntry.getUpdateStatusFileName()
+                    + CarbonTablePath.SEGMENT_EXT);
+          }
+        }
+
         SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
             .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
         // Delete all old stale segment folders
@@ -907,8 +942,8 @@ public final class CarbonLoaderUtil {
       String segmentId, CarbonTable carbonTable) throws IOException {
     CarbonTablePath carbonTablePath =
         CarbonStorePath.getCarbonTablePath((carbonTable.getAbsoluteTableIdentifier()));
-    Map<String, Long> dataIndexSize =
-        CarbonUtil.getDataSizeAndIndexSize(carbonTablePath, segmentId);
+    Map<String, Long> dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(carbonTablePath,
+        new Segment(segmentId, loadMetadataDetails.getSegmentFile()));
     Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);
     loadMetadataDetails.setDataSize(String.valueOf(dataSize));
     Long indexSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
index 02ab1d8..52b9f52 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
@@ -18,16 +18,19 @@
 package org.apache.carbondata.processing.util;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.locks.CarbonLockFactory;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.locks.LockUsage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
@@ -60,54 +63,61 @@ public final class DeleteLoadFolders {
   }
 
   public static void physicalFactAndMeasureMetadataDeletion(
-      AbsoluteTableIdentifier absoluteTableIdentifier, String metadataPath, boolean isForceDelete) {
+      AbsoluteTableIdentifier absoluteTableIdentifier, String metadataPath, boolean isForceDelete,
+      List<PartitionSpec> specs) {
     LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
     for (LoadMetadataDetails oneLoad : currentDetails) {
       if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
-        String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
-        boolean status = false;
         try {
-          if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
-            CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
-            CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
-
-              @Override public boolean accept(CarbonFile file) {
-                return (CarbonTablePath.isCarbonDataFile(file.getName())
-                    || CarbonTablePath.isCarbonIndexFile(file.getName())
-                    || CarbonTablePath.isPartitionMapFile(file.getName()));
-              }
-            });
+          if (oneLoad.getSegmentFile() != null) {
+            SegmentFileStore
+                .deleteSegment(absoluteTableIdentifier.getTablePath(), oneLoad.getSegmentFile(),
+                    specs);
+          } else {
+            String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad);
+            boolean status = false;
+            if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
+              CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));
+              CarbonFile[] filesToBeDeleted = file.listFiles(new CarbonFileFilter() {
+
+                @Override public boolean accept(CarbonFile file) {
+                  return (CarbonTablePath.isCarbonDataFile(file.getName()) ||
+                      CarbonTablePath.isCarbonIndexFile(file.getName()));
+                }
+              });
 
-            //if there are no fact and msr metadata files present then no need to keep
-            //entry in metadata.
-            if (filesToBeDeleted.length == 0) {
-              status = true;
-            } else {
+              //if there are no fact and msr metadata files present then no need to keep
+              //entry in metadata.
+              if (filesToBeDeleted.length == 0) {
+                status = true;
+              } else {
 
-              for (CarbonFile eachFile : filesToBeDeleted) {
-                if (!eachFile.delete()) {
-                  LOGGER.warn("Unable to delete the file as per delete command " + eachFile
-                      .getAbsolutePath());
-                  status = false;
-                } else {
-                  status = true;
+                for (CarbonFile eachFile : filesToBeDeleted) {
+                  if (!eachFile.delete()) {
+                    LOGGER.warn("Unable to delete the file as per delete command " + eachFile
+                        .getAbsolutePath());
+                    status = false;
+                  } else {
+                    status = true;
+                  }
                 }
               }
-            }
-            // need to delete the complete folder.
-            if (status) {
-              if (!file.delete()) {
-                LOGGER.warn(
-                    "Unable to delete the folder as per delete command " + file.getAbsolutePath());
+              // need to delete the complete folder.
+              if (status) {
+                if (!file.delete()) {
+                  LOGGER.warn("Unable to delete the folder as per delete command " + file
+                      .getAbsolutePath());
+                }
               }
+
+            } else {
+              LOGGER.warn("Files are not found in segment " + path
+                  + " it seems, files are already being deleted");
             }
 
-          } else {
-            LOGGER.warn("Files are not found in segment " + path
-                + " it seems, files are already being deleted");
           }
         } catch (IOException e) {
-          LOGGER.warn("Unable to delete the file as per delete command " + path);
+          LOGGER.warn("Unable to delete the file as per delete command " + oneLoad.getLoadName());
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1997ca23/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index 63320ef..cd1e28a 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -83,7 +83,7 @@ public class BlockIndexStoreTest extends TestCase {
 //      assertTrue(false);
 //    }
 //    List<String> segmentIds = new ArrayList<>();
-//      segmentIds.add(info.getSegmentId());
+//      segmentIds.add(info.getSegment());
 //    cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
 //  }
 //
@@ -151,7 +151,7 @@ public class BlockIndexStoreTest extends TestCase {
 //    }
 //    List<String> segmentIds = new ArrayList<>();
 //    for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
-//      segmentIds.add(tableBlockInfo.getSegmentId());
+//      segmentIds.add(tableBlockInfo.getSegment());
 //    }
 //    cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
 //  }
@@ -223,7 +223,7 @@ public class BlockIndexStoreTest extends TestCase {
 //    }
 //    List<String> segmentIds = new ArrayList<>();
 //    for (TableBlockInfo tableBlockInfo : tableBlockInfos) {
-//      segmentIds.add(tableBlockInfo.getSegmentId());
+//      segmentIds.add(tableBlockInfo.getSegment());
 //    }
 //    cache.removeTableBlocks(segmentIds, absoluteTableIdentifier);
 //  }