You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/07/25 17:37:49 UTC

[12/15] carbondata git commit: [CARBONDATA-1319] Support concurrent data load on the same table

[CARBONDATA-1319] Support concurrent data load on the same table

Modified the code to support concurrent data loads on the same table. Changes made:

Removed the meta.lock file
Modified the code to add an entry for the new load and to generate the segment ID inside the table status lock

This closes #1187


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1f83125f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1f83125f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1f83125f

Branch: refs/heads/master
Commit: 1f83125f12b14d18ae567e34768d22e280f1d32c
Parents: c3b2666
Author: manishgupta88 <to...@gmail.com>
Authored: Wed Jul 19 21:47:23 2017 +0800
Committer: Raghunandan S <ca...@gmail.com>
Committed: Tue Jul 25 17:37:21 2017 +0800

----------------------------------------------------------------------
 .../core/datastore/row/LoadStatusType.java      | 41 +++++++++++
 .../statusmanager/SegmentStatusManager.java     | 34 +++++++++
 .../carbondata/spark/load/CarbonLoaderUtil.java | 69 +++++++++++-------
 .../load/DataLoadProcessBuilderOnSpark.scala    |  4 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  | 38 +++++-----
 .../spark/rdd/NewCarbonDataLoadRDD.scala        | 10 ---
 .../carbondata/spark/rdd/UpdateDataLoad.scala   |  1 -
 .../carbondata/spark/util/CommonUtil.scala      | 29 +++++++-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 72 ++++---------------
 .../execution/command/carbonTableSchema.scala   |  5 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 76 +++++---------------
 .../execution/command/carbonTableSchema.scala   | 27 +------
 .../newflow/DataLoadProcessBuilder.java         |  3 +-
 .../util/CarbonDataProcessorUtil.java           |  9 +--
 14 files changed, 212 insertions(+), 206 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/1f83125f/core/src/main/java/org/apache/carbondata/core/datastore/row/LoadStatusType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/row/LoadStatusType.java b/core/src/main/java/org/apache/carbondata/core/datastore/row/LoadStatusType.java
new file mode 100644
index 0000000..cbdeadb
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/row/LoadStatusType.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.datastore.row;
+
+/**
+ * Load status type
+ */
+public enum LoadStatusType {
+
+  INSERT_OVERWRITE("Overwrite In Progress"), // if insert overwrite operation is in progress
+  IN_PROGRESS("In Progress"); // if load, insert into operation is in progress
+
+  private String message;
+
+  LoadStatusType(String message) {
+    this.message = message;
+  }
+
+  public String getMessage() {
+    return message;
+  }
+
+  public void setMessage(String message) {
+    this.message = message;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1f83125f/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 63318fe..beef0c6 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -208,6 +208,40 @@ public class SegmentStatusManager {
   }
 
   /**
+   * This method will create new segment id
+   *
+   * @param loadMetadataDetails
+   * @return
+   */
+  public static int createNewSegmentId(LoadMetadataDetails[] loadMetadataDetails) {
+    int newSegmentId = -1;
+    for (int i = 0; i < loadMetadataDetails.length; i++) {
+      try {
+        int loadCount = Integer.parseInt(loadMetadataDetails[i].getLoadName());
+        if (newSegmentId < loadCount) {
+          newSegmentId = loadCount;
+        }
+      } catch (NumberFormatException ne) {
+        // this case is for compacted folders. For compacted folders Id will be like 0.1, 2.1
+        // consider a case when 12 loads are completed and after that major compaction is triggered.
+        // In this case new compacted folder will be created with name 12.1 and after query time
+        // out all the compacted folders will be deleted and entry will also be removed from the
+        // table status file. In that case also if a new load comes the new segment Id assigned
+        // should be 13 and not 0
+        String loadName = loadMetadataDetails[i].getLoadName();
+        if (loadName.contains(".")) {
+          int loadCount = Integer.parseInt(loadName.split("\\.")[0]);
+          if (newSegmentId < loadCount) {
+            newSegmentId = loadCount;
+          }
+        }
+      }
+    }
+    newSegmentId++;
+    return newSegmentId;
+  }
+
+  /**
    * compares two given date strings
    *
    * @param loadValue

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1f83125f/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 5cef14a..e2b0d11 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -211,7 +211,8 @@ public final class CarbonLoaderUtil {
     String databaseName = loadModel.getDatabaseName();
     String tableName = loadModel.getTableName();
     String tempLocationKey = CarbonDataProcessorUtil
-        .getTempStoreLocationKey(databaseName, tableName, loadModel.getTaskNo(), isCompactionFlow);
+        .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(),
+            loadModel.getTaskNo(), isCompactionFlow);
     // form local store location
     final String localStoreLocation = CarbonProperties.getInstance().getProperty(tempLocationKey);
     if (localStoreLocation == null) {
@@ -247,29 +248,21 @@ public final class CarbonLoaderUtil {
    * This API will write the load level metadata for the loadmanagement module inorder to
    * manage the load and query execution management smoothly.
    *
-   * @param loadCount
-   * @param loadMetadataDetails
+   * @param newMetaEntry
    * @param loadModel
-   * @param loadStatus
-   * @param startLoadTime
    * @return boolean which determines whether status update is done or not.
    * @throws IOException
    */
-  public static boolean recordLoadMetadata(int loadCount, LoadMetadataDetails loadMetadataDetails,
-      CarbonLoadModel loadModel, String loadStatus, long startLoadTime) throws IOException {
-
+  public static boolean recordLoadMetadata(LoadMetadataDetails newMetaEntry,
+      CarbonLoadModel loadModel, boolean loadStartEntry) throws IOException {
     boolean status = false;
-
     String metaDataFilepath =
         loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
-
     AbsoluteTableIdentifier absoluteTableIdentifier =
         loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-
     CarbonTablePath carbonTablePath = CarbonStorePath
         .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
             absoluteTableIdentifier.getCarbonTableIdentifier());
-
     String tableStatusPath = carbonTablePath.getTableStatusFilePath();
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
     ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
@@ -280,23 +273,33 @@ public final class CarbonLoaderUtil {
                 + " for table status updation");
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
             SegmentStatusManager.readLoadMetadata(metaDataFilepath);
-
-        long loadEnddate = CarbonUpdateUtil.readCurrentTime();
-        loadMetadataDetails.setLoadEndTime(loadEnddate);
-        loadMetadataDetails.setLoadStatus(loadStatus);
-        loadMetadataDetails.setLoadName(String.valueOf(loadCount));
-        loadMetadataDetails.setLoadStartTime(startLoadTime);
         List<LoadMetadataDetails> listOfLoadFolderDetails =
             new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-        if (null != listOfLoadFolderDetailsArray) {
-          Collections.addAll(listOfLoadFolderDetails, listOfLoadFolderDetailsArray);
+        Collections.addAll(listOfLoadFolderDetails, listOfLoadFolderDetailsArray);
+        // create a new segment Id if load has just begun else add the already generated Id
+        if (loadStartEntry) {
+          String segmentId =
+              String.valueOf(SegmentStatusManager.createNewSegmentId(listOfLoadFolderDetailsArray));
+          newMetaEntry.setLoadName(segmentId);
+          loadModel.setLoadMetadataDetails(listOfLoadFolderDetails);
+          loadModel.setSegmentId(segmentId);
+          listOfLoadFolderDetails.add(newMetaEntry);
+        } else {
+          newMetaEntry.setLoadName(String.valueOf(loadModel.getSegmentId()));
+          // existing entry needs to be overwritten as the entry will exist with some
+          // intermediate status
+          int indexToOverwriteNewMetaEntry = 0;
+          for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
+            if (entry.getLoadName().equals(newMetaEntry.getLoadName())
+                && entry.getLoadStartTime() == newMetaEntry.getLoadStartTime()) {
+              break;
+            }
+            indexToOverwriteNewMetaEntry++;
+          }
+          listOfLoadFolderDetails.set(indexToOverwriteNewMetaEntry, newMetaEntry);
         }
-        listOfLoadFolderDetails.add(loadMetadataDetails);
-
         SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails
             .toArray(new LoadMetadataDetails[listOfLoadFolderDetails.size()]));
-
         status = true;
       } else {
         LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel
@@ -316,6 +319,24 @@ public final class CarbonLoaderUtil {
     return status;
   }
 
+  /**
+   * Method to create new entry for load in table status file
+   *
+   * @param loadMetadataDetails
+   * @param loadStatus
+   * @param loadStartTime
+   * @param addLoadEndTime
+   */
+  public static void populateNewLoadMetaEntry(LoadMetadataDetails loadMetadataDetails,
+      String loadStatus, long loadStartTime, boolean addLoadEndTime) {
+    if (addLoadEndTime) {
+      long loadEndDate = CarbonUpdateUtil.readCurrentTime();
+      loadMetadataDetails.setLoadEndTime(loadEndDate);
+    }
+    loadMetadataDetails.setLoadStatus(loadStatus);
+    loadMetadataDetails.setLoadStartTime(loadStartTime);
+  }
+
   public static void writeLoadMetadata(String storeLocation, String dbName, String tableName,
       List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
     CarbonTablePath carbonTablePath =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1f83125f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 04e0d22..534ab88 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -48,8 +48,7 @@ object DataLoadProcessBuilderOnSpark {
   def loadDataUsingGlobalSort(
       sc: SparkContext,
       dataFrame: Option[DataFrame],
-      model: CarbonLoadModel,
-      currentLoadCount: Int): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
+      model: CarbonLoadModel): Array[(String, (LoadMetadataDetails, ExecutionErrors))] = {
     val originRDD = if (dataFrame.isDefined) {
       dataFrame.get.rdd
     } else {
@@ -68,7 +67,6 @@ object DataLoadProcessBuilderOnSpark {
     }
 
     model.setPartitionId("0")
-    model.setSegmentId(currentLoadCount.toString)
     val modelBroadcast = sc.broadcast(model)
     val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1f83125f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index bc5ca06..5c0e77d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -84,11 +84,6 @@ class CarbonMergerRDD[K, V](
       } else {
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
       }
-      val tempLocationKey = CarbonDataProcessorUtil
-        .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
-          carbonLoadModel.getTableName,
-          carbonLoadModel.getTaskNo,
-          true)
       // this property is used to determine whether temp location for carbon is inside
       // container temp dir or is yarn application directory.
       val carbonUseLocalDir = CarbonProperties.getInstance()
@@ -107,8 +102,6 @@ class CarbonMergerRDD[K, V](
         storeLocation = System.getProperty("java.io.tmpdir")
       }
       storeLocation = storeLocation + '/' + System.nanoTime() + '/' + theSplit.index
-      CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
-      LOGGER.info(s"Temp storeLocation taken is $storeLocation")
       var mergeStatus = false
       var mergeNumber = ""
       var exec: CarbonCompactionExecutor = null
@@ -144,7 +137,25 @@ class CarbonMergerRDD[K, V](
           carbonMergerMapping.maxSegmentColumnSchemaList = dataFileFooter.getColumnInTable.asScala
             .toList
         }
-
+        mergeNumber = if (carbonMergerMapping.campactionType ==
+                              CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+          tableBlockInfoList.get(0).getSegmentId
+        }
+        else {
+          mergedLoadName
+            .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
+                       CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
+            )
+        }
+        carbonLoadModel.setSegmentId(mergeNumber)
+        val tempLocationKey = CarbonDataProcessorUtil
+          .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
+            carbonLoadModel.getTableName,
+            carbonLoadModel.getSegmentId,
+            carbonLoadModel.getTaskNo,
+            true)
+        CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+        LOGGER.info(s"Temp storeLocation taken is $storeLocation")
         // get destination segment properties as sent from driver which is of last segment.
         val segmentProperties = new SegmentProperties(
           carbonMergerMapping.maxSegmentColumnSchemaList.asJava,
@@ -180,16 +191,6 @@ class CarbonMergerRDD[K, V](
             }
         }
 
-        if(carbonMergerMapping.campactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
-          mergeNumber = tableBlockInfoList.get(0).getSegmentId
-        }
-        else {
-          mergeNumber = mergedLoadName
-            .substring(mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) +
-                       CarbonCommonConstants.LOAD_FOLDER.length(), mergedLoadName.length()
-            )
-        }
-
         val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
           factTableName,
           carbonLoadModel.getTaskNo,
@@ -198,7 +199,6 @@ class CarbonMergerRDD[K, V](
           true
         )
 
-        carbonLoadModel.setSegmentId(mergeNumber)
         carbonLoadModel.setPartitionId("0")
         var processor: AbstractResultProcessor = null
         if (restructuredBlockExists) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1f83125f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 4aad0ed..e558218 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -120,7 +120,6 @@ class CarbonTableSplitPartition(rddId: Int, val idx: Int, @transient val tableSp
 class SparkPartitionLoader(model: CarbonLoadModel,
     splitIndex: Int,
     storePath: String,
-    loadCount: String,
     loadMetadataDetails: LoadMetadataDetails) {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
@@ -169,7 +168,6 @@ class NewCarbonDataLoadRDD[K, V](
     sc: SparkContext,
     result: DataLoadResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    loadCount: Integer,
     blocksGroupBy: Array[(String, Array[BlockDetails])],
     isTableSplitPartition: Boolean)
   extends CarbonRDD[(K, V)](sc, Nil) {
@@ -228,7 +226,6 @@ class NewCarbonDataLoadRDD[K, V](
         loadMetadataDetails.setPartitionCount(partitionID)
         loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
 
-        carbonLoadModel.setSegmentId(String.valueOf(loadCount))
         val preFetch = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
           .USE_PREFETCH_WHILE_LOADING, CarbonCommonConstants.USE_PREFETCH_WHILE_LOADING_DEFAULT)
         carbonLoadModel.setPreFetch(preFetch.toBoolean)
@@ -236,7 +233,6 @@ class NewCarbonDataLoadRDD[K, V](
         val loader = new SparkPartitionLoader(model,
           theSplit.index,
           null,
-          String.valueOf(loadCount),
           loadMetadataDetails)
         // Intialize to set carbon properties
         loader.initialize()
@@ -390,7 +386,6 @@ class NewDataFrameLoaderRDD[K, V](
     sc: SparkContext,
     result: DataLoadResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    loadCount: Integer,
     prev: DataLoadCoalescedRDD[Row]) extends CarbonRDD[(K, V)](prev) {
 
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
@@ -408,7 +403,6 @@ class NewDataFrameLoaderRDD[K, V](
         loadMetadataDetails.setPartitionCount(partitionID)
         loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
         carbonLoadModel.setPartitionId(partitionID)
-        carbonLoadModel.setSegmentId(String.valueOf(loadCount))
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
         carbonLoadModel.setPreFetch(false)
 
@@ -427,7 +421,6 @@ class NewDataFrameLoaderRDD[K, V](
         val loader = new SparkPartitionLoader(model,
           theSplit.index,
           null,
-          String.valueOf(loadCount),
           loadMetadataDetails)
         // Intialize to set carbon properties
         loader.initialize()
@@ -586,7 +579,6 @@ class PartitionTableDataLoaderRDD[K, V](
     sc: SparkContext,
     result: DataLoadResult[K, V],
     carbonLoadModel: CarbonLoadModel,
-    loadCount: Integer,
     prev: RDD[Row]) extends CarbonRDD[(K, V)](prev) {
 
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
@@ -605,7 +597,6 @@ class PartitionTableDataLoaderRDD[K, V](
         loadMetadataDetails.setPartitionCount(partitionID)
         loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
         carbonLoadModel.setPartitionId(partitionID)
-        carbonLoadModel.setSegmentId(String.valueOf(loadCount))
         carbonLoadModel.setTaskNo(String.valueOf(partitionInfo.getPartitionId(theSplit.index)))
         carbonLoadModel.setPreFetch(false)
         val recordReaders = Array[CarbonIterator[Array[AnyRef]]] {
@@ -615,7 +606,6 @@ class PartitionTableDataLoaderRDD[K, V](
         val loader = new SparkPartitionLoader(model,
           theSplit.index,
           null,
-          String.valueOf(loadCount),
           loadMetadataDetails)
         // Intialize to set carbon properties
         loader.initialize()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1f83125f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index bcfc096..4cf2135 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -49,7 +49,6 @@ object UpdateDataLoad {
       val loader = new SparkPartitionLoader(carbonLoadModel,
         index,
         null,
-        segId,
         loadMetadataDetails)
       // Intialize to set carbon properties
       loader.initialize()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1f83125f/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 9c74a31..4205235 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -36,12 +36,14 @@ import org.apache.spark.util.FileUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.row.LoadStatusType
 import org.apache.carbondata.core.memory.{UnsafeMemoryManager, UnsafeSortMemoryManager}
 import org.apache.carbondata.core.metadata.datatype.DataType
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.scan.partition.PartitionUtil
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.comparator.Comparator
 import org.apache.carbondata.processing.csvload.CSVInputFormat
@@ -49,6 +51,7 @@ import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.load.CarbonLoaderUtil
 
 object CommonUtil {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -444,6 +447,30 @@ object CommonUtil {
     parsedPropertyValueString
   }
 
+  def readAndUpdateLoadProgressInTableMeta(model: CarbonLoadModel,
+      storePath: String,
+      insertOverwrite: Boolean = false): Unit = {
+    val newLoadMetaEntry = new LoadMetadataDetails
+    val status: String = if (insertOverwrite) {
+      LoadStatusType.INSERT_OVERWRITE.getMessage
+    } else {
+      LoadStatusType.IN_PROGRESS.getMessage
+    }
+    // reading the start time of data load.
+    val loadStartTime = CarbonUpdateUtil.readCurrentTime
+    model.setFactTimeStamp(loadStartTime)
+    CarbonLoaderUtil
+      .populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp, false)
+    val entryAdded: Boolean = CarbonLoaderUtil.recordLoadMetadata(newLoadMetaEntry, model, true)
+    if (!entryAdded) {
+      sys
+        .error(s"Failed to add entry in table status for ${ model.getDatabaseName }.${
+          model
+            .getTableName
+        }")
+    }
+  }
+
   def readLoadMetadataDetails(model: CarbonLoadModel, storePath: String): Unit = {
     val metadataPath = model.getCarbonDataLoadSchema.getCarbonTable.getMetaDataFilepath
     val details = SegmentStatusManager.readLoadMetadata(metadataPath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1f83125f/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 31b05bc..84a3413 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -231,16 +231,6 @@ object CarbonDataRDDFactory {
     val executor: ExecutorService = Executors.newFixedThreadPool(1)
     // update the updated table status.
     CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
-
-    // clean up of the stale segments.
-    try {
-      CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
-    } catch {
-      case e: Exception =>
-        LOGGER.error(s"Exception in compaction thread while clean up of stale segments" +
-            s" ${ e.getMessage }")
-    }
-
     val compactionThread = new Thread {
       override def run(): Unit = {
 
@@ -435,42 +425,6 @@ object CarbonDataRDDFactory {
       // Check if any load need to be deleted before loading new data
       DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
         carbonLoadModel.getTableName, storePath, false, carbonTable)
-      var currentLoadCount = -1
-      val convLoadDetails = carbonLoadModel.getLoadMetadataDetails.asScala
-      // taking the latest segment ID present.
-      // so that any other segments above this will be deleted.
-      if (convLoadDetails.nonEmpty) {
-        convLoadDetails.foreach { l =>
-          var loadCount = 0
-          breakable {
-            try {
-              loadCount = Integer.parseInt(l.getLoadName)
-            } catch {
-              case e: NumberFormatException => // case of merge folder. ignore it.
-                break
-            }
-            if (currentLoadCount < loadCount) {
-              currentLoadCount = loadCount
-            }
-          }
-        }
-      }
-      currentLoadCount += 1
-      // Deleting the any partially loaded data if present.
-      // in some case the segment folder which is present in store will not have entry in status.
-      // so deleting those folders.
-      try {
-        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
-      } catch {
-        case e: Exception =>
-          LOGGER
-              .error(s"Exception in data load while clean up of stale segments ${ e.getMessage }")
-      }
-
-      // reading the start time of data load.
-      val loadStartTime = CarbonUpdateUtil.readCurrentTime();
-      carbonLoadModel.setFactTimeStamp(loadStartTime)
-
       // get partition way from configuration
       // val isTableSplitPartition = CarbonProperties.getInstance().getProperty(
       // CarbonCommonConstants.TABLE_SPLIT_PARTITION,
@@ -599,7 +553,6 @@ object CarbonDataRDDFactory {
         status = new NewCarbonDataLoadRDD(sqlContext.sparkContext,
           new DataLoadResultImpl(),
           carbonLoadModel,
-          currentLoadCount,
           blocksGroupBy,
           isTableSplitPartition).collect()
       }
@@ -620,7 +573,6 @@ object CarbonDataRDDFactory {
           status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
             new DataLoadResultImpl(),
             carbonLoadModel,
-            currentLoadCount,
             newRdd).collect()
         } catch {
           case ex: Exception =>
@@ -732,7 +684,6 @@ object CarbonDataRDDFactory {
           status = new PartitionTableDataLoaderRDD(sqlContext.sparkContext,
             new DataLoadResultImpl(),
             carbonLoadModel,
-            currentLoadCount,
             rdd).collect()
         } catch {
           case ex: Exception =>
@@ -741,9 +692,10 @@ object CarbonDataRDDFactory {
         }
       }
 
+      // create new segment folder  in carbon store
       if (!updateModel.isDefined) {
-      CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
-        currentLoadCount.toString, carbonTable)
+        CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
+          carbonLoadModel.getSegmentId, carbonTable)
       }
       var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
       var errorMessage: String = "DataLoad failure"
@@ -759,7 +711,7 @@ object CarbonDataRDDFactory {
             sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
           LOGGER.audit("Using global sort for loading.")
           status = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkContext,
-            dataFrame, carbonLoadModel, currentLoadCount)
+            dataFrame, carbonLoadModel)
         } else if (dataFrame.isDefined) {
           loadDataFrame()
         } else {
@@ -908,9 +860,9 @@ object CarbonDataRDDFactory {
 
         return
       }
-      LOGGER.info("********starting clean up**********")
       if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
-        CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
+        LOGGER.info("********starting clean up**********")
+        CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
         LOGGER.info("********clean up done**********")
         LOGGER.audit(s"Data load is failed for " +
             s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
@@ -922,7 +874,8 @@ object CarbonDataRDDFactory {
         if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
             status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
             carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
-          CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
+          LOGGER.info("********starting clean up**********")
+          CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
           LOGGER.info("********clean up done**********")
           LOGGER.audit(s"Data load is failed for " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
@@ -931,8 +884,13 @@ object CarbonDataRDDFactory {
         val metadataDetails = status(0)._2._1
         if (!isAgg) {
             writeDictionary(carbonLoadModel, result, false)
-            val status = CarbonLoaderUtil.recordLoadMetadata(currentLoadCount, metadataDetails,
-              carbonLoadModel, loadStatus, loadStartTime)
+            CarbonLoaderUtil
+              .populateNewLoadMetaEntry(metadataDetails,
+                loadStatus,
+                carbonLoadModel.getFactTimeStamp,
+                true)
+            val status = CarbonLoaderUtil.recordLoadMetadata(metadataDetails,
+              carbonLoadModel, false)
             if (!status) {
               val errorMessage = "Dataload failed due to failure in table status updation."
               LOGGER.audit("Data load is failed for " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1f83125f/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index da6dd98..97a0593 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -561,9 +561,8 @@ case class LoadTable(
         val dictFolderPath = carbonTablePath.getMetadataDirectoryPath
         val dimensions = carbonTable.getDimensionByTableName(
           carbonTable.getFactTableName).asScala.toArray
-        if (null == carbonLoadModel.getLoadMetadataDetails) {
-          CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
-        }
+        // add the start entry for the new load in the table status file
+        CommonUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, storePath)
         if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass &&
             StringUtils.isEmpty(columnDict) && StringUtils.isEmpty(allDictionaryPath)) {
           LOGGER.info(s"Cannot use single_pass=true for $dbName.$tableName during the first load")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1f83125f/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index fc813d1..d3557a5 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -243,15 +243,6 @@ object CarbonDataRDDFactory {
       CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
     }
 
-    // clean up of the stale segments.
-    try {
-      CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, true)
-    } catch {
-      case e: Exception =>
-        LOGGER.error(s"Exception in compaction thread while clean up of stale segments" +
-            s" ${ e.getMessage }")
-    }
-
     val compactionThread = new Thread {
       override def run(): Unit = {
 
@@ -448,41 +439,6 @@ object CarbonDataRDDFactory {
       // Check if any load need to be deleted before loading new data
       DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
         carbonLoadModel.getTableName, storePath, false, carbonTable)
-      var currentLoadCount = -1
-      val convLoadDetails = carbonLoadModel.getLoadMetadataDetails.asScala
-      // taking the latest segment ID present.
-      // so that any other segments above this will be deleted.
-      if (convLoadDetails.nonEmpty) {
-        convLoadDetails.foreach { l =>
-          var loadCount = 0
-          breakable {
-            try {
-              loadCount = Integer.parseInt(l.getLoadName)
-            } catch {
-              case e: NumberFormatException => // case of merge folder. ignore it.
-                break
-            }
-            if (currentLoadCount < loadCount) {
-              currentLoadCount = loadCount
-            }
-          }
-        }
-      }
-      currentLoadCount += 1
-      // Deleting the any partially loaded data if present.
-      // in some case the segment folder which is present in store will not have entry in status.
-      // so deleting those folders.
-      try {
-        CarbonLoaderUtil.deletePartialLoadDataIfExist(carbonLoadModel, false)
-      } catch {
-        case e: Exception =>
-          LOGGER
-              .error(s"Exception in data load while clean up of stale segments ${ e.getMessage }")
-      }
-
-      // reading the start time of data load.
-      val loadStartTime = CarbonUpdateUtil.readCurrentTime();
-      carbonLoadModel.setFactTimeStamp(loadStartTime)
       // get partition way from configuration
       // val isTableSplitPartition = CarbonProperties.getInstance().getProperty(
       // CarbonCommonConstants.TABLE_SPLIT_PARTITION,
@@ -611,7 +567,6 @@ object CarbonDataRDDFactory {
         status = new NewCarbonDataLoadRDD(sqlContext.sparkContext,
           new DataLoadResultImpl(),
           carbonLoadModel,
-          currentLoadCount,
           blocksGroupBy,
           isTableSplitPartition).collect()
       }
@@ -630,7 +585,6 @@ object CarbonDataRDDFactory {
           status = new NewDataFrameLoaderRDD(sqlContext.sparkContext,
             new DataLoadResultImpl(),
             carbonLoadModel,
-            currentLoadCount,
             newRdd).collect()
 
         } catch {
@@ -743,7 +697,6 @@ object CarbonDataRDDFactory {
           status = new PartitionTableDataLoaderRDD(sqlContext.sparkContext,
             new DataLoadResultImpl(),
             carbonLoadModel,
-            currentLoadCount,
             rdd).collect()
         } catch {
           case ex: Exception =>
@@ -751,10 +704,10 @@ object CarbonDataRDDFactory {
             throw ex
         }
       }
-
+      // create new segment folder in carbon store
       if (!updateModel.isDefined) {
-      CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
-        currentLoadCount.toString, carbonTable)
+        CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
+          carbonLoadModel.getSegmentId, carbonTable)
       }
       var loadStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
       var errorMessage: String = "DataLoad failure"
@@ -770,7 +723,7 @@ object CarbonDataRDDFactory {
             sortScope.equals(SortScopeOptions.SortScope.GLOBAL_SORT)) {
           LOGGER.audit("Using global sort for loading.")
           status = DataLoadProcessBuilderOnSpark.loadDataUsingGlobalSort(sqlContext.sparkContext,
-            dataFrame, carbonLoadModel, currentLoadCount)
+            dataFrame, carbonLoadModel)
         } else if (dataFrame.isDefined) {
           loadDataFrame()
         } else {
@@ -921,9 +874,9 @@ object CarbonDataRDDFactory {
 
         return
       }
-      LOGGER.info("********starting clean up**********")
       if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) {
-        CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
+        LOGGER.info("********starting clean up**********")
+        CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
         LOGGER.info("********clean up done**********")
         LOGGER.audit(s"Data load is failed for " +
             s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
@@ -935,15 +888,17 @@ object CarbonDataRDDFactory {
         if (loadStatus == CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS &&
             status(0)._2._2.failureCauses == FailureCauses.BAD_RECORDS &&
             carbonLoadModel.getBadRecordsAction.split(",")(1) == LoggerAction.FAIL.name) {
-          CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
+          LOGGER.info("********starting clean up**********")
+          CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
           LOGGER.info("********clean up done**********")
           LOGGER.audit(s"Data load is failed for " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
           throw new Exception(status(0)._2._2.errorMsg)
         }
         // if segment is empty then fail the data load
-        if (!CarbonLoaderUtil.isValidSegment(carbonLoadModel, currentLoadCount)) {
-          CarbonLoaderUtil.deleteSegment(carbonLoadModel, currentLoadCount)
+        if (!CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
+          LOGGER.info("********starting clean up**********")
+          CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)
           LOGGER.info("********clean up done**********")
           LOGGER.audit(s"Data load is failed for " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }" +
@@ -954,8 +909,13 @@ object CarbonDataRDDFactory {
         val metadataDetails = status(0)._2._1
         if (!isAgg) {
           writeDictionary(carbonLoadModel, result, false)
-          val status = CarbonLoaderUtil.recordLoadMetadata(currentLoadCount, metadataDetails,
-            carbonLoadModel, loadStatus, loadStartTime)
+          CarbonLoaderUtil
+            .populateNewLoadMetaEntry(metadataDetails,
+              loadStatus,
+              carbonLoadModel.getFactTimeStamp,
+              true)
+          val status = CarbonLoaderUtil.recordLoadMetadata(metadataDetails,
+            carbonLoadModel, false)
           if (!status) {
             val errorMessage = "Dataload failed due to failure in table status updation."
             LOGGER.audit("Data load is failed for " +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1f83125f/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 15bf460..e7baeda 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -503,21 +503,7 @@ case class LoadTable(
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
     carbonProperty.addProperty("zookeeper.enable.lock", "false")
     val optionsFinal = getFinalOptions(carbonProperty)
-    val carbonLock = CarbonLockFactory
-        .getCarbonLockObj(relation.tableMeta.carbonTable.getAbsoluteTableIdentifier
-            .getCarbonTableIdentifier,
-          LockUsage.METADATA_LOCK
-        )
     try {
-      // take lock only in case of normal data load.
-      if (updateModel.isEmpty) {
-        if (carbonLock.lockWithRetries()) {
-          LOGGER.info("Successfully able to get the table metadata file lock")
-        } else {
-          sys.error("Table is locked for updation. Please try after some time")
-        }
-      }
-
       val factPath = if (dataFrame.isDefined) {
         ""
       } else {
@@ -655,9 +641,8 @@ case class LoadTable(
         carbonLoadModel.setMaxColumns(validatedMaxColumns.toString)
         GlobalDictionaryUtil.updateTableMetadataFunc = LoadTable.updateTableMetadata
         val storePath = relation.tableMeta.storePath
-        if (null == carbonLoadModel.getLoadMetadataDetails) {
-          CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
-        }
+        // add the start entry for the new load in the table status file
+        CommonUtil.readAndUpdateLoadProgressInTableMeta(carbonLoadModel, storePath)
         if (carbonLoadModel.getLoadMetadataDetails.isEmpty && carbonLoadModel.getUseOnePass &&
             StringUtils.isEmpty(column_dict) && StringUtils.isEmpty(all_dictionary_path)) {
           LOGGER.info(s"Cannot use single_pass=true for $dbName.$tableName during the first load")
@@ -805,14 +790,6 @@ case class LoadTable(
       case mce: MalformedCarbonCommandException =>
         LOGGER.audit(s"Dataload failed for $dbName.$tableName. " + mce.getMessage)
         throw mce
-    } finally {
-      if (carbonLock != null) {
-        if (carbonLock.unlock()) {
-          LOGGER.info("Table MetaData Unlocked Successfully after data load")
-        } else {
-          LOGGER.error("Unable to unlock Table MetaData")
-        }
-      }
     }
     Seq.empty
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1f83125f/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index 5662a04..9a0c193 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -142,7 +142,8 @@ public final class DataLoadProcessBuilder {
     String databaseName = loadModel.getDatabaseName();
     String tableName = loadModel.getTableName();
     String tempLocationKey = CarbonDataProcessorUtil
-        .getTempStoreLocationKey(databaseName, tableName, loadModel.getTaskNo(), false);
+        .getTempStoreLocationKey(databaseName, tableName, loadModel.getSegmentId(),
+            loadModel.getTaskNo(), false);
     CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation);
     CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, loadModel.getStorePath());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1f83125f/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 17f4df6..6715562 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -172,7 +172,7 @@ public final class CarbonDataProcessorUtil {
   public static String getLocalDataFolderLocation(String databaseName, String tableName,
       String taskId, String partitionId, String segmentId, boolean isCompactionFlow) {
     String tempLocationKey =
-        getTempStoreLocationKey(databaseName, tableName, taskId, isCompactionFlow);
+        getTempStoreLocationKey(databaseName, tableName, segmentId, taskId, isCompactionFlow);
     String baseStorePath = CarbonProperties.getInstance().getProperty(tempLocationKey);
     if (baseStorePath == null) {
       LOGGER.warn("Location not set for the key " + tempLocationKey);
@@ -191,14 +191,15 @@ public final class CarbonDataProcessorUtil {
    *
    * @param databaseName
    * @param tableName
+   * @param segmentId
    * @param taskId
    * @param isCompactionFlow
    * @return
    */
-  public static String getTempStoreLocationKey(String databaseName, String tableName, String taskId,
-      boolean isCompactionFlow) {
+  public static String getTempStoreLocationKey(String databaseName, String tableName,
+      String segmentId, String taskId, boolean isCompactionFlow) {
     String tempLocationKey = databaseName + CarbonCommonConstants.UNDERSCORE + tableName
-        + CarbonCommonConstants.UNDERSCORE + taskId;
+        + CarbonCommonConstants.UNDERSCORE + segmentId + CarbonCommonConstants.UNDERSCORE + taskId;
     if (isCompactionFlow) {
       tempLocationKey = CarbonCommonConstants.COMPACTION_KEY_WORD + CarbonCommonConstants.UNDERSCORE
           + tempLocationKey;