Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/03/02 15:58:59 UTC

[1/2] carbondata git commit: [CARBONDATA-2219] Added validation that an external partition location uses the same schema as the table.

Repository: carbondata
Updated Branches:
  refs/heads/branch-1.3 660190fb5 -> 5b44e8105


[CARBONDATA-2219] Added validation that an external partition location uses the same schema as the table.

This closes #2018


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/092b5d58
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/092b5d58
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/092b5d58

Branch: refs/heads/branch-1.3
Commit: 092b5d58a50498a0a66bf6166907965612eb1fc5
Parents: 660190f
Author: ravipesala <ra...@gmail.com>
Authored: Thu Mar 1 12:04:53 2018 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Mar 2 21:26:13 2018 +0530

----------------------------------------------------------------------
 .../blockletindex/SegmentIndexFileStore.java    | 13 +++--
 .../core/metadata/SegmentFileStore.java         | 27 +++++++++-
 .../examples/CarbonPartitionExample.scala       |  3 +-
 .../StandardPartitionTableQueryTestCase.scala   | 57 +++++++++++++++-----
 ...arbonAlterTableAddHivePartitionCommand.scala | 18 ++++++-
 5 files changed, 94 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
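
In user-visible terms, the change below rejects an ALTER TABLE ... ADD PARTITION ... LOCATION command when the carbon index files found at that location were written with a different column schema, instead of silently registering the partition. A minimal sketch of the intended behaviour, written in the style of the test cases further down (sql, intercept and assert come from the test harness; table and path names are illustrative):

    sql(
      """CREATE TABLE ext_part_target (empno int, deptname String)
        | PARTITIONED BY (empname String)
        | STORED BY 'org.apache.carbondata.format'""".stripMargin)
    // '/tmp/other_schema_partition' is assumed to contain carbondata/index files
    // produced by a table with a different schema.
    intercept[Exception] {
      sql("alter table ext_part_target add partition (empname='ravi') " +
          "location '/tmp/other_schema_partition'")
    }
    // The partition must not have been registered.
    assert(sql("show partitions ext_part_target").count() == 0)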


http://git-wip-us.apache.org/repos/asf/carbondata/blob/092b5d58/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index b88c1f4..4883d94 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -90,23 +90,22 @@ public class SegmentIndexFileStore {
   /**
    * Read all index files and keep the cache in it.
    *
-   * @param segmentFileStore
+   * @param segmentFile
    * @throws IOException
    */
-  public void readAllIIndexOfSegment(SegmentFileStore segmentFileStore, SegmentStatus status,
-      boolean ignoreStatus) throws IOException {
+  public void readAllIIndexOfSegment(SegmentFileStore.SegmentFile segmentFile, String tablePath,
+      SegmentStatus status, boolean ignoreStatus) throws IOException {
     List<CarbonFile> carbonIndexFiles = new ArrayList<>();
-    if (segmentFileStore.getLocationMap() == null) {
+    if (segmentFile == null) {
       return;
     }
-    for (Map.Entry<String, SegmentFileStore.FolderDetails> locations : segmentFileStore
+    for (Map.Entry<String, SegmentFileStore.FolderDetails> locations : segmentFile
         .getLocationMap().entrySet()) {
       String location = locations.getKey();
 
       if (locations.getValue().getStatus().equals(status.getMessage()) || ignoreStatus) {
         if (locations.getValue().isRelative()) {
-          location =
-              segmentFileStore.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + location;
+          location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
         }
         for (String indexFile : locations.getValue().getFiles()) {
           CarbonFile carbonFile = FileFactory

http://git-wip-us.apache.org/repos/asf/carbondata/blob/092b5d58/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 1902ab9..f2548b5 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
@@ -314,7 +315,7 @@ public class SegmentFileStore {
     }
     SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
     indexFilesMap = new HashMap<>();
-    indexFileStore.readAllIIndexOfSegment(this, status, ignoreStatus);
+    indexFileStore.readAllIIndexOfSegment(this.segmentFile, tablePath, status, ignoreStatus);
     Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
     DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
     for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
@@ -329,6 +330,30 @@ public class SegmentFileStore {
   }
 
   /**
+   * Reads all index files and get the schema of each index file
+   * @throws IOException
+   */
+  public static Map<String, List<ColumnSchema>> getSchemaFiles(SegmentFile segmentFile,
+      String tablePath) throws IOException {
+    Map<String, List<ColumnSchema>> schemaMap = new HashMap<>();
+    if (segmentFile == null) {
+      return schemaMap;
+    }
+    SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+    indexFileStore.readAllIIndexOfSegment(segmentFile, tablePath, SegmentStatus.SUCCESS, true);
+    Map<String, byte[]> carbonIndexMap = indexFileStore.getCarbonIndexMapWithFullPath();
+    DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+    for (Map.Entry<String, byte[]> entry : carbonIndexMap.entrySet()) {
+      List<DataFileFooter> indexInfo =
+          fileFooterConverter.getIndexInfo(entry.getKey(), entry.getValue());
+      if (indexInfo.size() > 0) {
+        schemaMap.put(entry.getKey(), indexInfo.get(0).getColumnInTable());
+      }
+    }
+    return schemaMap;
+  }
+
+  /**
    * Gets all index files from this segment
    * @return
    */
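
The new getSchemaFiles helper returns, per index file referenced by the segment file, the column schema recorded in that file's footer; the ALTER TABLE ADD PARTITION command further down uses it to compare those columns against the table schema. A rough usage sketch (segmentFile and tablePath are assumed to come from getSegmentFileForPhysicalDataPartitions, and getColumnName is used here only for printing):

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.metadata.SegmentFileStore

    val indexToSchemas = SegmentFileStore.getSchemaFiles(segmentFile, tablePath)
    indexToSchemas.asScala.foreach { case (indexFilePath, columnSchemas) =>
      // one entry per index file: full path -> columns read from its footer
      println(s"$indexFilePath -> " +
        columnSchemas.asScala.map(_.getColumnName).mkString(", "))
    }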

http://git-wip-us.apache.org/repos/asf/carbondata/blob/092b5d58/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
index 6837c56..2391dbe 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonPartitionExample.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.SparkSession
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 object CarbonPartitionExample {
 
@@ -195,7 +196,7 @@ object CarbonPartitionExample {
     try {
       spark.sql("""SHOW PARTITIONS t1""").show(100, false)
     } catch {
-      case ex: AnalysisException => LOGGER.error(ex.getMessage())
+      case ex: ProcessMetaDataException => LOGGER.error(ex.getMessage())
     }
     spark.sql("""SHOW PARTITIONS t0""").show(100, false)
     spark.sql("""SHOW PARTITIONS t3""").show(100, false)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/092b5d58/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index 918bbff..58eb9f9 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -290,15 +290,17 @@ test("Creation of partition table should fail if the colname in table schema and
     val location = metastoredb +"/" +"ravi"
     sql(s"""alter table staticpartitionlocload add partition (empname='ravi') location '$location'""")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
-    val frame = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionlocload")
+    val frame = sql("select count(empno) from staticpartitionlocload")
     verifyPartitionInfo(frame, Seq("empname=ravi"))
-    assert(frame.count() == 10)
+    checkAnswer(sql("select count(empno) from staticpartitionlocload"), Seq(Row(10)))
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql("select count(empno) from staticpartitionlocload"), Seq(Row(20)))
     val file = FileFactory.getCarbonFile(location)
     assert(file.exists())
     FileFactory.deleteAllCarbonFilesOfDir(file)
   }
 
-  test("add external partition with static column partition with load command") {
+  test("add external partition with static column partition with load command with diffrent schema") {
 
     sql(
       """
@@ -324,18 +326,43 @@ test("Creation of partition table should fail if the colname in table schema and
         | PARTITIONED BY (empname String)
         | STORED BY 'org.apache.carbondata.format'
       """.stripMargin)
-    sql(s"""alter table staticpartitionextlocload add partition (empname='ravi') location '$location'""")
-    val frame = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionextlocload")
-    verifyPartitionInfo(frame, Seq("empname=ravi"))
-    assert(frame.count() == 10)
-    val location2 = storeLocation +"/staticpartitionlocloadother/empname=indra"
-    sql(s"""alter table staticpartitionextlocload add partition (empname='indra') location '$location2'""")
-    val frame1 = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionextlocload")
-    verifyPartitionInfo(frame1, Seq("empname=indra"))
-    assert(frame1.count() == 20)
+    intercept[Exception] {
+      sql(s"""alter table staticpartitionextlocload add partition (empname='ravi') location '$location'""")
+    }
+    assert(sql(s"show partitions staticpartitionextlocload").count() == 0)
     val file = FileFactory.getCarbonFile(location)
-    assert(file.exists())
-    FileFactory.deleteAllCarbonFilesOfDir(file)
+    if(file.exists()) {
+      FileFactory.deleteAllCarbonFilesOfDir(file)
+    }
+  }
+
+  test("add external partition with static column partition with load command") {
+
+    sql(
+      """
+        | CREATE TABLE staticpartitionlocloadother_new (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp,attendance int,
+        |  deptname String,projectcode int,
+        |  utilization int,salary int,projectenddate Date,doj Timestamp)
+        | PARTITIONED BY (empname String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    val location = metastoredb +"/" +"ravi1"
+    sql(s"""alter table staticpartitionlocloadother_new add partition (empname='ravi') location '$location'""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='indra') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(20)))
+    sql(s"""ALTER TABLE staticpartitionlocloadother_new DROP PARTITION(empname='ravi')""")
+    checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(10)))
+    sql(s"""alter table staticpartitionlocloadother_new add partition (empname='ravi') location '$location'""")
+    checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(20)))
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother_new partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql(s"select count(deptname) from staticpartitionlocloadother_new"), Seq(Row(30)))
+    val file = FileFactory.getCarbonFile(location)
+    if(file.exists()) {
+      FileFactory.deleteAllCarbonFilesOfDir(file)
+    }
   }
 
   test("drop partition on preAggregate table should fail"){
@@ -387,6 +414,8 @@ test("Creation of partition table should fail if the colname in table schema and
     sql("drop table if exists staticpartitionlocload")
     sql("drop table if exists staticpartitionextlocload")
     sql("drop table if exists staticpartitionlocloadother")
+    sql("drop table if exists staticpartitionextlocload_new")
+    sql("drop table if exists staticpartitionlocloadother_new")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/092b5d58/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index 2aaecc7..b0e6b94 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -75,7 +75,12 @@ case class CarbonAlterTableAddHivePartitionCommand(
 
 
   override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
-    AlterTableDropPartitionCommand(tableName, partitionSpecsAndLocs.map(_._1), true, false, true)
+    AlterTableDropPartitionCommand(
+      tableName,
+      partitionSpecsAndLocs.map(_._1),
+      ifExists = true,
+      purge = false,
+      retainData = true).run(sparkSession)
     val msg = s"Got exception $exception when processing data of add partition." +
               "Dropping partitions to the metadata"
     LogServiceFactory.getLogService(this.getClass.getCanonicalName).error(msg)
@@ -88,6 +93,17 @@ case class CarbonAlterTableAddHivePartitionCommand(
       val segmentFile = SegmentFileStore.getSegmentFileForPhysicalDataPartitions(table.getTablePath,
         partitionSpecsAndLocsTobeAdded)
       if (segmentFile != null) {
+        val indexToSchemas = SegmentFileStore.getSchemaFiles(segmentFile, table.getTablePath)
+        val tableColums = table.getTableInfo.getFactTable.getListOfColumns.asScala
+        var isSameSchema = indexToSchemas.asScala.exists{ case(key, columnSchemas) =>
+          columnSchemas.asScala.exists { col =>
+            tableColums.exists(p => p.getColumnUniqueId.equals(col.getColumnUniqueId))
+          } && columnSchemas.size() == tableColums.length
+        }
+        if (!isSameSchema) {
+          throw new UnsupportedOperationException(
+            "Schema of index files located in location is not matching with current table schema")
+        }
         val loadModel = new CarbonLoadModel
         loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
         // Create new entry in tablestatus file
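
The intent of the check above is that the index files found at the supplied location must describe the same columns as the current table, matched by columnUniqueId and with equal column counts; otherwise the add-partition is aborted with an UnsupportedOperationException. A self-contained sketch of such a comparison (helper and variable names are hypothetical, and this strict forall-style reading is slightly tighter than the committed code, which keys the decision off a single matching index file):

    import scala.collection.JavaConverters._
    import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema

    // Hypothetical helper mirroring the idea of the validation above.
    def schemasMatch(
        indexToSchemas: java.util.Map[String, java.util.List[ColumnSchema]],
        tableColumns: Seq[ColumnSchema]): Boolean = {
      indexToSchemas.asScala.forall { case (_, columnSchemas) =>
        columnSchemas.size() == tableColumns.length &&
          columnSchemas.asScala.forall(col =>
            tableColumns.exists(_.getColumnUniqueId == col.getColumnUniqueId))
      }
    }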


[2/2] carbondata git commit: [CARBONDATA-2209] Fixed table rename not working for partitioned tables, and fixed batch_sort and no_sort issues with partition tables

Posted by gv...@apache.org.
[CARBONDATA-2209] Fixed table rename not working for partitioned tables, and fixed batch_sort and no_sort issues with partition tables

This closes #2006


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5b44e810
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5b44e810
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5b44e810

Branch: refs/heads/branch-1.3
Commit: 5b44e8105cc10ea9616323bbe3736619729658ae
Parents: 092b5d5
Author: ravipesala <ra...@gmail.com>
Authored: Tue Feb 27 16:38:09 2018 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Mar 2 21:26:30 2018 +0530

----------------------------------------------------------------------
 .../apache/carbondata/core/datamap/Segment.java |  21 ++
 .../blockletindex/SegmentIndexFileStore.java    |  13 +-
 .../core/metadata/SegmentFileStore.java         |  33 +++-
 .../core/util/path/CarbonTablePath.java         |   7 +
 .../core/writer/CarbonIndexFileMergeWriter.java | 191 ++++++++++++++-----
 .../CarbonIndexFileMergeTestCase.scala          |  15 +-
 .../StandardPartitionGlobalSortTestCase.scala   |  86 +++++++++
 .../StandardPartitionTableQueryTestCase.scala   |  21 ++
 .../schema/CarbonAlterTableRenameCommand.scala  |  58 +++++-
 .../sql/execution/strategy/DDLStrategy.scala    |   8 +
 .../spark/sql/hive/CarbonSessionState.scala     |  12 +-
 .../spark/sql/hive/CarbonSessionState.scala     |  10 +
 .../loading/DataLoadProcessBuilder.java         |   8 +-
 .../processing/util/CarbonLoaderUtil.java       |  87 +++++++++
 14 files changed, 509 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
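
At the SQL level this commit covers the scenarios exercised by the new tests further down: loading into partition tables created with SORT_SCOPE 'BATCH_SORT' or 'NO_SORT', renaming a partition table (including querying and loading again after the rename), and blocking ALTER TABLE ... SET LOCATION on carbon tables. A condensed sketch of the rename scenario in the style of those tests (table names are illustrative and csvPath is an assumed path to a CSV matching the table's columns):

    sql(
      """CREATE TABLE part_rename (empno int, empname String)
        | PARTITIONED BY (projectjoindate Timestamp)
        | STORED BY 'org.apache.carbondata.format'
        | TBLPROPERTIES('SORT_SCOPE'='NO_SORT')""".stripMargin)
    sql(s"LOAD DATA LOCAL INPATH '$csvPath' INTO TABLE part_rename")
    sql("ALTER TABLE part_rename RENAME TO part_rename_new")
    // Partition locations are rewritten to the new table path, so queries and
    // further loads keep working after the rename.
    sql("select count(*) from part_rename_new").show()
    sql(s"LOAD DATA LOCAL INPATH '$csvPath' INTO TABLE part_rename_new")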


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
index c47f16c..a2a2a41 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/Segment.java
@@ -21,6 +21,10 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Objects;
 
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
 /**
  * Represents one load of carbondata
  */
@@ -76,6 +80,23 @@ public class Segment implements Serializable {
     return new Segment(segmentId, null);
   }
 
+  /**
+   * Read the table status and get the segment corresponding to segmentNo
+   * @param segmentNo
+   * @param tablePath
+   * @return
+   */
+  public static Segment getSegment(String segmentNo, String tablePath) {
+    LoadMetadataDetails[] loadMetadataDetails =
+        SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(tablePath));
+    for (LoadMetadataDetails details: loadMetadataDetails) {
+      if (details.getLoadName().equals(segmentNo)) {
+        return new Segment(details.getLoadName(), details.getSegmentFile());
+      }
+    }
+    return null;
+  }
+
   @Override public boolean equals(Object o) {
     if (this == o) return true;
     if (o == null || getClass() != o.getClass()) return false;
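
The new Segment.getSegment(segmentNo, tablePath) helper reads the table status file and returns the matching Segment together with the segment file name recorded for it, or null when the segment number is not found. This is what lets the index-merge writer below distinguish partition-table segments from the legacy segment-directory layout. A hedged usage sketch (the table value is assumed to be a CarbonTable):

    import org.apache.carbondata.core.datamap.Segment

    val segment = Segment.getSegment("0", table.getTablePath)
    if (segment != null && segment.getSegmentFileName != null) {
      // partition-table layout: index files are tracked through the segment file
    } else {
      // legacy layout: index files sit directly under the segment directory
    }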

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 4883d94..9364a7a 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -96,6 +96,7 @@ public class SegmentIndexFileStore {
   public void readAllIIndexOfSegment(SegmentFileStore.SegmentFile segmentFile, String tablePath,
       SegmentStatus status, boolean ignoreStatus) throws IOException {
     List<CarbonFile> carbonIndexFiles = new ArrayList<>();
+    Set<String> indexFiles = new HashSet<>();
     if (segmentFile == null) {
       return;
     }
@@ -107,11 +108,21 @@ public class SegmentIndexFileStore {
         if (locations.getValue().isRelative()) {
           location = tablePath + CarbonCommonConstants.FILE_SEPARATOR + location;
         }
+        String mergeFileName = locations.getValue().getMergeFileName();
+        if (mergeFileName != null) {
+          CarbonFile mergeFile = FileFactory
+              .getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + mergeFileName);
+          if (mergeFile.exists() && !indexFiles.contains(mergeFile.getAbsolutePath())) {
+            carbonIndexFiles.add(mergeFile);
+            indexFiles.add(mergeFile.getAbsolutePath());
+          }
+        }
         for (String indexFile : locations.getValue().getFiles()) {
           CarbonFile carbonFile = FileFactory
               .getCarbonFile(location + CarbonCommonConstants.FILE_SEPARATOR + indexFile);
-          if (carbonFile.exists()) {
+          if (carbonFile.exists() && !indexFiles.contains(carbonFile.getAbsolutePath())) {
             carbonIndexFiles.add(carbonFile);
+            indexFiles.add(carbonFile.getAbsolutePath());
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index f2548b5..2d31b4e 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -195,7 +195,7 @@ public class SegmentFileStore {
    * @param partitionSpecs
    */
   public static SegmentFile getSegmentFileForPhysicalDataPartitions(String tablePath,
-      List<PartitionSpec> partitionSpecs) {
+      List<PartitionSpec> partitionSpecs) throws IOException {
     SegmentFile segmentFile = null;
     for (PartitionSpec spec : partitionSpecs) {
       String location = spec.getLocation().toString();
@@ -220,6 +220,9 @@ public class SegmentFileStore {
         folderDetails.setStatus(SegmentStatus.SUCCESS.getMessage());
         for (CarbonFile file : listFiles) {
           if (file.getName().endsWith(CarbonTablePath.MERGE_INDEX_FILE_EXT)) {
+            List<String> indexFiles =
+                new SegmentIndexFileStore().getIndexFilesFromMergeFile(file.getAbsolutePath());
+            folderDetails.getFiles().addAll(indexFiles);
             folderDetails.setMergeFileName(file.getName());
           } else {
             folderDetails.getFiles().add(file.getName());
@@ -302,6 +305,10 @@ public class SegmentFileStore {
     readIndexFiles(SegmentStatus.SUCCESS, false);
   }
 
+  public SegmentFile getSegmentFile() {
+    return segmentFile;
+  }
+
   /**
    * Reads all index files as per the status of the file. In case of @ignoreStatus is true it just
    * reads all index files
@@ -377,6 +384,30 @@ public class SegmentFileStore {
   }
 
   /**
+   * Gets all carbon index files from this segment
+   * @return
+   */
+  public List<CarbonFile> getIndexCarbonFiles() {
+    Map<String, String> indexFiles = getIndexFiles();
+    Set<String> files = new HashSet<>();
+    for (Map.Entry<String, String> entry: indexFiles.entrySet()) {
+      Path path = new Path(entry.getKey());
+      files.add(entry.getKey());
+      if (entry.getValue() != null) {
+        files.add(new Path(path.getParent(), entry.getValue()).toString());
+      }
+    }
+    List<CarbonFile> carbonFiles = new ArrayList<>();
+    for (String indexFile : files) {
+      CarbonFile carbonFile = FileFactory.getCarbonFile(indexFile);
+      if (carbonFile.exists()) {
+        carbonFiles.add(carbonFile);
+      }
+    }
+    return carbonFiles;
+  }
+
+  /**
    * Drops the partition related files from the segment file of the segment and writes
    * to a new file. First iterator over segment file and check the path it needs to be dropped.
    * And update the status with delete if it found.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index d70d9ef..f232c23 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -165,6 +165,13 @@ public class CarbonTablePath extends Path {
   }
 
   /**
+   * Return metadata path based on `tablePath`
+   */
+  public static String getTableStatusPath(String tablePath) {
+    return getMetadataPath(tablePath) + File.separator + TABLE_STATUS_FILE;
+  }
+
+  /**
    * @param columnId unique column identifier
    * @return absolute path of dictionary meta file
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index 01f96ba..bc150e5 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -17,18 +17,26 @@
 package org.apache.carbondata.core.writer;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
 import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.format.MergedBlockIndex;
 import org.apache.carbondata.format.MergedBlockIndexHeader;
 
+import org.apache.hadoop.fs.Path;
+
 public class CarbonIndexFileMergeWriter {
 
   /**
@@ -38,7 +46,7 @@ public class CarbonIndexFileMergeWriter {
 
   /**
    * Merge all the carbonindex files of segment to a  merged file
-   * @param segmentPath
+   * @param tablePath
    * @param indexFileNamesTobeAdded while merging it comsiders only these files.
    *                                If null then consider all
    * @param readFileFooterFromCarbonDataFile flag to read file footer information from carbondata
@@ -46,77 +54,152 @@ public class CarbonIndexFileMergeWriter {
    *                                         which do not store the blocklet info to current version
    * @throws IOException
    */
-  private void mergeCarbonIndexFilesOfSegment(String segmentPath,
-      List<String> indexFileNamesTobeAdded, boolean readFileFooterFromCarbonDataFile)
-      throws IOException {
-    CarbonFile[] indexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
+  private SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId,
+      String tablePath, List<String> indexFileNamesTobeAdded,
+      boolean readFileFooterFromCarbonDataFile) throws IOException {
+    Segment segment = Segment.getSegment(segmentId, tablePath);
+    String segmentPath = CarbonTablePath.getSegmentPath(tablePath, segmentId);
+    CarbonFile[] indexFiles;
+    SegmentFileStore sfs = null;
+    if (segment != null && segment.getSegmentFileName() != null) {
+      sfs = new SegmentFileStore(tablePath, segment.getSegmentFileName());
+      List<CarbonFile> indexCarbonFiles = sfs.getIndexCarbonFiles();
+      indexFiles = indexCarbonFiles.toArray(new CarbonFile[indexCarbonFiles.size()]);
+    } else {
+      indexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
+    }
     if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) {
-      SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
-      if (readFileFooterFromCarbonDataFile) {
-        // this case will be used in case of upgrade where old store will not have the blocklet
-        // info in the index file and therefore blocklet info need to be read from the file footer
-        // in the carbondata file
-        fileStore.readAllIndexAndFillBolckletInfo(segmentPath);
+      if (sfs == null) {
+        return mergeNormalSegment(indexFileNamesTobeAdded, readFileFooterFromCarbonDataFile,
+            segmentPath, indexFiles);
       } else {
-        fileStore.readAllIIndexOfSegment(segmentPath);
+        return mergePartitionSegment(indexFileNamesTobeAdded, sfs, indexFiles);
       }
-      Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap();
-      MergedBlockIndexHeader indexHeader = new MergedBlockIndexHeader();
-      MergedBlockIndex mergedBlockIndex = new MergedBlockIndex();
-      List<String> fileNames = new ArrayList<>(indexMap.size());
-      List<ByteBuffer> data = new ArrayList<>(indexMap.size());
-      for (Map.Entry<String, byte[]> entry : indexMap.entrySet()) {
-        if (indexFileNamesTobeAdded == null ||
-            indexFileNamesTobeAdded.contains(entry.getKey())) {
-          fileNames.add(entry.getKey());
-          data.add(ByteBuffer.wrap(entry.getValue()));
-        }
+    }
+    return null;
+  }
+
+
+  private SegmentIndexFIleMergeStatus mergeNormalSegment(List<String> indexFileNamesTobeAdded,
+      boolean readFileFooterFromCarbonDataFile, String segmentPath, CarbonFile[] indexFiles)
+      throws IOException {
+    SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
+    if (readFileFooterFromCarbonDataFile) {
+      // this case will be used in case of upgrade where old store will not have the blocklet
+      // info in the index file and therefore blocklet info need to be read from the file footer
+      // in the carbondata file
+      fileStore.readAllIndexAndFillBolckletInfo(segmentPath);
+    } else {
+      fileStore.readAllIIndexOfSegment(segmentPath);
+    }
+    Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap();
+    writeMergeIndexFile(indexFileNamesTobeAdded, segmentPath, indexMap);
+    for (CarbonFile indexFile : indexFiles) {
+      indexFile.delete();
+    }
+    return null;
+  }
+
+  private SegmentIndexFIleMergeStatus mergePartitionSegment(List<String> indexFileNamesTobeAdded,
+      SegmentFileStore sfs, CarbonFile[] indexFiles) throws IOException {
+    SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
+    fileStore
+        .readAllIIndexOfSegment(sfs.getSegmentFile(), sfs.getTablePath(), SegmentStatus.SUCCESS,
+            true);
+    Map<String, byte[]> indexMap = fileStore.getCarbonIndexMapWithFullPath();
+    Map<String, Map<String, byte[]>> indexLocationMap = new HashMap<>();
+    for (Map.Entry<String, byte[]> entry: indexMap.entrySet()) {
+      Path path = new Path(entry.getKey());
+      Map<String, byte[]> map = indexLocationMap.get(path.getParent().toString());
+      if (map == null) {
+        map = new HashMap<>();
+        indexLocationMap.put(path.getParent().toString(), map);
       }
-      if (fileNames.size() > 0) {
-        openThriftWriter(
-            segmentPath + "/" + System.currentTimeMillis() + CarbonTablePath.MERGE_INDEX_FILE_EXT);
-        indexHeader.setFile_names(fileNames);
-        mergedBlockIndex.setFileData(data);
-        writeMergedBlockIndexHeader(indexHeader);
-        writeMergedBlockIndex(mergedBlockIndex);
-        close();
+      map.put(path.getName(), entry.getValue());
+    }
+    for (Map.Entry<String, Map<String, byte[]>> entry : indexLocationMap.entrySet()) {
+      String mergeIndexFile =
+          writeMergeIndexFile(indexFileNamesTobeAdded, entry.getKey(), entry.getValue());
+      for (Map.Entry<String, SegmentFileStore.FolderDetails> segentry : sfs.getLocationMap()
+          .entrySet()) {
+        String location = segentry.getKey();
+        if (segentry.getValue().isRelative()) {
+          location = sfs.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR + location;
+        }
+        if (new Path(entry.getKey()).equals(new Path(location))) {
+          segentry.getValue().setMergeFileName(mergeIndexFile);
+          break;
+        }
       }
-      for (CarbonFile indexFile : indexFiles) {
-        indexFile.delete();
+    }
+
+    List<String> filesTobeDeleted = new ArrayList<>();
+    for (CarbonFile file : indexFiles) {
+      filesTobeDeleted.add(file.getAbsolutePath());
+    }
+    return new SegmentIndexFIleMergeStatus(sfs.getSegmentFile(), filesTobeDeleted);
+  }
+
+  private String writeMergeIndexFile(List<String> indexFileNamesTobeAdded, String segmentPath,
+      Map<String, byte[]> indexMap) throws IOException {
+    MergedBlockIndexHeader indexHeader = new MergedBlockIndexHeader();
+    MergedBlockIndex mergedBlockIndex = new MergedBlockIndex();
+    List<String> fileNames = new ArrayList<>(indexMap.size());
+    List<ByteBuffer> data = new ArrayList<>(indexMap.size());
+    for (Map.Entry<String, byte[]> entry : indexMap.entrySet()) {
+      if (indexFileNamesTobeAdded == null ||
+          indexFileNamesTobeAdded.contains(entry.getKey())) {
+        fileNames.add(entry.getKey());
+        data.add(ByteBuffer.wrap(entry.getValue()));
       }
     }
+    if (fileNames.size() > 0) {
+      String mergeIndexName = System.currentTimeMillis() + CarbonTablePath.MERGE_INDEX_FILE_EXT;
+      openThriftWriter(segmentPath + "/" + mergeIndexName);
+      indexHeader.setFile_names(fileNames);
+      mergedBlockIndex.setFileData(data);
+      writeMergedBlockIndexHeader(indexHeader);
+      writeMergedBlockIndex(mergedBlockIndex);
+      close();
+      return mergeIndexName;
+    }
+    return null;
   }
 
   /**
    * Merge all the carbonindex files of segment to a  merged file
    *
-   * @param segmentPath
+   * @param segmentId
    * @param indexFileNamesTobeAdded
    * @throws IOException
    */
-  public void mergeCarbonIndexFilesOfSegment(String segmentPath,
-      List<String> indexFileNamesTobeAdded) throws IOException {
-    mergeCarbonIndexFilesOfSegment(segmentPath, indexFileNamesTobeAdded, false);
+  public SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId,
+      String tablePath, List<String> indexFileNamesTobeAdded) throws IOException {
+    return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, indexFileNamesTobeAdded, false);
   }
 
   /**
    * Merge all the carbonindex files of segment to a  merged file
-   * @param segmentPath
+   *
+   * @param segmentId
    * @throws IOException
    */
-  public void mergeCarbonIndexFilesOfSegment(String segmentPath) throws IOException {
-    mergeCarbonIndexFilesOfSegment(segmentPath, null, false);
+  public SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId,
+      String tablePath) throws IOException {
+    return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null, false);
   }
 
   /**
    * Merge all the carbonindex files of segment to a  merged file
-   * @param segmentPath
+   *
+   * @param segmentId
    * @param readFileFooterFromCarbonDataFile
    * @throws IOException
    */
-  public void mergeCarbonIndexFilesOfSegment(String segmentPath,
-      boolean readFileFooterFromCarbonDataFile) throws IOException {
-    mergeCarbonIndexFilesOfSegment(segmentPath, null, readFileFooterFromCarbonDataFile);
+  public SegmentIndexFIleMergeStatus mergeCarbonIndexFilesOfSegment(String segmentId,
+      String tablePath, boolean readFileFooterFromCarbonDataFile) throws IOException {
+    return mergeCarbonIndexFilesOfSegment(segmentId, tablePath, null,
+        readFileFooterFromCarbonDataFile);
   }
 
   private boolean isCarbonIndexFilePresent(CarbonFile[] indexFiles) {
@@ -166,4 +249,24 @@ public class CarbonIndexFileMergeWriter {
     thriftWriter.close();
   }
 
+  public static class SegmentIndexFIleMergeStatus implements Serializable {
+
+    private SegmentFileStore.SegmentFile segmentFile;
+
+    private List<String> filesTobeDeleted;
+
+    public SegmentIndexFIleMergeStatus(SegmentFileStore.SegmentFile segmentFile,
+        List<String> filesTobeDeleted) {
+      this.segmentFile = segmentFile;
+      this.filesTobeDeleted = filesTobeDeleted;
+    }
+
+    public SegmentFileStore.SegmentFile getSegmentFile() {
+      return segmentFile;
+    }
+
+    public List<String> getFilesTobeDeleted() {
+      return filesTobeDeleted;
+    }
+  }
 }
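
After this rewrite the merge writer is addressed by segment id and table path instead of a raw segment directory, and each overload returns a SegmentIndexFIleMergeStatus (null for non-partitioned segments, which are merged and cleaned up in place). A usage sketch based on the updated test case below (the table value is assumed to be a CarbonTable; how callers persist the returned status is not part of this diff):

    import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter

    // Non-partitioned segment: index files are merged and deleted in place.
    new CarbonIndexFileMergeWriter()
      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)

    // Partitioned segment: the caller receives the updated segment file plus
    // the list of now-redundant index files to delete.
    val status = new CarbonIndexFileMergeWriter()
      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath)
    if (status != null) {
      val updatedSegmentFile = status.getSegmentFile
      val staleIndexFiles = status.getFilesTobeDeleted
    }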

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index 895b0b5..7608318 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -62,9 +62,8 @@ class CarbonIndexFileMergeTestCase
     sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE indexmerge OPTIONS('header'='false', " +
         s"'GLOBAL_SORT_PARTITIONS'='100')")
     val table = CarbonMetadata.getInstance().getCarbonTable("default","indexmerge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false)
+      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
     assert(getIndexFileCount("default_indexmerge", "0") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""),
       sql("""Select count(*) from indexmerge"""))
@@ -88,9 +87,9 @@ class CarbonIndexFileMergeTestCase
     val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
     val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false)
+      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), false)
+      .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
@@ -114,9 +113,9 @@ class CarbonIndexFileMergeTestCase
     val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
     val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0"), false)
+      .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","1"), false)
+      .mergeCarbonIndexFilesOfSegment("1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
@@ -144,7 +143,7 @@ class CarbonIndexFileMergeTestCase
     val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
     val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), false)
+      .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
   }
@@ -174,7 +173,7 @@ class CarbonIndexFileMergeTestCase
     val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
     val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
     new CarbonIndexFileMergeWriter()
-      .mergeCarbonIndexFilesOfSegment(carbonTablePath.getSegmentDir("0","0.1"), false)
+      .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "2") == 100)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
index ff062cd..b511ee8 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala
@@ -927,6 +927,92 @@ class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterA
     assert(exMessage.getMessage.contains("day is not a valid partition column in table default.partitionnocolumn"))
   }
 
+  test("data loading with default partition in static partition table with batchsort") {
+    sql("DROP TABLE IF EXISTS partitiondefaultbatchsort")
+    sql(
+      """
+        | CREATE TABLE partitiondefaultbatchsort (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectenddate Timestamp,attendance int,
+        |  utilization int, doj Timestamp, empname String)
+        | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='BATCH_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondefaultbatchsort OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql("select count(*) from partitiondefaultbatchsort"), Seq(Row(10)))
+  }
+
+  test("data loading with default partition in static partition table with nosort") {
+    sql("DROP TABLE IF EXISTS partitiondefaultnosort")
+    sql(
+      """
+        | CREATE TABLE partitiondefaultnosort (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectenddate Timestamp,attendance int,
+        |  utilization int, doj Timestamp, empname String)
+        | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='NO_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondefaultnosort OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql("select count(*) from partitiondefaultnosort"), Seq(Row(10)))
+  }
+
+  test("data loading with default partition in static partition table with rename") {
+    sql("DROP TABLE IF EXISTS partitiondefaultrename")
+    sql("DROP TABLE IF EXISTS partitiondefaultrename_new")
+    sql(
+      """
+        | CREATE TABLE partitiondefaultrename (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectenddate Timestamp,attendance int,
+        |  utilization int, doj Timestamp, empname String)
+        | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondefaultrename OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql("select count(*) from partitiondefaultrename"), Seq(Row(10)))
+    sql(s"alter table partitiondefaultrename rename to partitiondefaultrename_new")
+    checkAnswer(sql("select count(*) from partitiondefaultrename_new"), Seq(Row(10)))
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondefaultrename_new OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql("select count(*) from partitiondefaultrename_new"), Seq(Row(20)))
+  }
+
+  test("data loading with default partition in static partition table with rename first") {
+    sql("DROP TABLE IF EXISTS partitiondefaultrenamefirst")
+    sql("DROP TABLE IF EXISTS partitiondefaultrenamefirst_new")
+    sql(
+      """
+        | CREATE TABLE partitiondefaultrenamefirst (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectenddate Timestamp,attendance int,
+        |  utilization int, doj Timestamp, empname String)
+        | PARTITIONED BY (projectjoindate Timestamp, salary decimal)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"alter table partitiondefaultrenamefirst rename to partitiondefaultrenamefirst_new")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiondefaultrenamefirst_new OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(sql("select count(*) from partitiondefaultrenamefirst_new"), Seq(Row(10)))
+  }
+
+  test("data loading for global partition table for two partition column with no columns in csv") {
+    sql("DROP TABLE IF EXISTS partitiontwonocolumns")
+    sql(
+      """
+        | CREATE TABLE partitiontwonocolumns (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int,doj Timestamp, empname String)
+        | PARTITIONED BY (newcol1 date, newcol2 int)
+        | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiontwonocolumns partition(newcol1='2016-08-09', newcol2='20') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitiontwonocolumns order by empno"),
+      sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
+
+    checkAnswer(sql("select distinct cast(newcol1 as string) from partitiontwonocolumns"), Seq(Row("2016-08-09")))
+  }
+
   override def afterAll = {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index 58eb9f9..163e662 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -300,6 +300,27 @@ test("Creation of partition table should fail if the colname in table schema and
     FileFactory.deleteAllCarbonFilesOfDir(file)
   }
 
+  test("set partition location with static column partition with load command") {
+    sql("drop table if exists staticpartitionsetloc")
+    sql(
+      """
+        | CREATE TABLE staticpartitionsetloc (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp,attendance int,
+        |  deptname String,projectcode int,
+        |  utilization int,salary int,projectenddate Date,doj Timestamp)
+        | PARTITIONED BY (empname String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    val location = metastoredb +"/" +"ravi1"
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionsetloc partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    intercept[Exception] {
+      sql(s"""alter table staticpartitionsetloc partition (empname='ravi') set location '$location'""")
+    }
+    val file = FileFactory.getCarbonFile(location)
+    FileFactory.deleteAllCarbonFilesOfDir(file)
+  }
+
   test("add external partition with static column partition with load command with diffrent schema") {
 
     sql(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index c8f64e1..40b5cfc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -17,8 +17,10 @@
 
 package org.apache.spark.sql.execution.command.schema
 
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
 import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand}
 import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog}
 import org.apache.spark.util.AlterTableUtil
@@ -119,6 +121,12 @@ private[sql] case class CarbonAlterTableRenameCommand(
       metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
       val hiveClient = sparkSession.sessionState.catalog.asInstanceOf[CarbonSessionCatalog]
         .getClient()
+      var partitions: Seq[CatalogTablePartition] = Seq.empty
+      if (carbonTable.isHivePartitionTable) {
+        partitions =
+          sparkSession.sessionState.catalog.listPartitions(
+            TableIdentifier(oldTableName, Some(oldDatabaseName)))
+      }
       sparkSession.catalog.refreshTable(TableIdentifier(oldTableName,
         Some(oldDatabaseName)).quotedString)
       hiveClient.runSqlHive(
@@ -127,6 +135,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
           s"ALTER TABLE $oldDatabaseName.$newTableName SET SERDEPROPERTIES" +
           s"('tableName'='$newTableName', " +
           s"'dbName'='$oldDatabaseName', 'tablePath'='$newTablePath')")
+
       // changed the rename order to deal with situation when carbon table and hive table
       // will point to the same tablePath
       if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
@@ -138,6 +147,27 @@ private[sql] case class CarbonAlterTableRenameCommand(
           sys.error(s"Folder rename failed for table $oldDatabaseName.$oldTableName")
         }
       }
+      val updatedParts = updatePartitionLocations(
+        partitions,
+        oldTablePath.getPath,
+        newTablePath,
+        sparkSession)
+
+      val newIdentifier = TableIdentifier(newTableName, Some(oldDatabaseName))
+      val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(newIdentifier)
+      // Update the storage location with new path
+      sparkSession.sessionState.catalog.alterTable(
+        catalogTable.copy(storage = sparkSession.sessionState.catalog.
+          asInstanceOf[CarbonSessionCatalog].updateStorageLocation(
+          new Path(newTablePath),
+          catalogTable.storage)))
+      if (updatedParts.nonEmpty) {
+        // Update the new updated partitions specs with new location.
+        sparkSession.sessionState.catalog.alterPartitions(
+          newIdentifier,
+          updatedParts)
+      }
+
       newTablePath = metastore.updateTableSchemaForAlter(newTableIdentifier,
         carbonTable.getCarbonTableIdentifier,
         tableInfo,
@@ -151,8 +181,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
         sparkSession)
       OperationListenerBus.getInstance().fireEvent(alterTableRenamePostEvent, operationContext)
 
-      sparkSession.catalog.refreshTable(TableIdentifier(newTableName,
-        Some(oldDatabaseName)).quotedString)
+      sparkSession.catalog.refreshTable(newIdentifier.quotedString)
       carbonTableLockFilePath = newTablePath
       LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
       LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
@@ -187,6 +216,31 @@ private[sql] case class CarbonAlterTableRenameCommand(
     Seq.empty
   }
 
+  /**
+   * Update partitions with new table location
+   *
+   */
+  private def updatePartitionLocations(
+      partitions: Seq[CatalogTablePartition],
+      oldTablePath: String,
+      newTablePath: String,
+      sparkSession: SparkSession): Seq[CatalogTablePartition] = {
+    partitions.map{ part =>
+      if (part.storage.locationUri.isDefined) {
+        val path = new Path(part.location)
+        if (path.toString.contains(oldTablePath)) {
+          val newPath = new Path(path.toString.replace(oldTablePath, newTablePath))
+          part.copy(storage = sparkSession.sessionState.catalog.
+            asInstanceOf[CarbonSessionCatalog].updateStorageLocation(newPath, part.storage))
+        } else {
+          part
+        }
+      } else {
+        part
+      }
+    }
+  }
+
   private def renameBadRecords(
       oldTableName: String,
       newTableName: String,

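For reference, a minimal sketch of the path rewrite applied to each partition during rename, assuming the partition location contains the old table path as a prefix; the paths below are illustrative, not taken from the patch:

import org.apache.hadoop.fs.Path

object PartitionLocationRewriteSketch {
  // Rewrites a single partition location from the old table path to the new one,
  // leaving external partitions (outside the table path) untouched.
  def rewrite(partitionLocation: String, oldTablePath: String, newTablePath: String): String = {
    val path = new Path(partitionLocation)
    if (path.toString.contains(oldTablePath)) {
      new Path(path.toString.replace(oldTablePath, newTablePath)).toString
    } else {
      partitionLocation
    }
  }

  def main(args: Array[String]): Unit = {
    // expected: /warehouse/db/new_tbl/country=IN
    println(rewrite("/warehouse/db/old_tbl/country=IN", "/warehouse/db/old_tbl", "/warehouse/db/new_tbl"))
  }
}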
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index f69ccc1..dcbce84 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -265,6 +265,14 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         RefreshCarbonTableCommand(tableIdentifier.database,
           tableIdentifier.table).run(sparkSession)
         ExecutedCommandExec(RefreshTable(tableIdentifier)) :: Nil
+      case alterSetLoc@AlterTableSetLocationCommand(tableName, _, _) =>
+        val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .tableExists(tableName)(sparkSession)
+        if (isCarbonTable) {
+          throw new UnsupportedOperationException("Set partition location is not supported")
+        } else {
+          ExecutedCommandExec(alterSetLoc) :: Nil
+        }
       case _ => Nil
     }
   }

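A hedged usage sketch of the new DDLStrategy behaviour: issuing ALTER TABLE ... SET LOCATION against a carbon table is expected to fail with the message added above, while non-carbon tables still go through the default Spark command. Table and path names are illustrative:

// assumes a SparkSession `spark` with the Carbon session catalog configured
import scala.util.Try

val result = Try(spark.sql("ALTER TABLE carbon_db.carbon_tbl SET LOCATION '/tmp/new_loc'"))
// For a carbon table this is expected to surface
// "Set partition location is not supported" (UnsupportedOperationException).
result.failed.foreach(e => println(e.getMessage))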
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
index 1b7f0cb..ba2fe947 100644
--- a/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -17,8 +17,9 @@
 package org.apache.spark.sql.hive
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTablePartition, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery}
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
@@ -182,6 +183,15 @@ class CarbonSessionCatalog(
       allPartitions
     }
   }
+
+  /**
+   * Update the storage format with the new location information
+   */
+  def updateStorageLocation(
+      path: Path,
+      storage: CatalogStorageFormat): CatalogStorageFormat = {
+    storage.copy(locationUri = Some(path.toString))
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
index a119bda..e82b485 100644
--- a/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/org/apache/spark/sql/hive/CarbonSessionState.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import scala.collection.generic.SeqFactory
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
@@ -171,6 +172,15 @@ class CarbonSessionCatalog(
       partitionFilters,
       sparkSession.sessionState.conf.sessionLocalTimeZone)
   }
+
+  /**
+   * Update the storage format with the new location information
+   */
+  def updateStorageLocation(
+      path: Path,
+      storage: CatalogStorageFormat): CatalogStorageFormat = {
+    storage.copy(locationUri = Some(path.toUri))
+  }
 }
 
 

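The two updateStorageLocation overloads differ only because CatalogStorageFormat.locationUri is Option[String] on Spark 2.1 and Option[URI] on Spark 2.2, hence path.toString in the 2.1 catalog and path.toUri in the 2.2 catalog. A small sketch of the conversion both variants rely on (the path is illustrative):

import org.apache.hadoop.fs.Path

val newLocation = new Path("hdfs://ns/warehouse/db/new_tbl")
val asString: String = newLocation.toString   // shape stored on Spark 2.1 (Option[String])
val asUri: java.net.URI = newLocation.toUri   // shape stored on Spark 2.2 (Option[URI])
println(s"$asString -> $asUri")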
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index f5b29e7..82f4c9b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -57,15 +57,15 @@ public final class DataLoadProcessBuilder {
       CarbonIterator[] inputIterators) throws Exception {
     CarbonDataLoadConfiguration configuration = createConfiguration(loadModel, storeLocation);
     SortScopeOptions.SortScope sortScope = CarbonDataProcessorUtil.getSortScope(configuration);
-    if ((!configuration.isSortTable() || sortScope.equals(SortScopeOptions.SortScope.NO_SORT))
-        && !loadModel.isPartitionLoad()) {
+    if (loadModel.isPartitionLoad()) {
+      return buildInternalForPartitionLoad(inputIterators, configuration, sortScope);
+    } else if (!configuration.isSortTable() ||
+        sortScope.equals(SortScopeOptions.SortScope.NO_SORT)) {
       return buildInternalForNoSort(inputIterators, configuration);
     } else if (configuration.getBucketingInfo() != null) {
       return buildInternalForBucketing(inputIterators, configuration);
     } else if (sortScope.equals(SortScopeOptions.SortScope.BATCH_SORT)) {
       return buildInternalForBatchSort(inputIterators, configuration);
-    } else if (loadModel.isPartitionLoad()) {
-      return buildInternalForPartitionLoad(inputIterators, configuration, sortScope);
     } else {
       return buildInternal(inputIterators, configuration);
     }

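The reorder makes the partition-load flow take precedence over the no-sort shortcut, so a partitioned load is no longer routed into the plain no-sort pipeline. A simplified sketch of the resulting precedence; the flag and flow names below are illustrative stand-ins, not the actual API:

// Mirrors the branch order after this change; sort scope strings stand in for SortScopeOptions.
def chooseLoadFlow(
    isPartitionLoad: Boolean,
    isSortTable: Boolean,
    sortScope: String,
    hasBucketing: Boolean): String = {
  if (isPartitionLoad) "partition"                              // checked first now
  else if (!isSortTable || sortScope == "NO_SORT") "no_sort"
  else if (hasBucketing) "bucketing"
  else if (sortScope == "BATCH_SORT") "batch_sort"
  else "default"
}

assert(chooseLoadFlow(isPartitionLoad = true, isSortTable = false, sortScope = "NO_SORT", hasBucketing = false) == "partition")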
http://git-wip-us.apache.org/repos/asf/carbondata/blob/5b44e810/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 32c72da..23f9aa8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -48,6 +48,7 @@ import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
+import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
@@ -57,6 +58,7 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.merger.NodeBlockRelation;
 import org.apache.carbondata.processing.merger.NodeMultiBlockRelation;
@@ -328,6 +330,60 @@ public final class CarbonLoaderUtil {
     return status;
   }
 
+  /**
+   * Updates the segment file entry of the given segment in the table status file.
+   *
+   * @return true if the table status update succeeded, false otherwise.
+   * @throws IOException
+   */
+  public static boolean updateSegmentFile(String tablePath, String segmentId, String segmentFile)
+      throws IOException {
+    boolean status = false;
+    String tableStatusPath = CarbonTablePath.getTableStatusPath(tablePath);
+    String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        AbsoluteTableIdentifier.from(tablePath, null, null);
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+    int retryCount = CarbonLockUtil
+        .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
+            CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
+    int maxTimeout = CarbonLockUtil
+        .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
+            CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
+    try {
+      if (carbonLock.lockWithRetries(retryCount, maxTimeout)) {
+        LOGGER.info("Acquired lock for tablepath" + tablePath + " for table status updation");
+        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
+            SegmentStatusManager.readLoadMetadata(metadataPath);
+
+        for (LoadMetadataDetails detail : listOfLoadFolderDetailsArray) {
+          // update the segment file name for the segment matching the passed segment id.
+          if (segmentId.equals(detail.getLoadName())) {
+            detail.setSegmentFile(segmentFile);
+            break;
+          }
+        }
+
+        SegmentStatusManager
+            .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
+        status = true;
+      } else {
+        LOGGER.error(
+            "Not able to acquire the lock for table status update for table path " + tablePath);
+      }
+    } finally {
+      if (carbonLock.unlock()) {
+        LOGGER.info("Table unlocked successfully after table status updation" + tablePath);
+      } else {
+        LOGGER.error(
+            "Unable to unlock Table lock for table" + tablePath + " during table status updation");
+      }
+    }
+    return status;
+  }
+
   private static void addToStaleFolders(CarbonTablePath carbonTablePath,
       List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException {
     String path = carbonTablePath.getCarbonDataDirectoryPath("0", entry.getLoadName());
@@ -950,4 +1006,35 @@ public final class CarbonLoaderUtil {
     loadMetadataDetails.setIndexSize(String.valueOf(indexSize));
     return dataSize + indexSize;
   }
+
+  /**
+   * Merge the index files within a segment of a partitioned table.
+   * @param segmentId segment whose index files are to be merged
+   * @param tablePath path of the table
+   * @param uniqueId unique id used in the merged segment file name; regenerated when a merge happens
+   * @return the uniqueId used for the written segment file, or the passed value if no merge occurred
+   * @throws IOException
+   */
+  public static String mergeIndexFilesinPartitionedSegment(String segmentId, String tablePath,
+      String uniqueId) throws IOException {
+    CarbonIndexFileMergeWriter.SegmentIndexFIleMergeStatus segmentIndexFIleMergeStatus =
+        new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentId, tablePath);
+    if (segmentIndexFIleMergeStatus != null) {
+      uniqueId = System.currentTimeMillis() + "";
+      String newSegmentFileName = segmentId + "_" + uniqueId + CarbonTablePath.SEGMENT_EXT;
+      String path =
+          CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
+              + newSegmentFileName;
+      SegmentFileStore.writeSegmentFile(segmentIndexFIleMergeStatus.getSegmentFile(), path);
+      updateSegmentFile(tablePath, segmentId, newSegmentFileName);
+      deleteFiles(segmentIndexFIleMergeStatus.getFilesTobeDeleted());
+    }
+    return uniqueId;
+  }
+
+  private static void deleteFiles(List<String> filesToBeDeleted) throws IOException {
+    for (String filePath : filesToBeDeleted) {
+      FileFactory.deleteFile(filePath, FileFactory.getFileType(filePath));
+    }
+  }
 }
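
A hedged usage sketch of the new merge API for a partitioned segment; the table path and segment id below are illustrative, and a fresh uniqueId is generated internally only when index files were actually merged:

import org.apache.carbondata.processing.util.CarbonLoaderUtil

// Merges the .carbonindex files of segment "0" into a merge index, writes a new
// segment file, updates tablestatus and returns the uniqueId used for the new file.
val uniqueId = CarbonLoaderUtil.mergeIndexFilesinPartitionedSegment(
  "0",                         // segmentId
  "/warehouse/db/carbon_tbl",  // tablePath (illustrative)
  null)                        // uniqueId passed in; regenerated when a merge happens
println(s"merged segment file suffix: $uniqueId")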