You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/09 18:21:57 UTC

[09/36] carbondata git commit: [CARBONDATA-1992] Remove partitionId in CarbonTablePath

[CARBONDATA-1992] Remove partitionId in CarbonTablePath

In CarbonTablePath, there is a deprecated partition id which is always 0, it should be removed to avoid confusion.

This closes #1765


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d3cbb026
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d3cbb026
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d3cbb026

Branch: refs/heads/carbonstore-rebase
Commit: d3cbb026cc8ea66e8ec49beb83b7604575714c78
Parents: da549c2
Author: Jacky Li <ja...@qq.com>
Authored: Sat Jan 6 20:28:44 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Sat Feb 10 02:20:09 2018 +0800

----------------------------------------------------------------------
 .../core/metadata/PartitionMapFileStore.java    |   2 +-
 .../core/mutate/CarbonUpdateUtil.java           |   8 +-
 .../core/statusmanager/LoadMetadataDetails.java |   2 +
 .../SegmentUpdateStatusManager.java             |   8 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   6 +-
 .../core/util/path/CarbonTablePath.java         |  55 ++++---
 .../CarbonFormatDirectoryStructureTest.java     |   4 +-
 .../hadoop/api/CarbonTableInputFormat.java      |   2 +-
 .../streaming/CarbonStreamRecordWriter.java     |   2 +-
 .../hadoop/test/util/StoreCreator.java          |   1 -
 .../presto/util/CarbonDataStoreCreator.scala    |   1 -
 .../dataload/TestLoadDataGeneral.scala          |   2 +-
 .../InsertIntoCarbonTableTestCase.scala         |   4 +-
 .../dataload/TestBatchSortDataLoad.scala        |   3 +-
 .../dataload/TestDataLoadWithFileName.scala     |   2 +-
 .../dataload/TestGlobalSortDataLoad.scala       |   4 +-
 .../testsuite/datamap/TestDataMapCommand.scala  |  34 ++--
 .../TestDataLoadingForPartitionTable.scala      |   3 +-
 .../StandardPartitionTableCleanTestCase.scala   |   2 +-
 ...andardPartitionTableCompactionTestCase.scala |   2 +-
 .../StandardPartitionTableLoadingTestCase.scala |   4 +-
 .../load/DataLoadProcessBuilderOnSpark.scala    |   1 -
 .../load/DataLoadProcessorStepOnSpark.scala     |   2 +-
 .../spark/rdd/AlterTableLoadPartitionRDD.scala  | 154 +++++++++++--------
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  11 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  25 ++-
 .../org/apache/spark/util/PartitionUtils.scala  |   5 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   5 +-
 .../datasources/CarbonFileFormat.scala          |   1 -
 .../partition/TestAlterPartitionTable.scala     |   2 +-
 .../bucketing/TableBucketingTestCase.scala      |   2 +
 .../loading/CarbonDataLoadConfiguration.java    |  10 --
 .../loading/DataLoadProcessBuilder.java         |   1 -
 .../loading/TableProcessingOperations.java      |   3 +-
 .../loading/model/CarbonLoadModel.java          |  72 +--------
 .../sort/impl/ParallelReadMergeSorterImpl.java  |   4 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |  15 +-
 .../UnsafeBatchParallelReadMergeSorterImpl.java |   7 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |  21 ++-
 .../CarbonRowDataWriterProcessorStepImpl.java   |  33 ++--
 .../steps/DataWriterBatchProcessorStepImpl.java |  25 +--
 .../steps/DataWriterProcessorStepImpl.java      |  22 +--
 .../processing/merger/CarbonDataMergerUtil.java |   6 +-
 .../merger/CompactionResultSortProcessor.java   |   4 +-
 .../sort/sortdata/SortParameters.java           |  16 +-
 .../store/CarbonFactDataHandlerModel.java       |   7 +-
 .../util/CarbonDataProcessorUtil.java           |  12 +-
 .../processing/util/CarbonLoaderUtil.java       |  12 +-
 .../processing/util/DeleteLoadFolders.java      |   7 +-
 .../carbondata/processing/StoreCreator.java     |   1 -
 .../carbondata/streaming/StreamHandoffRDD.scala |   1 -
 .../streaming/StreamSinkFactory.scala           |   2 +-
 .../streaming/CarbonAppendableStreamSink.scala  |   8 +-
 53 files changed, 285 insertions(+), 363 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
index 1e9cbc4..43310fe 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
@@ -363,7 +363,7 @@ public class PartitionMapFileStore {
         List<String> toBeDeletedIndexFiles = new ArrayList<>();
         List<String> toBeDeletedDataFiles = new ArrayList<>();
         // take the list of files from this segment.
-        String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName());
+        String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segment.getLoadName());
         String partitionFilePath = getPartitionFilePath(segmentPath);
         if (partitionFilePath != null) {
           PartitionMapper partitionMapper = readPartitionMap(partitionFilePath);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index c5f61c2..e0b208f 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -297,9 +297,7 @@ public class CarbonUpdateUtil {
     CarbonTablePath carbonTablePath = CarbonStorePath
             .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
                     absoluteTableIdentifier.getCarbonTableIdentifier());
-    // as of now considering only partition 0.
-    String partitionId = "0";
-    String partitionDir = carbonTablePath.getPartitionDir(partitionId);
+    String partitionDir = carbonTablePath.getPartitionDir();
     CarbonFile file =
             FileFactory.getCarbonFile(partitionDir, FileFactory.getFileType(partitionDir));
     if (!file.exists()) {
@@ -380,7 +378,7 @@ public class CarbonUpdateUtil {
   }
 
   public static long getLatestTaskIdForSegment(String segmentId, CarbonTablePath tablePath) {
-    String segmentDirPath = tablePath.getCarbonDataDirectoryPath("0", segmentId);
+    String segmentDirPath = tablePath.getCarbonDataDirectoryPath(segmentId);
 
     // scan all the carbondata files and get the latest task ID.
     CarbonFile segment =
@@ -445,7 +443,7 @@ public class CarbonUpdateUtil {
               || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
 
         // take the list of files from this segment.
-        String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName());
+        String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segment.getLoadName());
         CarbonFile segDir =
                 FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
         CarbonFile[] allSegmentFiles = segDir.listFiles();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
index 85602bc..73a665d 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java
@@ -123,10 +123,12 @@ public class LoadMetadataDetails implements Serializable {
    */
   private FileFormat fileFormat = FileFormat.COLUMNAR_V3;
 
+  @Deprecated
   public String getPartitionCount() {
     return partitionCount;
   }
 
+  @Deprecated
   public void setPartitionCount(String partitionCount) {
     this.partitionCount = partitionCount;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index e0e7b70..d4ef5c6 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -197,7 +197,7 @@ public class SegmentUpdateStatusManager {
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     String endTimeStamp = "";
     String startTimeStamp = "";
-    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
     CarbonFile segDir =
         FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
     for (LoadMetadataDetails eachSeg : segmentDetails) {
@@ -292,7 +292,7 @@ public class SegmentUpdateStatusManager {
           .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
               absoluteTableIdentifier.getCarbonTableIdentifier());
       String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID);
-      String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment);
+      String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(segment);
       String completeBlockName = CarbonTablePath.addDataPartPrefix(
           CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCK_ID)
               + CarbonCommonConstants.FACT_FILE_EXT);
@@ -424,7 +424,7 @@ public class SegmentUpdateStatusManager {
         .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
             absoluteTableIdentifier.getCarbonTableIdentifier());
 
-    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
 
     CarbonFile segDir =
         FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
@@ -867,7 +867,7 @@ public class SegmentUpdateStatusManager {
 
     // filter out the fact files.
 
-    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
     CarbonFile segDir =
         FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index c208154..83e7d52 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -1103,7 +1103,7 @@ public final class CarbonUtil {
     // geting the index file path
     //TODO need to pass proper partition number when partiton will be supported
     String carbonIndexFilePath = carbonTablePath
-        .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId(),
+        .getCarbonIndexFilePath(taskId, tableBlockInfoList.get(0).getSegmentId(),
             bucketNumber, CarbonTablePath.DataFileUtil
                 .getTimeStampFromFileName(tableBlockInfoList.get(0).getFilePath()),
             tableBlockInfoList.get(0).getVersion());
@@ -1346,7 +1346,7 @@ public final class CarbonUtil {
     // geting the index file path
     //TODO need to pass proper partition number when partiton will be supported
     String carbonIndexFilePath = carbonTablePath
-        .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId(),
+        .getCarbonIndexFilePath(taskId, tableBlockInfoList.get(0).getSegmentId(),
             bucketNumber, CarbonTablePath.DataFileUtil
                 .getTimeStampFromFileName(tableBlockInfoList.get(0).getFilePath()),
             tableBlockInfoList.get(0).getVersion());
@@ -2303,7 +2303,7 @@ public final class CarbonUtil {
     long carbonDataSize = 0L;
     long carbonIndexSize = 0L;
     HashMap<String, Long> dataAndIndexSize = new HashMap<String, Long>();
-    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId);
     FileFactory.FileType fileType = FileFactory.getFileType(segmentPath);
     switch (fileType) {
       case HDFS:

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 5a63d2f..e107317 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -278,15 +278,14 @@ public class CarbonTablePath extends Path {
   /**
    * Gets absolute path of data file
    *
-   * @param partitionId         unique partition identifier
    * @param segmentId           unique partition identifier
    * @param filePartNo          data file part number
    * @param factUpdateTimeStamp unique identifier to identify an update
    * @return absolute path of data file stored in carbon data format
    */
-  public String getCarbonDataFilePath(String partitionId, String segmentId, Integer filePartNo,
-      Long taskNo, int batchNo, int bucketNumber, String factUpdateTimeStamp) {
-    return getSegmentDir(partitionId, segmentId) + File.separator + getCarbonDataFileName(
+  public String getCarbonDataFilePath(String segmentId, Integer filePartNo, Long taskNo,
+      int batchNo, int bucketNumber, String factUpdateTimeStamp) {
+    return getSegmentDir(segmentId) + File.separator + getCarbonDataFileName(
         filePartNo, taskNo, bucketNumber, batchNo, factUpdateTimeStamp);
   }
 
@@ -295,13 +294,12 @@ public class CarbonTablePath extends Path {
    * based on task id
    *
    * @param taskId      task id of the file
-   * @param partitionId partition number
    * @param segmentId   segment number
    * @return full qualified carbon index path
    */
-  public String getCarbonIndexFilePath(final String taskId, final String partitionId,
-      final String segmentId, final String bucketNumber) {
-    String segmentDir = getSegmentDir(partitionId, segmentId);
+  public String getCarbonIndexFilePath(final String taskId, final String segmentId,
+      final String bucketNumber) {
+    String segmentDir = getSegmentDir(segmentId);
     CarbonFile carbonFile =
         FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir));
 
@@ -317,9 +315,8 @@ public class CarbonTablePath extends Path {
     if (files.length > 0) {
       return files[0].getAbsolutePath();
     } else {
-      throw new RuntimeException("Missing Carbon index file for partition["
-          + partitionId + "] Segment[" + segmentId + "], taskId[" + taskId
-          + "]");
+      throw new RuntimeException("Missing Carbon index file for Segment[" + segmentId + "], "
+          + "taskId[" + taskId + "]");
     }
   }
 
@@ -327,8 +324,6 @@ public class CarbonTablePath extends Path {
    * Below method will be used to get the carbon index file path
    * @param taskId
    *        task id
-   * @param partitionId
-   *        partition id
    * @param segmentId
    *        segment id
    * @param bucketNumber
@@ -337,28 +332,27 @@ public class CarbonTablePath extends Path {
    *        timestamp
    * @return carbon index file path
    */
-  public String getCarbonIndexFilePath(String taskId, String partitionId, String segmentId,
-      String bucketNumber, String timeStamp, ColumnarFormatVersion columnarFormatVersion) {
+  public String getCarbonIndexFilePath(String taskId, String segmentId, String bucketNumber,
+      String timeStamp, ColumnarFormatVersion columnarFormatVersion) {
     switch (columnarFormatVersion) {
       case V1:
       case V2:
-        return getCarbonIndexFilePath(taskId, partitionId, segmentId, bucketNumber);
+        return getCarbonIndexFilePath(taskId, segmentId, bucketNumber);
       default:
-        String segmentDir = getSegmentDir(partitionId, segmentId);
+        String segmentDir = getSegmentDir(segmentId);
         return segmentDir + File.separator + getCarbonIndexFileName(taskId,
             Integer.parseInt(bucketNumber), timeStamp);
     }
   }
 
-  public String getCarbonIndexFilePath(String taskId, String partitionId, String segmentId,
-      int batchNo, String bucketNumber, String timeStamp,
-      ColumnarFormatVersion columnarFormatVersion) {
+  public String getCarbonIndexFilePath(String taskId, String segmentId, int batchNo,
+      String bucketNumber, String timeStamp, ColumnarFormatVersion columnarFormatVersion) {
     switch (columnarFormatVersion) {
       case V1:
       case V2:
-        return getCarbonIndexFilePath(taskId, partitionId, segmentId, bucketNumber);
+        return getCarbonIndexFilePath(taskId, segmentId, bucketNumber);
       default:
-        String segmentDir = getSegmentDir(partitionId, segmentId);
+        String segmentDir = getSegmentDir(segmentId);
         return segmentDir + File.separator + getCarbonIndexFileName(Long.parseLong(taskId),
             Integer.parseInt(bucketNumber), batchNo, timeStamp);
     }
@@ -375,12 +369,11 @@ public class CarbonTablePath extends Path {
   /**
    * Gets absolute path of data file
    *
-   * @param partitionId unique partition identifier
    * @param segmentId   unique partition identifier
    * @return absolute path of data file stored in carbon data format
    */
-  public String getCarbonDataDirectoryPath(String partitionId, String segmentId) {
-    return getSegmentDir(partitionId, segmentId);
+  public String getCarbonDataDirectoryPath(String segmentId) {
+    return getSegmentDir(segmentId);
   }
 
   /**
@@ -418,12 +411,16 @@ public class CarbonTablePath extends Path {
     return segmentDir + File.separator + getCarbonStreamIndexFileName();
   }
 
-  public String getSegmentDir(String partitionId, String segmentId) {
-    return getPartitionDir(partitionId) + File.separator + SEGMENT_PREFIX + segmentId;
+  public String getSegmentDir(String segmentId) {
+    return getPartitionDir() + File.separator + SEGMENT_PREFIX + segmentId;
   }
 
-  public String getPartitionDir(String partitionId) {
-    return getFactDir() + File.separator + PARTITION_PREFIX + partitionId;
+  // This partition is not used in any code logic, just keep backward compatibility
+  public static final String DEPRECATED_PATITION_ID = "0";
+
+  public String getPartitionDir() {
+    return getFactDir() + File.separator + PARTITION_PREFIX +
+        CarbonTablePath.DEPRECATED_PATITION_ID;
   }
 
   private String getMetaDataDir() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
index 5549806..a1ccab3 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
@@ -53,8 +53,8 @@ public class CarbonFormatDirectoryStructureTest {
         .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.dictmeta"));
     assertTrue(carbonTablePath.getSortIndexFilePath("t1_c1").replace("\\", "/")
         .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.sortindex"));
-    assertTrue(carbonTablePath.getCarbonDataFilePath("1", "2", 3, 4L,  0, 0, "999").replace("\\", "/")
-        .equals(CARBON_STORE + "/d1/t1/Fact/Part1/Segment_2/part-3-4_batchno0-0-999.carbondata"));
+    assertTrue(carbonTablePath.getCarbonDataFilePath("2", 3, 4L,  0, 0, "999").replace("\\", "/")
+        .equals(CARBON_STORE + "/d1/t1/Fact/Part0/Segment_2/part-3-4_batchno0-0-999.carbondata"));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index a1887f0..6f1e123 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -480,7 +480,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> {
       long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
       long maxSize = getMaxSplitSize(job);
       for (String segmentId : streamSegments) {
-        String segmentDir = tablePath.getSegmentDir("0", segmentId);
+        String segmentDir = tablePath.getSegmentDir(segmentId);
         FileFactory.FileType fileType = FileFactory.getFileType(segmentDir);
         if (FileFactory.isFileExist(segmentDir, fileType)) {
           String indexName = CarbonTablePath.getCarbonStreamIndexFileName();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
index 364a6a6..3ef8afc 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java
@@ -129,7 +129,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> {
 
     CarbonTablePath tablePath =
         CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier());
-    segmentDir = tablePath.getSegmentDir("0", segmentId);
+    segmentDir = tablePath.getSegmentDir(segmentId);
     fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0");
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index fbf33d6..ac17c4e 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -154,7 +154,6 @@ public class StoreCreator {
     loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(","));
     loadModel.setTaskNo("0");
     loadModel.setSegmentId("0");
-    loadModel.setPartitionId("0");
     loadModel.setFactTimeStamp(System.currentTimeMillis());
     loadModel.setMaxColumns("10");
     return loadModel;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 7b5c311..a41e738 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -135,7 +135,6 @@ object CarbonDataStoreCreator {
       loadModel.setCsvHeaderColumns(loadModel.getCsvHeader.split(","))
       loadModel.setTaskNo("0")
       loadModel.setSegmentId("0")
-      loadModel.setPartitionId("0")
       loadModel.setFactTimeStamp(System.currentTimeMillis())
       loadModel.setMaxColumns("15")
       executeGraph(loadModel, storePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
index 09ca9e5..c84ae6b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala
@@ -49,7 +49,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach {
       tableName: String): Boolean = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(datbaseName, tableName)
     val partitionPath = CarbonStorePath
-      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0")
+      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir
     val fileType: FileFactory.FileType = FileFactory.getFileType(partitionPath)
     val carbonFile = FileFactory.getCarbonFile(partitionPath, fileType)
     val segments: ArrayBuffer[String] = ArrayBuffer()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index d59f0b5..5cc4156 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -232,7 +232,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select count(*) from CarbonOverwrite"), sql("select count(*) from HiveOverwrite"))
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbonoverwrite")
     val partitionPath = CarbonStorePath
-      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0")
+      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir
     val folder = new File(partitionPath)
     assert(folder.isDirectory)
     assert(folder.list().length == 1)
@@ -255,7 +255,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select count(*) from TCarbonSourceOverwrite"), sql("select count(*) from HiveOverwrite"))
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "tcarbonsourceoverwrite")
     val partitionPath = CarbonStorePath
-      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0")
+      .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir
     val folder = new File(partitionPath)
 
     assert(folder.isDirectory)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
index 4af9d54..42ac4df 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala
@@ -193,9 +193,8 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll {
       CarbonCommonConstants.DATABASE_DEFAULT_NAME,
       tableName
     )
-    val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", segmentNo)
+    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath(segmentNo)
     new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
index dae0962..db0a62c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala
@@ -49,7 +49,7 @@ class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll {
     val indexReader = new CarbonIndexFileReader()
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "test_table_v3")
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", "0")
+    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0")
     val carbonIndexPaths = new File(segmentDir)
       .listFiles(new FilenameFilter {
         override def accept(dir: File, name: String): Boolean = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
index 0d9e0fd..479db50 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala
@@ -272,7 +272,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
     sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE carbon_globalsort")
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort")
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val segmentDir = carbonTablePath.getSegmentDir("0", "0")
+    val segmentDir = carbonTablePath.getSegmentDir("0")
     assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length)
   }
 
@@ -379,7 +379,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo
   private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName)
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", segmentNo)
+    val segmentDir = carbonTablePath.getCarbonDataDirectoryPath(segmentNo)
     new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size()
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
index 146ad62..5170c43 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala
@@ -217,19 +217,27 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test if preaggregate load is successfull for hivemetastore") {
-    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
-    sql("DROP TABLE IF EXISTS maintable")
-    sql(
-      """
-        | CREATE TABLE maintable(id int, name string, city string, age int)
-        | STORED BY 'org.apache.carbondata.format'
-      """.stripMargin)
-    sql(
-      s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id"""
-        .stripMargin)
-    sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
-    checkAnswer(sql(s"select * from maintable_preagg_sum"),
-      Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+    try {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true")
+      sql("DROP TABLE IF EXISTS maintable")
+      sql(
+        """
+          | CREATE TABLE maintable(id int, name string, city string, age int)
+          | STORED BY 'org.apache.carbondata.format'
+        """.stripMargin)
+      sql(
+        s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id"""
+
+          .stripMargin)
+      sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable")
+      checkAnswer(sql(s"select * from maintable_preagg_sum"),
+        Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55)))
+    } finally {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE,
+          CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT)
+    }
   }
 
   test("test preaggregate load for decimal column for hivemetastore") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
index ed151bd..0a21aed 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -63,7 +63,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
     val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
       carbonTable.getTablePath)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+    val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId)
     val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
     val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
       override def accept(file: CarbonFile): Boolean = {
@@ -87,6 +87,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll
         |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
         |  utilization int,salary int)
         | PARTITIONED BY (empno int)
+        |
         | STORED BY 'org.apache.carbondata.format'
         | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
       """.stripMargin)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
index 2b0dd09..5427981 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
@@ -53,7 +53,7 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
     val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
       carbonTable.getTablePath)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+    val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId)
     val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
     val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
       override def accept(file: CarbonFile): Boolean = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
index 22ebd80..f4b6e0e 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
@@ -53,7 +53,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
     val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
       carbonTable.getTablePath)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+    val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId)
     val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
     val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
       override def accept(file: CarbonFile): Boolean = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 669d6e7..eb091f3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -70,7 +70,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
     val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
       carbonTable.getTablePath)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+    val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId)
     val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
     val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
       override def accept(file: CarbonFile): Boolean = {
@@ -335,7 +335,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_mergeindexpartitionthree")
     val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
       carbonTable.getTablePath)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", "0")
+    val segmentDir = tablePath.getCarbonDataDirectoryPath("0")
     val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
     val files = carbonFile.listFiles(new CarbonFileFilter {
       override def accept(file: CarbonFile): Boolean = CarbonTablePath.isCarbonIndexFile(file.getName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
index 8be70a9..a5c1313 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala
@@ -56,7 +56,6 @@ object DataLoadProcessBuilderOnSpark {
         .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount))
     }
 
-    model.setPartitionId("0")
     val sc = sparkSession.sparkContext
     val modelBroadcast = sc.broadcast(model)
     val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 21de003..834c1a6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -194,7 +194,7 @@ object DataLoadProcessorStepOnSpark {
 
       dataWriter = new DataWriterProcessorStepImpl(conf)
 
-      val dataHandlerModel = dataWriter.getDataHandlerModel(0)
+      val dataHandlerModel = dataWriter.getDataHandlerModel
       var dataHandler: CarbonFactHandler = null
       var rowsNotExist = true
       while (rows.hasNext) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index 76c99f2..9de8dc9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -18,19 +18,21 @@
 package org.apache.carbondata.spark.rdd
 
 import scala.collection.JavaConverters._
+import scala.util.Random
 
-import org.apache.spark.{Partition, TaskContext}
+import org.apache.spark.{Partition, SparkEnv, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.AlterPartitionModel
 import org.apache.spark.util.PartitionUtils
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.partition.spliter.RowResultProcessor
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.AlterPartitionResult
-import org.apache.carbondata.spark.util.CommonUtil
+import org.apache.carbondata.spark.util.{CommonUtil, Util}
 
 class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
     result: AlterPartitionResult[K, V],
@@ -39,76 +41,96 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
     identifier: AbsoluteTableIdentifier,
     prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) {
 
-    var storeLocation: String = null
-    val carbonLoadModel = alterPartitionModel.carbonLoadModel
-    val segmentId = alterPartitionModel.segmentId
-    val oldPartitionIds = alterPartitionModel.oldPartitionIds
-    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val databaseName = carbonTable.getDatabaseName
-    val factTableName = carbonTable.getTableName
-    val partitionInfo = carbonTable.getPartitionInfo(factTableName)
+  var storeLocation: String = null
+  val carbonLoadModel = alterPartitionModel.carbonLoadModel
+  val segmentId = alterPartitionModel.segmentId
+  val oldPartitionIds = alterPartitionModel.oldPartitionIds
+  val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+  val databaseName = carbonTable.getDatabaseName
+  val factTableName = carbonTable.getTableName
+  val partitionInfo = carbonTable.getPartitionInfo(factTableName)
 
-    override protected def getPartitions: Array[Partition] = {
-        val sc = alterPartitionModel.sqlContext.sparkContext
-        sc.setLocalProperty("spark.scheduler.pool", "DDL")
-        sc.setLocalProperty("spark.job.interruptOnCancel", "true")
-        firstParent[Array[AnyRef]].partitions
-    }
-
-    override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
-        val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-        val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava
-        val iter = new Iterator[(K, V)] {
-            val partitionId = partitionInfo.getPartitionId(split.index)
-            carbonLoadModel.setTaskNo(String.valueOf(partitionId))
-            carbonLoadModel.setSegmentId(segmentId)
-            carbonLoadModel.setPartitionId("0")
-            CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, false, true)
+  override protected def getPartitions: Array[Partition] = {
+    val sc = alterPartitionModel.sqlContext.sparkContext
+    sc.setLocalProperty("spark.scheduler.pool", "DDL")
+    sc.setLocalProperty("spark.job.interruptOnCancel", "true")
+    firstParent[Array[AnyRef]].partitions
+  }
 
-            val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
-                factTableName,
-                carbonLoadModel.getTaskNo,
-                "0",
-                segmentId,
-                false,
-                true
-            )
+  override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava
+    val iter = new Iterator[(K, V)] {
+      val partitionId = partitionInfo.getPartitionId(split.index)
+      carbonLoadModel.setTaskNo(String.valueOf(partitionId))
+      carbonLoadModel.setSegmentId(segmentId)
+      CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, false, true)
+      val tempLocationKey = CarbonDataProcessorUtil
+        .getTempStoreLocationKey(carbonLoadModel.getDatabaseName,
+          carbonLoadModel.getTableName,
+          segmentId,
+          carbonLoadModel.getTaskNo,
+          false,
+          true)
+      // this property is used to determine whether temp location for carbon is inside
+      // container temp dir or is yarn application directory.
+      val carbonUseLocalDir = CarbonProperties.getInstance()
+        .getProperty("carbon.use.local.dir", "false")
 
-            val loadStatus = if (rows.isEmpty) {
-                LOGGER.info("After repartition this split, NO target rows to write back.")
-                true
-            } else {
-                val segmentProperties = PartitionUtils.getSegmentProperties(identifier,
-                    segmentId, partitionIds.toList, oldPartitionIds, partitionInfo, carbonTable)
-                val processor = new RowResultProcessor(
-                    carbonTable,
-                    carbonLoadModel,
-                    segmentProperties,
-                    tempStoreLoc,
-                    bucketId)
-                try {
-                    processor.execute(rows)
-                } catch {
-                    case e: Exception =>
-                        sys.error(s"Exception when executing Row result processor ${e.getMessage}")
-                } finally {
-                    TableProcessingOperations
-                      .deleteLocalDataLoadFolderLocation(carbonLoadModel, false, true)
-                }
-            }
+      if (carbonUseLocalDir.equalsIgnoreCase("true")) {
 
-            val loadResult = segmentId
-            var finished = false
+        val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
+        if (null != storeLocations && storeLocations.nonEmpty) {
+          storeLocation = storeLocations(Random.nextInt(storeLocations.length))
+        }
+        if (storeLocation == null) {
+          storeLocation = System.getProperty("java.io.tmpdir")
+        }
+      } else {
+        storeLocation = System.getProperty("java.io.tmpdir")
+      }
+      storeLocation = storeLocation + '/' + System.nanoTime() + '/' + split.index
+      CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation)
+      LOGGER.info(s"Temp storeLocation taken is $storeLocation")
 
-            override def hasNext: Boolean = {
-                !finished
-            }
+      val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+        databaseName, factTableName, carbonLoadModel.getTaskNo, segmentId, false, true)
 
-            override def next(): (K, V) = {
-                finished = true
-                result.getKey(loadResult, loadStatus)
-            }
+      val loadStatus = if (rows.isEmpty) {
+        LOGGER.info("After repartition this split, NO target rows to write back.")
+        true
+      } else {
+        val segmentProperties = PartitionUtils.getSegmentProperties(identifier,
+          segmentId, partitionIds.toList, oldPartitionIds, partitionInfo, carbonTable)
+        val processor = new RowResultProcessor(
+          carbonTable,
+          carbonLoadModel,
+          segmentProperties,
+          tempStoreLoc,
+          bucketId)
+        try {
+          processor.execute(rows)
+        } catch {
+          case e: Exception =>
+            sys.error(s"Exception when executing Row result processor ${ e.getMessage }")
+        } finally {
+          TableProcessingOperations
+            .deleteLocalDataLoadFolderLocation(carbonLoadModel, false, true)
         }
-        iter
+      }
+
+      val loadResult = segmentId
+      var finished = false
+
+      override def hasNext: Boolean = {
+        !finished
+      }
+
+      override def next(): (K, V) = {
+        finished = true
+        result.getKey(loadResult, loadStatus)
+      }
     }
+    iter
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 0859f2e..a517bf4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -179,16 +179,9 @@ class CarbonMergerRDD[K, V](
             }
         }
 
-        val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName,
-          factTableName,
-          carbonLoadModel.getTaskNo,
-          "0",
-          mergeNumber,
-          true,
-          false
-        )
+        val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+          databaseName, factTableName, carbonLoadModel.getTaskNo, mergeNumber, true, false)
 
-        carbonLoadModel.setPartitionId("0")
         var processor: AbstractResultProcessor = null
         if (restructuredBlockExists) {
           LOGGER.info("CompactionResultSortProcessor flow is selected")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 72d0484..1fa1689 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator}
 import org.apache.carbondata.processing.loading.exception.NoRetryException
@@ -129,7 +130,8 @@ class SparkPartitionLoader(model: CarbonLoadModel,
       System.setProperty("carbon.properties.filepath",
         System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties")
     }
-    CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId)
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(
+      CarbonTablePath.DEPRECATED_PATITION_ID)
     CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true")
     CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1")
     CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true")
@@ -219,14 +221,13 @@ class NewCarbonDataLoadRDD[K, V](
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
-      var partitionID = "0"
       val loadMetadataDetails = new LoadMetadataDetails()
       val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
       var model: CarbonLoadModel = _
       val uniqueLoadStatusId =
         carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
       try {
-        loadMetadataDetails.setPartitionCount(partitionID)
+        loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID)
         loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
 
         val preFetch = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
@@ -264,7 +265,7 @@ class NewCarbonDataLoadRDD[K, V](
         // So print the data load statistics only in case of non failure case
         if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
           CarbonTimeStatisticsFactory.getLoadStatisticsInstance
-            .printStatisticsInfo(model.getPartitionId)
+            .printStatisticsInfo(CarbonTablePath.DEPRECATED_PATITION_ID)
         }
       }
 
@@ -287,8 +288,8 @@ class NewCarbonDataLoadRDD[K, V](
         val fileList: java.util.List[String] = new java.util.ArrayList[String](
             CarbonCommonConstants.CONSTANT_SIZE_TEN)
         CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, fileList, ",")
-        model = carbonLoadModel.getCopyWithPartition(partitionID, fileList,
-            carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
+        model = carbonLoadModel.getCopyWithPartition(
+          carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
         StandardLogService.setThreadName(StandardLogService
           .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName)
           , ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId + "")
@@ -351,7 +352,6 @@ class NewDataFrameLoaderRDD[K, V](
 
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
-      val partitionID = "0"
       val loadMetadataDetails = new LoadMetadataDetails()
       val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
       val model: CarbonLoadModel = carbonLoadModel
@@ -359,9 +359,8 @@ class NewDataFrameLoaderRDD[K, V](
         carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
       try {
 
-        loadMetadataDetails.setPartitionCount(partitionID)
+        loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID)
         loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
-        carbonLoadModel.setPartitionId(partitionID)
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
         carbonLoadModel.setPreFetch(false)
 
@@ -406,7 +405,7 @@ class NewDataFrameLoaderRDD[K, V](
         // So print the data load statistics only in case of non failure case
         if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
           CarbonTimeStatisticsFactory.getLoadStatisticsInstance
-            .printStatisticsInfo(model.getPartitionId)
+            .printStatisticsInfo(CarbonTablePath.DEPRECATED_PATITION_ID)
         }
       }
       var finished = false
@@ -542,7 +541,6 @@ class PartitionTableDataLoaderRDD[K, V](
   override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
-      val partitionID = "0"
       val loadMetadataDetails = new LoadMetadataDetails()
       val executionErrors = new ExecutionErrors(FailureCauses.NONE, "")
       val model: CarbonLoadModel = carbonLoadModel
@@ -552,9 +550,8 @@ class PartitionTableDataLoaderRDD[K, V](
         carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
       try {
 
-        loadMetadataDetails.setPartitionCount(partitionID)
+        loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID)
         loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
-        carbonLoadModel.setPartitionId(partitionID)
         carbonLoadModel.setTaskNo(String.valueOf(partitionInfo.getPartitionId(theSplit.index)))
         carbonLoadModel.setPreFetch(false)
         val recordReaders = Array[CarbonIterator[Array[AnyRef]]] {
@@ -590,7 +587,7 @@ class PartitionTableDataLoaderRDD[K, V](
         // So print the data load statistics only in case of non failure case
         if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) {
           CarbonTimeStatisticsFactory.getLoadStatisticsInstance
-            .printStatisticsInfo(model.getPartitionId)
+            .printStatisticsInfo(CarbonTablePath.DEPRECATED_PATITION_ID)
         }
       }
       var finished = false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 0498b25..3c871db 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -190,8 +190,9 @@ object PartitionUtils {
         val batchNo = CarbonTablePath.DataFileUtil.getBatchNoFromTaskNo(taskNo)
         val taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskNo)
         val bucketNumber = CarbonTablePath.DataFileUtil.getBucketNo(path)
-        val indexFilePath = carbonTablePath.getCarbonIndexFilePath(String.valueOf(taskId), "0",
-          segmentId, batchNo, String.valueOf(bucketNumber), timestamp, version)
+        val indexFilePath = carbonTablePath.getCarbonIndexFilePath(
+          String.valueOf(taskId), segmentId, batchNo, String.valueOf(bucketNumber),
+          timestamp, version)
         // indexFilePath could be duplicated when multiple data file related to one index file
         if (indexFilePath != null && !pathList.contains(indexFilePath)) {
           pathList.add(indexFilePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8ed7623..3980c11 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -615,7 +615,6 @@ object CarbonDataRDDFactory {
 
         override def getPartition(key: Any): Int = {
           val segId = key.asInstanceOf[String]
-          // partitionId
           segmentIdIndex(segId) * parallelism + Random.nextInt(parallelism)
         }
       }
@@ -649,7 +648,6 @@ object CarbonDataRDDFactory {
     val rddResult = new updateResultImpl()
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] {
-      val partitionID = "0"
       val loadMetadataDetails = new LoadMetadataDetails
       val executionErrors = ExecutionErrors(FailureCauses.NONE, "")
       var uniqueLoadStatusId = ""
@@ -660,10 +658,9 @@ object CarbonDataRDDFactory {
                              CarbonCommonConstants.UNDERSCORE +
                              (index + "_0")
 
-        loadMetadataDetails.setPartitionCount(partitionID)
+        loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID)
         loadMetadataDetails.setLoadName(segId)
         loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_FAILURE)
-        carbonLoadModel.setPartitionId(partitionID)
         carbonLoadModel.setSegmentId(segId)
         carbonLoadModel.setTaskNo(String.valueOf(index))
         carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 17749c8..0c956e5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -105,7 +105,6 @@ with Serializable {
       model,
       conf
     )
-    model.setPartitionId("0")
     model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
     model.setDictionaryServerHost(options.getOrElse("dicthost", null))
     model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index b5325ef..aadee81 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -858,7 +858,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[CarbonFile] = {
     val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
       carbonTable.getTablePath)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+    val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId)
     val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
     val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
       override def accept(file: CarbonFile): Boolean = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 102df39..9da7244 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -173,6 +173,7 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       case s: ShuffleExchange => shuffleExists = true
     }
     assert(!shuffleExists, "shuffle should not exist on bucket tables")
+    sql("DROP TABLE bucketed_parquet_table")
   }
 
   test("test create table with bucket join of carbon table and non bucket parquet table") {
@@ -197,6 +198,7 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       case s: ShuffleExchange => shuffleExists = true
     }
     assert(shuffleExists, "shuffle should exist on non bucket tables")
+    sql("DROP TABLE parquet_table")
   }
 
   test("test scalar subquery with equal") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 7b1ab9d..e291f41 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -40,8 +40,6 @@ public class CarbonDataLoadConfiguration {
 
   private String[] header;
 
-  private String partitionId;
-
   private String segmentId;
 
   private String taskNo;
@@ -189,14 +187,6 @@ public class CarbonDataLoadConfiguration {
     this.tableIdentifier = tableIdentifier;
   }
 
-  public String getPartitionId() {
-    return partitionId;
-  }
-
-  public void setPartitionId(String partitionId) {
-    this.partitionId = partitionId;
-  }
-
   public String getSegmentId() {
     return segmentId;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index f7eff81..cf045a4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -152,7 +152,6 @@ public final class DataLoadProcessBuilder {
     configuration.setTableIdentifier(identifier);
     configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
     configuration.setHeader(loadModel.getCsvHeaderColumns());
-    configuration.setPartitionId(loadModel.getPartitionId());
     configuration.setSegmentId(loadModel.getSegmentId());
     configuration.setTaskNo(loadModel.getTaskNo());
     configuration.setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index e2be79c..a8db6c9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -63,8 +63,7 @@ public class TableProcessingOperations {
 
     //delete folder which metadata no exist in tablestatus
     for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
-      final String partitionCount = i + "";
-      String partitionPath = carbonTablePath.getPartitionDir(partitionCount);
+      String partitionPath = carbonTablePath.getPartitionDir();
       FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
       if (FileFactory.isFileExist(partitionPath, fileType)) {
         CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index d41455f..fef2da6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -39,8 +39,6 @@ public class CarbonLoadModel implements Serializable {
 
   private String colDictFilePath;
 
-  private String partitionId;
-
   private CarbonDataLoadSchema carbonDataLoadSchema;
 
   private boolean aggLoadRequest;
@@ -356,55 +354,6 @@ public class CarbonLoadModel implements Serializable {
   }
 
   /**
-   * get copy with partition
-   *
-   * @param uniqueId
-   * @return
-   */
-  public CarbonLoadModel getCopyWithPartition(String uniqueId) {
-    CarbonLoadModel copy = new CarbonLoadModel();
-    copy.tableName = tableName;
-    copy.factFilePath = factFilePath + '/' + uniqueId;
-    copy.databaseName = databaseName;
-    copy.partitionId = uniqueId;
-    copy.aggLoadRequest = aggLoadRequest;
-    copy.loadMetadataDetails = loadMetadataDetails;
-    copy.isRetentionRequest = isRetentionRequest;
-    copy.complexDelimiterLevel1 = complexDelimiterLevel1;
-    copy.complexDelimiterLevel2 = complexDelimiterLevel2;
-    copy.carbonDataLoadSchema = carbonDataLoadSchema;
-    copy.blocksID = blocksID;
-    copy.taskNo = taskNo;
-    copy.factTimeStamp = factTimeStamp;
-    copy.segmentId = segmentId;
-    copy.serializationNullFormat = serializationNullFormat;
-    copy.badRecordsLoggerEnable = badRecordsLoggerEnable;
-    copy.badRecordsAction = badRecordsAction;
-    copy.escapeChar = escapeChar;
-    copy.quoteChar = quoteChar;
-    copy.commentChar = commentChar;
-    copy.timestampformat = timestampformat;
-    copy.dateFormat = dateFormat;
-    copy.defaultTimestampFormat = defaultTimestampFormat;
-    copy.maxColumns = maxColumns;
-    copy.tablePath = tablePath;
-    copy.useOnePass = useOnePass;
-    copy.dictionaryServerHost = dictionaryServerHost;
-    copy.dictionaryServerPort = dictionaryServerPort;
-    copy.dictionaryServerSecretKey = dictionaryServerSecretKey;
-    copy.dictionaryEncryptServerSecure = dictionaryEncryptServerSecure;
-    copy.dictionaryServiceProvider = dictionaryServiceProvider;
-    copy.preFetch = preFetch;
-    copy.isEmptyDataBadRecord = isEmptyDataBadRecord;
-    copy.skipEmptyLine = skipEmptyLine;
-    copy.sortScope = sortScope;
-    copy.batchSortSizeInMb = batchSortSizeInMb;
-    copy.badRecordsLocation = badRecordsLocation;
-    copy.isAggLoadRequest = isAggLoadRequest;
-    return copy;
-  }
-
-  /**
    * Get copy with taskNo.
    * Broadcast value is shared in process, so we need to copy it to make sure the value in each
    * task independently.
@@ -416,7 +365,6 @@ public class CarbonLoadModel implements Serializable {
     copy.tableName = tableName;
     copy.factFilePath = factFilePath;
     copy.databaseName = databaseName;
-    copy.partitionId = partitionId;
     copy.aggLoadRequest = aggLoadRequest;
     copy.loadMetadataDetails = loadMetadataDetails;
     copy.isRetentionRequest = isRetentionRequest;
@@ -460,19 +408,15 @@ public class CarbonLoadModel implements Serializable {
   /**
    * get CarbonLoadModel with partition
    *
-   * @param uniqueId
-   * @param filesForPartition
    * @param header
    * @param delimiter
    * @return
    */
-  public CarbonLoadModel getCopyWithPartition(String uniqueId, List<String> filesForPartition,
-      String header, String delimiter) {
+  public CarbonLoadModel getCopyWithPartition(String header, String delimiter) {
     CarbonLoadModel copyObj = new CarbonLoadModel();
     copyObj.tableName = tableName;
     copyObj.factFilePath = null;
     copyObj.databaseName = databaseName;
-    copyObj.partitionId = uniqueId;
     copyObj.aggLoadRequest = aggLoadRequest;
     copyObj.loadMetadataDetails = loadMetadataDetails;
     copyObj.isRetentionRequest = isRetentionRequest;
@@ -514,20 +458,6 @@ public class CarbonLoadModel implements Serializable {
   }
 
   /**
-   * @return the partitionId
-   */
-  public String getPartitionId() {
-    return partitionId;
-  }
-
-  /**
-   * @param partitionId the partitionId to set
-   */
-  public void setPartitionId(String partitionId) {
-    this.partitionId = partitionId;
-  }
-
-  /**
    * @param tablePath The tablePath to set.
    */
   public void setTablePath(String tablePath) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
index 6432d38..fcc88b5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
@@ -73,8 +73,8 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
     String[] storeLocations =
         CarbonDataProcessorUtil.getLocalDataFolderLocation(
             sortParameters.getDatabaseName(), sortParameters.getTableName(),
-            String.valueOf(sortParameters.getTaskNo()), sortParameters.getPartitionID(),
-            sortParameters.getSegmentId() + "", false, false);
+            String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(),
+            false, false);
     // Set the data file location
     String[] dataFolderLocations = CarbonDataProcessorUtil.arrayAppend(storeLocations,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index c7030dd..b7452a7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -133,10 +133,10 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
   }
 
   private SingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
-    String[] storeLocation = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
-            String.valueOf(sortParameters.getTaskNo()), bucketId,
-            sortParameters.getSegmentId() + "", false, false);
+    String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+        sortParameters.getDatabaseName(), sortParameters.getTableName(),
+        String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(),
+        false, false);
     // Set the data file location
     String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator,
         CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -181,10 +181,9 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
   }
 
   private void setTempLocation(SortParameters parameters) {
-    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(parameters.getDatabaseName(),
-            parameters.getTableName(), parameters.getTaskNo(),
-            parameters.getPartitionID(), parameters.getSegmentId(), false, false);
+    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+        parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(),
+        parameters.getSegmentId(), false, false);
     String[] tmpLocs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
         CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     parameters.setTempFileLocation(tmpLocs);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index c5579d9..ed3a55d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -219,10 +219,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
     }
 
     private void setTempLocation(SortParameters parameters) {
-      String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
-          .getLocalDataFolderLocation(parameters.getDatabaseName(),
-            parameters.getTableName(), parameters.getTaskNo(), batchCount + "",
-            parameters.getSegmentId(), false, false);
+      String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+          parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(),
+          parameters.getSegmentId(), false, false);
       String[] tempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
           File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
       parameters.setTempFileLocation(tempDirs);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
index 3c48e4d..f605b22 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
@@ -119,18 +119,17 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg
 
     Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()];
     for (int i = 0; i < sortDataRows.length; i++) {
-      batchIterator[i] =
-          new MergedDataIterator(String.valueOf(i), batchSize, intermediateFileMergers[i]);
+      batchIterator[i] = new MergedDataIterator(batchSize, intermediateFileMergers[i]);
     }
 
     return batchIterator;
   }
 
-  private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
-    String[] storeLocation = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
-            String.valueOf(sortParameters.getTaskNo()), bucketId,
-            sortParameters.getSegmentId() + "", false, false);
+  private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger() {
+    String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+        sortParameters.getDatabaseName(), sortParameters.getTableName(),
+        String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(),
+        false, false);
     // Set the data file location
     String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -173,7 +172,7 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg
   private void setTempLocation(SortParameters parameters) {
     String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(),
-            parameters.getTaskNo(), parameters.getPartitionID(), parameters.getSegmentId(),
+            parameters.getTaskNo(), parameters.getSegmentId(),
             false, false);
     String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
         CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -224,7 +223,6 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg
 
   private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {
 
-    private String partitionId;
 
     private int batchSize;
 
@@ -232,9 +230,8 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg
 
     private UnsafeIntermediateMerger intermediateMerger;
 
-    public MergedDataIterator(String partitionId, int batchSize,
+    public MergedDataIterator(int batchSize,
         UnsafeIntermediateMerger intermediateMerger) {
-      this.partitionId = partitionId;
       this.batchSize = batchSize;
       this.intermediateMerger = intermediateMerger;
       this.firstRow = true;
@@ -245,7 +242,7 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg
     @Override public boolean hasNext() {
       if (firstRow) {
         firstRow = false;
-        finalMerger = getFinalMerger(partitionId);
+        finalMerger = getFinalMerger();
         List<UnsafeCarbonRowPage> rowPages = intermediateMerger.getRowPages();
         finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
             intermediateMerger.getMergedPages());