Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/03/02 08:01:27 UTC

[06/50] [abbrv] carbondata git commit: [CARBONDATA-1992] Remove partitionId in CarbonTablePath

[CARBONDATA-1992] Remove partitionId in CarbonTablePath

In CarbonTablePath there is a deprecated partition ID that is always 0; it should be removed to avoid confusion.

This closes #1765
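
For readers skimming the diff, the net change at call sites is that the hard-coded partition ID argument disappears. A minimal before/after sketch, assembled from the hunks below:

    // before: the CarbonTablePath APIs took a partition ID that was always "0"
    String partitionDir = carbonTablePath.getPartitionDir("0");
    String segmentPath  = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);

    // after: the partition ID argument is dropped
    String partitionDir = carbonTablePath.getPartitionDir();
    String segmentPath  = carbonTablePath.getCarbonDataDirectoryPath(segmentId);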


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2e28c156
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2e28c156
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2e28c156

Branch: refs/heads/carbonstore-rebase5
Commit: 2e28c15681066bad85dab0104307f927fb777a63
Parents: 7f7ea4d
Author: Jacky Li <ja...@qq.com>
Authored: Sat Jan 6 20:28:44 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Fri Mar 2 15:51:53 2018 +0800

----------------------------------------------------------------------
 .../core/metadata/PartitionMapFileStore.java    |   0
 .../core/mutate/CarbonUpdateUtil.java           |   8 +-
 .../core/statusmanager/LoadMetadataDetails.java |   1 +
 .../SegmentUpdateStatusManager.java             |   6 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   6 +-
 .../core/util/path/CarbonTablePath.java         |  55 ++++---
 .../CarbonFormatDirectoryStructureTest.java     |   4 +-
 .../hadoop/api/CarbonTableInputFormat.java      |   2 +-
 .../streaming/CarbonStreamRecordWriter.java     |   2 +-
 .../hadoop/test/util/StoreCreator.java          |   1 -
 .../presto/util/CarbonDataStoreCreator.scala    |   1 -
 .../dataload/TestLoadDataGeneral.scala          |   2 +-
 .../InsertIntoCarbonTableTestCase.scala         |   4 +-
 .../dataload/TestBatchSortDataLoad.scala        |   3 +-
 .../dataload/TestDataLoadWithFileName.scala     |   2 +-
 .../dataload/TestGlobalSortDataLoad.scala       |   4 +-
 .../testsuite/datamap/TestDataMapCommand.scala  |  34 ++--
 .../TestDataLoadingForPartitionTable.scala      |   3 +-
 .../load/DataLoadProcessBuilderOnSpark.scala    |   1 -
 .../load/DataLoadProcessorStepOnSpark.scala     |   2 +-
 .../spark/rdd/AlterTableLoadPartitionRDD.scala  | 154 +++++++++++--------
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  11 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  25 ++-
 .../org/apache/spark/util/PartitionUtils.scala  |   5 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   5 +-
 .../datasources/CarbonFileFormat.scala          |   1 -
 .../partition/TestAlterPartitionTable.scala     |   2 +-
 .../bucketing/TableBucketingTestCase.scala      |   2 +
 .../loading/CarbonDataLoadConfiguration.java    |  10 --
 .../loading/DataLoadProcessBuilder.java         |   1 -
 .../loading/TableProcessingOperations.java      |   3 +-
 .../sort/impl/ParallelReadMergeSorterImpl.java  |   4 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |  15 +-
 .../UnsafeBatchParallelReadMergeSorterImpl.java |   7 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |  21 ++-
 .../CarbonRowDataWriterProcessorStepImpl.java   |  33 ++--
 .../steps/DataWriterBatchProcessorStepImpl.java |  25 +--
 .../steps/DataWriterProcessorStepImpl.java      |  22 +--
 .../processing/merger/CarbonDataMergerUtil.java |   6 +-
 .../merger/CompactionResultSortProcessor.java   |   4 +-
 .../sort/sortdata/SortParameters.java           |  16 +-
 .../store/CarbonFactDataHandlerModel.java       |   3 +-
 .../util/CarbonDataProcessorUtil.java           |   9 +-
 .../processing/util/CarbonLoaderUtil.java       |  12 +-
 .../processing/util/DeleteLoadFolders.java      |   7 +-
 .../carbondata/processing/StoreCreator.java     |   1 -
 .../carbondata/streaming/StreamHandoffRDD.scala |   1 -
 .../streaming/StreamSinkFactory.scala           |   2 +-
 .../streaming/CarbonAppendableStreamSink.scala  |   8 +-
 49 files changed, 274 insertions(+), 282 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index de98fa8..18eae11 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -319,9 +319,7 @@ public class CarbonUpdateUtil {
     CarbonTablePath carbonTablePath = CarbonStorePath
             .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
                     absoluteTableIdentifier.getCarbonTableIdentifier());
-    // as of now considering only partition 0.
-    String partitionId = "0";
-    String partitionDir = carbonTablePath.getPartitionDir(partitionId);
+    String partitionDir = carbonTablePath.getPartitionDir();
     CarbonFile file =
             FileFactory.getCarbonFile(partitionDir, FileFactory.getFileType(partitionDir));
     if (!file.exists()) {
@@ -402,7 +400,7 @@ public class CarbonUpdateUtil {
   }
 
   public static long getLatestTaskIdForSegment(String segmentId, CarbonTablePath tablePath) {
-    String segmentDirPath = tablePath.getCarbonDataDirectoryPath("0", segmentId);
+    String segmentDirPath = tablePath.getCarbonDataDirectoryPath(segmentId);
 
     // scan all the carbondata files and get the latest task ID.
     CarbonFile segment =
@@ -467,7 +465,7 @@ public class CarbonUpdateUtil {
               || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
 
         // take the list of files from this segment.
-        String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName());
+        String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segment.getLoadName());
         CarbonFile segDir =
                 FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
         CarbonFile[] allSegmentFiles = segDir.listFiles();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index a0fa67d..b6a9e36 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -132,6 +132,7 @@ public class LoadMetadataDetails implements Serializable {
     return partitionCount;
   }
 
+  @Deprecated
   public void setPartitionCount(String partitionCount) {
     this.partitionCount = partitionCount;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 71b6ba8..2edb379 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -202,7 +202,7 @@ public class SegmentUpdateStatusManager {
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     String endTimeStamp = "";
     String startTimeStamp = "";
-    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
     CarbonFile segDir =
         FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
     for (LoadMetadataDetails eachSeg : segmentDetails) {
@@ -437,7 +437,7 @@ public class SegmentUpdateStatusManager {
         .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
             absoluteTableIdentifier.getCarbonTableIdentifier());
 
-    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId.getSegmentNo());
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId.getSegmentNo());
 
     CarbonFile segDir =
         FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
@@ -880,7 +880,7 @@ public class SegmentUpdateStatusManager {
 
     // filter out the fact files.
 
-    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
     CarbonFile segDir =
         FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index eb0a9d7..52305bd 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1105,7 +1105,7 @@ public final class CarbonUtil {
     // geting the index file path
     //TODO need to pass proper partition number when partiton will be supported
     String carbonIndexFilePath = carbonTablePath
-        .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId(),
+        .getCarbonIndexFilePath(taskId, tableBlockInfoList.get(0).getSegmentId(),
             bucketNumber, CarbonTablePath.DataFileUtil
                 .getTimeStampFromFileName(tableBlockInfoList.get(0).getFilePath()),
             tableBlockInfoList.get(0).getVersion());
@@ -1348,7 +1348,7 @@ public final class CarbonUtil {
     // geting the index file path
     //TODO need to pass proper partition number when partiton will be supported
     String carbonIndexFilePath = carbonTablePath
-        .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId(),
+        .getCarbonIndexFilePath(taskId, tableBlockInfoList.get(0).getSegmentId(),
             bucketNumber, CarbonTablePath.DataFileUtil
                 .getTimeStampFromFileName(tableBlockInfoList.get(0).getFilePath()),
             tableBlockInfoList.get(0).getVersion());
@@ -2305,7 +2305,7 @@ public final class CarbonUtil {
     long carbonDataSize = 0L;
     long carbonIndexSize = 0L;
     HashMap<String, Long> dataAndIndexSize = new HashMap<String, Long>();
-    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
     FileFactory.FileType fileType = FileFactory.getFileType(segmentPath);
     switch (fileType) {
       case HDFS:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index fa742d8..ebc14bb 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -272,15 +272,14 @@ public class CarbonTablePath extends Path {
   /**
    * Gets absolute path of data file
    *
-   * @param partitionId         unique partition identifier
    * @param segmentId           unique partition identifier
    * @param filePartNo          data file part number
    * @param factUpdateTimeStamp unique identifier to identify an update
    * @return absolute path of data file stored in carbon data format
    */
-  public String getCarbonDataFilePath(String partitionId, String segmentId, Integer filePartNo,
-      Long taskNo, int batchNo, int bucketNumber, String factUpdateTimeStamp) {
-    return getSegmentDir(partitionId, segmentId) + File.separator + getCarbonDataFileName(
+  public String getCarbonDataFilePath(String segmentId, Integer filePartNo, Long taskNo,
+      int batchNo, int bucketNumber, String factUpdateTimeStamp) {
+    return getSegmentDir(segmentId) + File.separator + getCarbonDataFileName(
         filePartNo, taskNo, bucketNumber, batchNo, factUpdateTimeStamp);
   }
 
@@ -289,13 +288,12 @@ public class CarbonTablePath extends Path {
    * based on task id
    *
    * @param taskId      task id of the file
-   * @param partitionId partition number
    * @param segmentId   segment number
    * @return full qualified carbon index path
    */
-  public String getCarbonIndexFilePath(final String taskId, final String partitionId,
-      final String segmentId, final String bucketNumber) {
-    String segmentDir = getSegmentDir(partitionId, segmentId);
+  public String getCarbonIndexFilePath(final String taskId, final String segmentId,
+      final String bucketNumber) {
+    String segmentDir = getSegmentDir(segmentId);
     CarbonFile carbonFile =
         FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir));
 
@@ -311,9 +309,8 @@ public class CarbonTablePath extends Path {
     if (files.length > 0) {
       return files[0].getAbsolutePath();
     } else {
-      throw new RuntimeException("Missing Carbon index file for partition["
-          + partitionId + "] Segment[" + segmentId + "], taskId[" + taskId
-          + "]");
+      throw new RuntimeException("Missing Carbon index file for Segment[" + segmentId + "], "
+          + "taskId[" + taskId + "]");
     }
   }
 
@@ -321,8 +318,6 @@ public class CarbonTablePath extends Path {
    * Below method will be used to get the carbon index file path
    * @param taskId
    *        task id
-   * @param partitionId
-   *        partition id
    * @param segmentId
    *        segment id
    * @param bucketNumber
@@ -331,28 +326,27 @@ public class CarbonTablePath extends Path {
    *        timestamp
    * @return carbon index file path
    */
-  public String getCarbonIndexFilePath(String taskId, String partitionId, String segmentId,
-      String bucketNumber, String timeStamp, ColumnarFormatVersion columnarFormatVersion) {
+  public String getCarbonIndexFilePath(String taskId, String segmentId, String bucketNumber,
+      String timeStamp, ColumnarFormatVersion columnarFormatVersion) {
     switch (columnarFormatVersion) {
       case V1:
       case V2:
-        return getCarbonIndexFilePath(taskId, partitionId, segmentId, bucketNumber);
+        return getCarbonIndexFilePath(taskId, segmentId, bucketNumber);
       default:
-        String segmentDir = getSegmentDir(partitionId, segmentId);
+        String segmentDir = getSegmentDir(segmentId);
         return segmentDir + File.separator + getCarbonIndexFileName(taskId,
             Integer.parseInt(bucketNumber), timeStamp);
     }
   }
 
-  public String getCarbonIndexFilePath(String taskId, String partitionId, String segmentId,
-      int batchNo, String bucketNumber, String timeStamp,
-      ColumnarFormatVersion columnarFormatVersion) {
+  public String getCarbonIndexFilePath(String taskId, String segmentId, int batchNo,
+      String bucketNumber, String timeStamp, ColumnarFormatVersion columnarFormatVersion) {
     switch (columnarFormatVersion) {
       case V1:
       case V2:
-        return getCarbonIndexFilePath(taskId, partitionId, segmentId, bucketNumber);
+        return getCarbonIndexFilePath(taskId, segmentId, bucketNumber);
       default:
-        String segmentDir = getSegmentDir(partitionId, segmentId);
+        String segmentDir = getSegmentDir(segmentId);
         return segmentDir + File.separator + getCarbonIndexFileName(Long.parseLong(taskId),
             Integer.parseInt(bucketNumber), batchNo, timeStamp);
     }
@@ -369,12 +363,11 @@ public class CarbonTablePath extends Path {
   /**
    * Gets absolute path of data file
    *
-   * @param partitionId unique partition identifier
    * @param segmentId   unique partition identifier
    * @return absolute path of data file stored in carbon data format
    */
-  public String getCarbonDataDirectoryPath(String partitionId, String segmentId) {
-    return getSegmentDir(partitionId, segmentId);
+  public String getCarbonDataDirectoryPath(String segmentId) {
+    return getSegmentDir(segmentId);
   }
 
   /**
@@ -412,12 +405,16 @@ public class CarbonTablePath extends Path {
     return segmentDir + File.separator + getCarbonStreamIndexFileName();
   }
 
-  public String getSegmentDir(String partitionId, String segmentId) {
-    return getPartitionDir(partitionId) + File.separator + SEGMENT_PREFIX + segmentId;
+  public String getSegmentDir(String segmentId) {
+    return getPartitionDir() + File.separator + SEGMENT_PREFIX + segmentId;
   }
 
-  public String getPartitionDir(String partitionId) {
-    return getFactDir() + File.separator + PARTITION_PREFIX + partitionId;
+  // This partition is not used in any code logic, just keep backward compatibility
+  public static final String DEPRECATED_PATITION_ID = "0";
+
+  public String getPartitionDir() {
+    return getFactDir() + File.separator + PARTITION_PREFIX +
+        CarbonTablePath.DEPRECATED_PATITION_ID;
   }
 
   private String getMetaDataDir() {
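
Note on the resulting layout: getPartitionDir() now hard-codes the legacy partition directory via DEPRECATED_PATITION_ID, so the on-disk structure is unchanged. A sketch of the paths produced, assuming PARTITION_PREFIX is "Part" and SEGMENT_PREFIX is "Segment_" (the directory-structure test below confirms the "Fact/Part0/Segment_2" layout):

    // getFactDir()        -> <tablePath>/Fact
    // getPartitionDir()   -> <tablePath>/Fact/Part0            (always DEPRECATED_PATITION_ID = "0")
    // getSegmentDir("2")  -> <tablePath>/Fact/Part0/Segment_2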

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
index 5549806..a1ccab3 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
@@ -53,8 +53,8 @@ public class CarbonFormatDirectoryStructureTest {
         .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.dictmeta"));
     assertTrue(carbonTablePath.getSortIndexFilePath("t1_c1").replace("\\", "/")
         .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.sortindex"));
-    assertTrue(carbonTablePath.getCarbonDataFilePath("1", "2", 3, 4L,  0, 0, "999").replace("\\", "/")
-        .equals(CARBON_STORE + "/d1/t1/Fact/Part1/Segment_2/part-3-4_batchno0-0-999.carbondata"));
+    assertTrue(carbonTablePath.getCarbonDataFilePath("2", 3, 4L,  0, 0, "999").replace("\\", "/")
+        .equals(CARBON_STORE + "/d1/t1/Fact/Part0/Segment_2/part-3-4_batchno0-0-999.carbondata"));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 96b0b21..069e1f7 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -499,7 +499,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
       long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
       long maxSize = getMaxSplitSize(job);
       for (Segment segment : streamSegments) {
-        String segmentDir = tablePath.getSegmentDir("0", segment.getSegmentNo());
+        String segmentDir = tablePath.getSegmentDir(segment.getSegmentNo());
         FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
         if (FileFactory.isFileExist(segmentDir, fileType)) {
           String indexName = CarbonTablePath.getCarbonStreamIndexFileName();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
index 364a6a6..3ef8afc 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
@@ -129,7 +129,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
 
     CarbonTablePath tablePath =
         CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
-    segmentDir = tablePath.getSegmentDir("0", segmentId);
+    segmentDir = tablePath.getSegmentDir(segmentId);
     fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0");
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index fbf33d6..ac17c4e 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -154,7 +154,6 @@ public class StoreCreator {
     loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(","));
     loadModel.setTaskNo("0");
     loadModel.setSegmentId("0");
-    loadModel.setPartitionId("0");
     loadModel.setFactTimeStamp(System.currentTimeMillis());
     loadModel.setMaxColumns("10");
     return loadModel;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 7b5c311..a41e738 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -135,7 +135,6 @@ object CarbonDataStoreCreator {
       loadModel.setCsvHeaderColumns(loadModel.getCsvHeader.split(","))
       loadModel.setTaskNo("0")
       loadModel.setSegmentId("0")
-      loadModel.setPartitionId("0")
       loadModel.setFactTimeStamp(System.currentTimeMillis())
       loadModel.setMaxColumns("15")
       executeGraph(loadModel, storePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 09ca9e5..c84ae6b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -49,7 +49,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
       tableName: String): Boolean = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(datbaseName, tableName)
     val partitionPath = CarbonStorePath
-      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0")
+      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir
     val fileType: FileFactory.FileType = FileFactory.getFileType(partitionPath)
     val carbonFile = FileFactory.getCarbonFile(partitionPath, fileType)
     val segments: ArrayBuffer[String] = ArrayBuffer()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index d59f0b5..5cc4156 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -232,7 +232,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select count(*) from CarbonOverwrite"), sql("select count(*) from HiveOverwrite"))
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbonoverwrite")
     val partitionPath = CarbonStorePath
-      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0")
+      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir
     val folder = new File(partitionPath)
     assert(folder.isDirectory)
     assert(folder.list().length == 1)
@@ -255,7 +255,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select count(*) from TCarbonSourceOverwrite"), sql("select count(*) from HiveOverwrite"))
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "tcarbonsourceoverwrite")
     val partitionPath = CarbonStorePath
-      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0")
+      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir
     val folder = new File(partitionPath)
 
     assert(folder.isDirectory)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
index 4af9d54..42ac4df 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
@@ -193,9 +193,8 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
       CarbonCommonConstants.DATABASE_DEFAULT_NAME,
       tableName
     )
-    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", segmentNo)
+    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath(segmentNo)
     new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
index dae0962..db0a62c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
@@ -49,7 +49,7 @@ class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
     val indexReader = new CarbonIndexFileReader()
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "test_table_v3")
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", "0")
+    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0")
     val carbonIndexPaths = new File(segmentDir)
       .listFiles(new FilenameFilter {
         override def accept(dir: File, name: String): Boolean = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 0d9e0fd..479db50 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -272,7 +272,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE carbon_globalsort")
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort")
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val segmentDir = carbonTablePath.getSegmentDir("0", "0")
+    val segmentDir = carbonTablePath.getSegmentDir("0")
     assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
   }
 
@@ -379,7 +379,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", segmentNo)
+    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath(segmentNo)
     new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index f403b3e..d60b7db 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -220,19 +220,27 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test if preaggregate load is successfull for hivemetastore") {
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
-    sql("DROP TABLE IF EXISTS maintable")
-    sql(
-      """
-        | CREATE TABLE maintable(id int, name string, city string, age int)
-        | STORED BY 'org.apache.carbondata.format'
-      """.stripMargin)
-    sql(
-      s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id"""
-        .stripMargin)
-    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
-    checkAnswer(sql(s"select * from maintable_preagg_sum"),
-      Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
+      sql("DROP TABLE IF EXISTS maintable")
+      sql(
+        """
+          | CREATE TABLE maintable(id int, name string, city string, age int)
+          | STORED BY 'org.apache.carbondata.format'
+        """.stripMargin)
+      sql(
+        s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id"""
+
+          .stripMargin)
+      sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+      checkAnswer(sql(s"select * from maintable_preagg_sum"),
+        Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+    } finally {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+    }
   }
 
   test("test preaggregate load for decimal column for hivemetastore") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index ed151bd..0a21aed 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -63,7 +63,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
     val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
       carbonTable.getTablePath)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+    val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId)
     val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
     val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
       override def accept(file: CarbonFile): Boolean = {
@@ -87,6 +87,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
         |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
         |  utilization int,salary int)
         | PARTITIONED BY (empno int)
+        |
         | STORED BY 'org.apache.carbondata.format'
         | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
       """.stripMargin)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 30e4fc9..e1bd84b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -56,7 +56,6 @@ object DataLoadProcessBuilderOnSpark {
         .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
     }
 
-    model.setPartitionId("0")
     val sc = sparkSession.sparkContext
     val modelBroadcast = sc.broadcast(model)
     val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 4b7d3f7..5124247 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -248,7 +248,7 @@ object DataLoadProcessorStepOnSpark {
 
       dataWriter = new DataWriterProcessorStepImpl(conf)
 
-      val dataHandlerModel = dataWriter.getDataHandlerModel(0)
+      val dataHandlerModel = dataWriter.getDataHandlerModel
       var dataHandler: CarbonFactHandler = null
       var rowsNotExist = true
       while (rows.hasNext) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 76c99f2..9de8dc9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -18,19 +18,21 @@
 package org.apache.carbondata.spark.rdd
 
 import scala.collection.JavaConverters._
+import scala.util.Random
 
-import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.{Partition, SparkEnv, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.AlterPartitionModel
 import org.apache.spark.util.PartitionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.partition.spliter.RowResultProcessor
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.AlterPartitionResult
-import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.spark.util.{CommonUtil, Util}
 
 class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
     result: AlterPartitionResult[K, V],
@@ -39,76 +41,96 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
     identifier: AbsoluteTableIdentifier,
     prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) {
 
-    var storeLocation: String = null
-    val carbonLoadModel = alterPartitionModel.carbonLoadModel
-    val segmentId = alterPartitionModel.segmentId
-    val oldPartitionIds = alterPartitionModel.oldPartitionIds
-    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val databaseName = carbonTable.getDatabaseName
-    val factTableName = carbonTable.getTableName
-    val partitionInfo = carbonTable.getPartitionInfo(factTableName)
+  var storeLocation: String = null
+  val carbonLoadModel = alterPartitionModel.carbonLoadModel
+  val segmentId = alterPartitionModel.segmentId
+  val oldPartitionIds = alterPartitionModel.oldPartitionIds
+  val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+  val databaseName = carbonTable.getDatabaseName
+  val factTableName = carbonTable.getTableName
+  val partitionInfo = carbonTable.getPartitionInfo(factTableName)
 
-    override protected def getPartitions: Array[Partition] = {
-        val sc = alterPartitionModel.sqlContext.sparkContext
-        sc.setLocalProperty("spark.scheduler.pool", "DDL")
-        sc.setLocalProperty("spark.job.interruptOnCancel", "true")
-        firstParent[Array[AnyRef]].partitions
-    }
-
-    override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
-        val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-        val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava
-        val iter = new Iterator[(K, V)] {
-            val partitionId = partitionInfo.getPartitionId(split.index)
-            carbonLoadModel.setTaskNo(String.valueOf(partitionId))
-            carbonLoadModel.setSegmentId(segmentId)
-            carbonLoadModel.setPartitionId("0")
-            CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, false, true)
+  override protected def getPartitions: Array[Partition] = {
+    val sc = alterPartitionModel.sqlContext.sparkContext
+    sc.setLocalProperty("spark.scheduler.pool", "DDL")
+    sc.setLocalProperty("spark.job.interruptOnCancel", "true")
+    firstParent[Array[AnyRef]].partitions
+  }
 
-            val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
-                factTableName,
-                carbonLoadModel.getTaskNo,
-                "0",
-                segmentId,
-                false,
-                true
-            )
+  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava
+    val iter = new Iterator[(K, V)] {
+      val partitionId = partitionInfo.getPartitionId(split.index)
+      carbonLoadModel.setTaskNo(String.valueOf(partitionId))
+      carbonLoadModel.setSegmentId(segmentId)
+      CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, false, true)
+      val tempLocationKey = CarbonDataProcessorUtil
+        .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
+          carbonLoadModel.getTableName,
+          segmentId,
+          carbonLoadModel.getTaskNo,
+          false,
+          true)
+      // this property is used to determine whether temp location for carbon is inside
+      // container temp dir or is yarn application directory.
+      val carbonUseLocalDir = CarbonProperties.getInstance()
+        .getProperty("carbon.use.local.dir", "false")
 
-            val loadStatus = if (rows.isEmpty) {
-                LOGGER.info("After repartition this split, NO target rows to write back.")
-                true
-            } else {
-                val segmentProperties = PartitionUtils.getSegmentProperties(identifier,
-                    segmentId, partitionIds.toList, oldPartitionIds, partitionInfo, carbonTable)
-                val processor = new RowResultProcessor(
-                    carbonTable,
-                    carbonLoadModel,
-                    segmentProperties,
-                    tempStoreLoc,
-                    bucketId)
-                try {
-                    processor.execute(rows)
-                } catch {
-                    case e: Exception =>
-                        sys.error(s"Exception when executing Row result processor ${e.getMessage}")
-                } finally {
-                    TableProcessingOperations
-                      .deleteLocalDataLoadFolderLocation(carbonLoadModel, false, true)
-                }
-            }
+      if (carbonUseLocalDir.equalsIgnoreCase("true")) {
 
-            val loadResult = segmentId
-            var finished = false
+        val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
+        if (null != storeLocations && storeLocations.nonEmpty) {
+          storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+        }
+        if (storeLocation == null) {
+          storeLocation = System.getProperty("java.io.tmpdir")
+        }
+      } else {
+        storeLocation = System.getProperty("java.io.tmpdir")
+      }
+      storeLocation = storeLocation + '/' + System.nanoTime() + '/' + split.index
+      CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+      LOGGER.info(s"Temp storeLocation taken is $storeLocation")
 
-            override def hasNext: Boolean = {
-                !finished
-            }
+      val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+        databaseName, factTableName, carbonLoadModel.getTaskNo, segmentId, false, true)
 
-            override def next(): (K, V) = {
-                finished = true
-                result.getKey(loadResult, loadStatus)
-            }
+      val loadStatus = if (rows.isEmpty) {
+        LOGGER.info("After repartition this split, NO target rows to write back.")
+        true
+      } else {
+        val segmentProperties = PartitionUtils.getSegmentProperties(identifier,
+          segmentId, partitionIds.toList, oldPartitionIds, partitionInfo, carbonTable)
+        val processor = new RowResultProcessor(
+          carbonTable,
+          carbonLoadModel,
+          segmentProperties,
+          tempStoreLoc,
+          bucketId)
+        try {
+          processor.execute(rows)
+        } catch {
+          case e: Exception =>
+            sys.error(s"Exception when executing Row result processor ${ e.getMessage }")
+        } finally {
+          TableProcessingOperations
+            .deleteLocalDataLoadFolderLocation(carbonLoadModel, false, true)
         }
-        iter
+      }
+
+      val loadResult = segmentId
+      var finished = false
+
+      override def hasNext: Boolean = {
+        !finished
+      }
+
+      override def next(): (K, V) = {
+        finished = true
+        result.getKey(loadResult, loadStatus)
+      }
     }
+    iter
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index e0dcffd..ab3ab5d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -189,16 +189,9 @@ class CarbonMergerRDD[K, V](
             }
         }
 
-        val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
-          factTableName,
-          carbonLoadModel.getTaskNo,
-          "0",
-          mergeNumber,
-          true,
-          false
-        )
+        val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+          databaseName, factTableName, carbonLoadModel.getTaskNo, mergeNumber, true, false)
 
-        carbonLoadModel.setPartitionId("0")
         var processor: AbstractResultProcessor = null
         if (restructuredBlockExists) {
           LOGGER.info("CompactionResultSortProcessor flow is selected")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 72d0484..1fa1689 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
@@ -129,7 +130,8 @@ class SparkPartitionLoader(model: CarbonLoadModel,
       System.setProperty("carbon.properties.filepath",
         System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
     }
-    CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId)
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(
+      CarbonTablePath.DEPRECATED_PATITION_ID)
     CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true")
     CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1")
     CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true")
@@ -219,14 +221,13 @@ class NewCarbonDataLoadRDD[K, V](
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
-      var partitionID = "0"
       val loadMetadataDetails = new LoadMetadataDetails()
       val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
       var model: CarbonLoadModel = _
       val uniqueLoadStatusId =
         carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
       try {
-        loadMetadataDetails.setPartitionCount(partitionID)
+        loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID)
         loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
 
         val preFetch = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
@@ -264,7 +265,7 @@ class NewCarbonDataLoadRDD[K, V](
         // So print the data load statistics only in case of non failure case
         if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
           CarbonTimeStatisticsFactory.getLoadStatisticsInstance
-            .printStatisticsInfo(model.getPartitionId)
+            .printStatisticsInfo(CarbonTablePath.DEPRECATED_PATITION_ID)
         }
       }
 
@@ -287,8 +288,8 @@ class NewCarbonDataLoadRDD[K, V](
         val fileList: java.util.List[String] = new java.util.ArrayList[String](
             CarbonCommonConstants.CONSTANT_SIZE_TEN)
         CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, fileList, ",")
-        model = carbonLoadModel.getCopyWithPartition(partitionID, fileList,
-            carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+        model = carbonLoadModel.getCopyWithPartition(
+          carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
         StandardLogService.setThreadName(StandardLogService
           .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName)
           , ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId + "")
@@ -351,7 +352,6 @@ class NewDataFrameLoaderRDD[K, V](
 
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
-      val partitionID = "0"
       val loadMetadataDetails = new LoadMetadataDetails()
       val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
       val model: CarbonLoadModel = carbonLoadModel
@@ -359,9 +359,8 @@ class NewDataFrameLoaderRDD[K, V](
         carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
       try {
 
-        loadMetadataDetails.setPartitionCount(partitionID)
+        loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID)
         loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
-        carbonLoadModel.setPartitionId(partitionID)
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
         carbonLoadModel.setPreFetch(false)
 
@@ -406,7 +405,7 @@ class NewDataFrameLoaderRDD[K, V](
         // So print the data load statistics only in case of non failure case
         if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
           CarbonTimeStatisticsFactory.getLoadStatisticsInstance
-            .printStatisticsInfo(model.getPartitionId)
+            .printStatisticsInfo(CarbonTablePath.DEPRECATED_PATITION_ID)
         }
       }
       var finished = false
@@ -542,7 +541,6 @@ class PartitionTableDataLoaderRDD[K, V](
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
-      val partitionID = "0"
       val loadMetadataDetails = new LoadMetadataDetails()
       val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
       val model: CarbonLoadModel = carbonLoadModel
@@ -552,9 +550,8 @@ class PartitionTableDataLoaderRDD[K, V](
         carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
       try {
 
-        loadMetadataDetails.setPartitionCount(partitionID)
+        loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID)
         loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
-        carbonLoadModel.setPartitionId(partitionID)
         carbonLoadModel.setTaskNo(String.valueOf(partitionInfo.getPartitionId(theSplit.index)))
         carbonLoadModel.setPreFetch(false)
         val recordReaders = Array[CarbonIterator[Array[AnyRef]]] {
@@ -590,7 +587,7 @@ class PartitionTableDataLoaderRDD[K, V](
         // So print the data load statistics only in case of non failure case
         if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
           CarbonTimeStatisticsFactory.getLoadStatisticsInstance
-            .printStatisticsInfo(model.getPartitionId)
+            .printStatisticsInfo(CarbonTablePath.DEPRECATED_PATITION_ID)
         }
       }
       var finished = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 0498b25..3c871db 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -190,8 +190,9 @@ object PartitionUtils {
         val batchNo = CarbonTablePath.DataFileUtil.getBatchNoFromTaskNo(taskNo)
         val taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskNo)
         val bucketNumber = CarbonTablePath.DataFileUtil.getBucketNo(path)
-        val indexFilePath = carbonTablePath.getCarbonIndexFilePath(String.valueOf(taskId), "0",
-          segmentId, batchNo, String.valueOf(bucketNumber), timestamp, version)
+        val indexFilePath = carbonTablePath.getCarbonIndexFilePath(
+          String.valueOf(taskId), segmentId, batchNo, String.valueOf(bucketNumber),
+          timestamp, version)
        // indexFilePath could be duplicated when multiple data files relate to one index file
         if (indexFilePath != null && !pathList.contains(indexFilePath)) {
           pathList.add(indexFilePath)
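
The loop above collects one index-file path per data file and skips duplicates, because several data files written by one task can share a single index file. A minimal sketch of that de-duplication with illustrative path strings; a LinkedHashSet is shown as one idiomatic alternative to the list-plus-contains() check in the real code, keeping insertion order while making the membership test constant-time:

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    public class IndexPathDedupDemo {
      public static void main(String[] args) {
        // Two data files from the same task resolve to the same index file.
        List<String> perDataFilePaths = Arrays.asList(
            "Segment_0/0-0-0-123.carbonindex",
            "Segment_0/0-0-0-123.carbonindex",
            "Segment_0/1-0-0-123.carbonindex");
        Set<String> unique = new LinkedHashSet<>(perDataFilePaths);
        unique.forEach(System.out::println); // prints the two distinct paths
      }
    }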

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 1695a13..09484c4 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -594,7 +594,6 @@ object CarbonDataRDDFactory {
 
         override def getPartition(key: Any): Int = {
           val segId = key.asInstanceOf[String]
-          // partitionId
           segmentIdIndex(segId) * parallelism + Random.nextInt(parallelism)
         }
       }
@@ -628,7 +627,6 @@ object CarbonDataRDDFactory {
     val rddResult = new updateResultImpl()
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] {
-      val partitionID = "0"
       val loadMetadataDetails = new LoadMetadataDetails
       val executionErrors = ExecutionErrors(FailureCauses.NONE, "")
       var uniqueLoadStatusId = ""
@@ -639,10 +637,9 @@ object CarbonDataRDDFactory {
                              CarbonCommonConstants.UNDERSCORE +
                              (index + "_0")
 
-        loadMetadataDetails.setPartitionCount(partitionID)
+        loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID)
         loadMetadataDetails.setLoadName(segId)
         loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_FAILURE)
-        carbonLoadModel.setPartitionId(partitionID)
         carbonLoadModel.setSegmentId(segId)
         carbonLoadModel.setTaskNo(String.valueOf(index))
         carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
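
One detail worth calling out in the getPartition override above: each segment id owns a contiguous band of partitions (one band per segment, of size equal to the parallelism), and the random offset spreads that segment's rows across the band rather than funneling them to one partition. A self-contained sketch of the same arithmetic, with illustrative names:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Random;

    public class SegmentPartitionerDemo {
      public static void main(String[] args) {
        Map<String, Integer> segmentIdIndex = new HashMap<>();
        segmentIdIndex.put("0", 0);
        segmentIdIndex.put("1", 1);
        int parallelism = 4;
        Random random = new Random();
        // Segment "1" owns partitions [4, 8); the offset picks one of them.
        String segId = "1";
        int partition =
            segmentIdIndex.get(segId) * parallelism + random.nextInt(parallelism);
        System.out.println("segment " + segId + " -> partition " + partition);
      }
    }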

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index bff65be..b4d3bea 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -110,7 +110,6 @@ with Serializable {
       model,
       conf
     )
-    model.setPartitionId("0")
     model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
     model.setDictionaryServerHost(options.getOrElse("dicthost", null))
     model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index b5325ef..aadee81 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -858,7 +858,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[CarbonFile] = {
     val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
       carbonTable.getTablePath)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+    val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId)
     val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
     val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
       override def accept(file: CarbonFile): Boolean = {
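
The test helper above now resolves the segment directory from the segment id alone and filters the listing down to data files. A minimal Java sketch of the same list-and-filter step; the directory layout and the .carbondata extension are assumptions for illustration, not pulled from this diff:

    import java.io.File;
    import java.io.FilenameFilter;

    public class SegmentFileListDemo {
      public static void main(String[] args) {
        File segmentDir = new File("/tmp/store/default/t1/Fact/Part0/Segment_0");
        File[] dataFiles = segmentDir.listFiles(new FilenameFilter() {
          @Override public boolean accept(File dir, String name) {
            return name.endsWith(".carbondata");
          }
        });
        if (dataFiles == null) {
          System.out.println("segment directory does not exist");
          return;
        }
        for (File dataFile : dataFiles) {
          System.out.println(dataFile.getName());
        }
      }
    }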

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 102df39..9da7244 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -173,6 +173,7 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       case s: ShuffleExchange => shuffleExists = true
     }
     assert(!shuffleExists, "shuffle should not exist on bucket tables")
+    sql("DROP TABLE bucketed_parquet_table")
   }
 
   test("test create table with bucket join of carbon table and non bucket parquet table") {
@@ -197,6 +198,7 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       case s: ShuffleExchange => shuffleExists = true
     }
     assert(shuffleExists, "shuffle should exist on non bucket tables")
+    sql("DROP TABLE parquet_table")
   }
 
   test("test scalar subquery with equal") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index b7270b9..895fb79 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -40,8 +40,6 @@ public class CarbonDataLoadConfiguration {
 
   private String[] header;
 
-  private String partitionId;
-
   private String segmentId;
 
   private String taskNo;
@@ -194,14 +192,6 @@ public class CarbonDataLoadConfiguration {
     this.tableIdentifier = tableIdentifier;
   }
 
-  public String getPartitionId() {
-    return partitionId;
-  }
-
-  public void setPartitionId(String partitionId) {
-    this.partitionId = partitionId;
-  }
-
   public String getSegmentId() {
     return segmentId;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index f5b29e7..fc2796a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -182,7 +182,6 @@ public final class DataLoadProcessBuilder {
     configuration.setTableIdentifier(identifier);
     configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
     configuration.setHeader(loadModel.getCsvHeaderColumns());
-    configuration.setPartitionId(loadModel.getPartitionId());
     configuration.setSegmentId(loadModel.getSegmentId());
     configuration.setTaskNo(loadModel.getTaskNo());
     configuration.setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index e2be79c..a8db6c9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -63,8 +63,7 @@ public class TableProcessingOperations {
 
    // delete folders whose metadata does not exist in tablestatus
     for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
-      final String partitionCount = i + "";
-      String partitionPath = carbonTablePath.getPartitionDir(partitionCount);
+      String partitionPath = carbonTablePath.getPartitionDir();
       FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
       if (FileFactory.isFileExist(partitionPath, fileType)) {
         CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
index 6432d38..fcc88b5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
@@ -73,8 +73,8 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
     String[] storeLocations =
         CarbonDataProcessorUtil.getLocalDataFolderLocation(
             sortParameters.getDatabaseName(), sortParameters.getTableName(),
-            String.valueOf(sortParameters.getTaskNo()), sortParameters.getPartitionID(),
-            sortParameters.getSegmentId() + "", false, false);
+            String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(),
+            false, false);
     // Set the data file location
     String[] dataFolderLocations = CarbonDataProcessorUtil.arrayAppend(storeLocations,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
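
After this change the sorter derives its local temp directories from (databaseName, tableName, taskNo, segmentId) only, then appends the sort-temp subfolder to each location. A sketch of that derivation under an assumed folder layout; the concatenation order and the "sortrowtmp" suffix are illustrative, not the exact CarbonData constants:

    import java.io.File;

    public class SortTempLocationDemo {
      static String[] localDataFolders(String[] tmpDirs, String db, String table,
          String taskNo, String segmentId) {
        String[] out = new String[tmpDirs.length];
        for (int i = 0; i < tmpDirs.length; i++) {
          out[i] = tmpDirs[i] + File.separator + db + File.separator + table
              + File.separator + taskNo + File.separator + "Segment_" + segmentId;
        }
        return out;
      }

      public static void main(String[] args) {
        String[] locations = localDataFolders(
            new String[] {"/tmp/disk1", "/tmp/disk2"}, "default", "t1", "0", "0");
        for (String location : locations) {
          // sort temp files land in a fixed subfolder under each location
          System.out.println(location + File.separator + "sortrowtmp");
        }
      }
    }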

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index c7030dd..b7452a7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -133,10 +133,10 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
   }
 
   private SingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
-    String[] storeLocation = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
-            String.valueOf(sortParameters.getTaskNo()), bucketId,
-            sortParameters.getSegmentId() + "", false, false);
+    String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+        sortParameters.getDatabaseName(), sortParameters.getTableName(),
+        String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(),
+        false, false);
     // Set the data file location
     String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator,
         CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -181,10 +181,9 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
   }
 
   private void setTempLocation(SortParameters parameters) {
-    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(parameters.getDatabaseName(),
-            parameters.getTableName(), parameters.getTaskNo(),
-            parameters.getPartitionID(), parameters.getSegmentId(), false, false);
+    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+        parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(),
+        parameters.getSegmentId(), false, false);
     String[] tmpLocs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
         CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     parameters.setTempFileLocation(tmpLocs);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index c5579d9..ed3a55d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -219,10 +219,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
     }
 
     private void setTempLocation(SortParameters parameters) {
-      String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
-          .getLocalDataFolderLocation(parameters.getDatabaseName(),
-            parameters.getTableName(), parameters.getTaskNo(), batchCount + "",
-            parameters.getSegmentId(), false, false);
+      String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+          parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(),
+          parameters.getSegmentId(), false, false);
       String[] tempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
           File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
       parameters.setTempFileLocation(tempDirs);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
index 3c48e4d..f605b22 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
@@ -119,18 +119,17 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg
 
     Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()];
     for (int i = 0; i < sortDataRows.length; i++) {
-      batchIterator[i] =
-          new MergedDataIterator(String.valueOf(i), batchSize, intermediateFileMergers[i]);
+      batchIterator[i] = new MergedDataIterator(batchSize, intermediateFileMergers[i]);
     }
 
     return batchIterator;
   }
 
-  private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
-    String[] storeLocation = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
-            String.valueOf(sortParameters.getTaskNo()), bucketId,
-            sortParameters.getSegmentId() + "", false, false);
+  private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger() {
+    String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+        sortParameters.getDatabaseName(), sortParameters.getTableName(),
+        String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(),
+        false, false);
     // Set the data file location
     String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -173,7 +172,7 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg
   private void setTempLocation(SortParameters parameters) {
     String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(),
-            parameters.getTaskNo(), parameters.getPartitionID(), parameters.getSegmentId(),
+            parameters.getTaskNo(), parameters.getSegmentId(),
             false, false);
     String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
         CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -224,7 +223,6 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg
 
   private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
 
-    private String partitionId;
 
     private int batchSize;
 
@@ -232,9 +230,8 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg
 
     private UnsafeIntermediateMerger intermediateMerger;
 
-    public MergedDataIterator(String partitionId, int batchSize,
+    public MergedDataIterator(int batchSize,
         UnsafeIntermediateMerger intermediateMerger) {
-      this.partitionId = partitionId;
       this.batchSize = batchSize;
       this.intermediateMerger = intermediateMerger;
       this.firstRow = true;
@@ -245,7 +242,7 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg
     @Override public boolean hasNext() {
       if (firstRow) {
         firstRow = false;
-        finalMerger = getFinalMerger(partitionId);
+        finalMerger = getFinalMerger();
         List<UnsafeCarbonRowPage> rowPages = intermediateMerger.getRowPages();
         finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
             intermediateMerger.getMergedPages());
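
MergedDataIterator above keeps its lazy-start behavior through the refactoring: the final merger is only constructed on the first hasNext() call, once the intermediate merger has produced its row pages. A self-contained sketch of that pattern, with simplified types standing in for the merger machinery:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class LazyMergeIteratorDemo {
      static class LazyMergeIterator implements Iterator<String> {
        private final List<String> rowPages;
        private boolean firstRow = true;
        private Iterator<String> merged;

        LazyMergeIterator(List<String> rowPages) {
          this.rowPages = rowPages;
        }

        @Override public boolean hasNext() {
          if (firstRow) {
            firstRow = false;
            // stands in for getFinalMerger() followed by startFinalMerge(...)
            merged = rowPages.iterator();
          }
          return merged.hasNext();
        }

        @Override public String next() {
          return merged.next();
        }
      }

      public static void main(String[] args) {
        Iterator<String> rows = new LazyMergeIterator(Arrays.asList("row1", "row2"));
        while (rows.hasNext()) {
          System.out.println(rows.next());
        }
      }
    }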

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index 8b87cfc..6cf1dcd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.util.CarbonThreadFactory;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
@@ -88,11 +89,13 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     child.initialize();
   }
 
-  private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
-    String[] storeLocation = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
-            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
-            configuration.getSegmentId() + "", false, false);
+  private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) {
+    String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+        tableIdentifier.getDatabaseName(),
+        tableIdentifier.getTableName(),
+        String.valueOf(configuration.getTaskNo()), configuration.getSegmentId(),
+        false,
+        false);
     CarbonDataProcessorUtil.createLocations(storeLocation);
     return storeLocation;
   }
@@ -115,11 +118,11 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
       measureCount = configuration.getMeasureCount();
       outputLength = measureCount + (this.noDictWithComplextCount > 0 ? 1 : 0) + 1;
       CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+          .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
               System.currentTimeMillis());
 
       if (iterators.length == 1) {
-        doExecute(iterators[0], 0, 0);
+        doExecute(iterators[0], 0);
       } else {
         executorService = Executors.newFixedThreadPool(iterators.length,
             new CarbonThreadFactory("NoSortDataWriterPool:" + configuration.getTableIdentifier()
@@ -150,11 +153,10 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     return null;
   }
 
-  private void doExecute(Iterator<CarbonRowBatch> iterator, int partitionId, int iteratorIndex) {
-    String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(partitionId));
-    CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
-        .createCarbonFactDataHandlerModel(configuration, storeLocation, partitionId,
-            iteratorIndex);
+  private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex) {
+    String[] storeLocation = getStoreLocation(tableIdentifier);
+    CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(
+        configuration, storeLocation, 0, iteratorIndex);
     CarbonFactHandler dataHandler = null;
     boolean rowsNotExist = true;
     while (iterator.hasNext()) {
@@ -189,10 +191,11 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
     processingComplete(dataHandler);
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+        .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
             System.currentTimeMillis());
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
+        .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PATITION_ID,
+            System.currentTimeMillis());
   }
 
   private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
@@ -298,7 +301,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     }
 
     @Override public void run() {
-      doExecute(this.iterator, 0, iteratorIndex);
+      doExecute(this.iterator, iteratorIndex);
     }
   }
 }
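
The execute() path above drains a single input iterator inline and only creates a fixed thread pool when there are several, one worker per iterator. A runnable sketch of that dispatch shape, with simplified types and error handling omitted:

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class WriterDispatchDemo {
      static void doExecute(Iterator<Integer> iterator, int iteratorIndex) {
        while (iterator.hasNext()) {
          System.out.println("writer " + iteratorIndex + " consumed " + iterator.next());
        }
      }

      public static void main(String[] args) throws InterruptedException {
        List<Iterator<Integer>> iterators = Arrays.asList(
            Arrays.asList(1, 2).iterator(),
            Arrays.asList(3, 4).iterator());
        if (iterators.size() == 1) {
          doExecute(iterators.get(0), 0);      // single input: drain inline
        } else {
          ExecutorService pool = Executors.newFixedThreadPool(iterators.size());
          for (int i = 0; i < iterators.size(); i++) {
            final int index = i;
            pool.submit(() -> doExecute(iterators.get(index), index));
          }
          pool.shutdown();
          pool.awaitTermination(1, TimeUnit.MINUTES);
        }
      }
    }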

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2e28c156/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
index f030d52..369c1f2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
@@ -24,6 +24,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep;
 import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.loading.DataField;
@@ -59,13 +60,11 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
     child.initialize();
   }
 
-  private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
-    String[] storeLocation = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
-            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
-            configuration.getSegmentId() + "", false, false);
-    CarbonDataProcessorUtil.createLocations(storeLocation);
-    return storeLocation;
+  private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) {
+    return CarbonDataProcessorUtil.getLocalDataFolderLocation(
+        tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(),
+        String.valueOf(configuration.getTaskNo()),
+        configuration.getSegmentId(), false, false);
   }
 
   @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
@@ -75,18 +74,19 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
     String tableName = tableIdentifier.getTableName();
     try {
       CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+          .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
               System.currentTimeMillis());
       int i = 0;
+      String[] storeLocation = getStoreLocation(tableIdentifier);
+      CarbonDataProcessorUtil.createLocations(storeLocation);
       for (Iterator<CarbonRowBatch> iterator : iterators) {
-        String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
         int k = 0;
         while (iterator.hasNext()) {
           CarbonRowBatch next = iterator.next();
          // If the merge sorter produced no rows, don't create a file in the fact column handler
           if (next.hasNext()) {
             CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
-                .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++);
+                .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, k++);
             CarbonFactHandler dataHandler = CarbonFactHandlerFactory
                 .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
             dataHandler.initialise();
@@ -119,10 +119,11 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
     processingComplete(dataHandler);
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
+        .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
             System.currentTimeMillis());
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
+        .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PATITION_ID,
+            System.currentTimeMillis());
   }
 
   private void processingComplete(CarbonFactHandler dataHandler) {
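
One effect of dropping the per-iterator partition id in this step: storeLocation no longer depends on the loop index, so it is computed (and its directories created) once before the loop instead of once per iterator. A tiny sketch of that loop-invariant hoisting, with illustrative paths:

    public class HoistInvariantDemo {
      static String[] storeLocation(String taskNo, String segmentId) {
        return new String[] {"/tmp/carbonstore/" + taskNo + "/Segment_" + segmentId};
      }

      public static void main(String[] args) {
        // hoisted: the location is the same for every iterator
        String[] storeLocation = storeLocation("0", "0");
        for (int i = 0; i < 3; i++) {
          System.out.println("iterator " + i + " writes under " + storeLocation[0]);
        }
      }
    }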