Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/12/24 13:26:21 UTC

[25/50] [abbrv] carbondata git commit: [CARBONDATA-1862][PARTITION] Support compaction for partition table

[CARBONDATA-1862][PARTITION] Support compaction for partition table

This change adds support for compaction on partition tables.
Compaction changes during block identification and grouping: all blocks that belong to the same partition must be grouped into the same set for compaction, so the compactor reads the partition information from the partition map file when compacting a partition table.
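For illustration, a minimal, self-contained Scala sketch of that grouping idea (names such as BlockSplit and groupByPartition are hypothetical, not project APIs): splits that resolve to the same partition values are keyed to a single task id so they land in the same compaction group, mirroring the partitionTaskMap/getTaskNo logic added to CarbonMergerRDD in this patch.

import java.util

// Sketch only, not project code: group block splits by their partition values
// so that all blocks of one partition are compacted together.
object PartitionGroupingSketch {

  // Simplified stand-in for CarbonInputSplit: a block with its own task id and
  // the partition values it belongs to (as read from the partition map file).
  case class BlockSplit(taskId: String, partitions: List[String])

  def groupByPartition(splits: Seq[BlockSplit]): util.Map[String, util.List[BlockSplit]] = {
    // Reuse one task id per distinct partition combination, so every split of
    // that partition is mapped to the same compaction task.
    val partitionTaskMap = new util.HashMap[List[String], String]()
    val taskIdMapping = new util.HashMap[String, util.List[BlockSplit]]()
    splits.foreach { split =>
      var task = partitionTaskMap.get(split.partitions)
      if (task == null) {
        task = split.taskId
        partitionTaskMap.put(split.partitions, task)
      }
      var group = taskIdMapping.get(task)
      if (group == null) {
        group = new util.ArrayList[BlockSplit]()
        taskIdMapping.put(task, group)
      }
      group.add(split)
    }
    taskIdMapping
  }

  def main(args: Array[String]): Unit = {
    val grouped = groupByPartition(Seq(
      BlockSplit("0", List("empno=11")),
      BlockSplit("1", List("empno=11")),
      BlockSplit("2", List("empno=12"))))
    // Both empno=11 blocks fall under task "0"; the empno=12 block keeps task "2".
    println(grouped)
  }
}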

This closes #1675


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/5ed39de1
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/5ed39de1
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/5ed39de1

Branch: refs/heads/fgdatamap
Commit: 5ed39de193c0a589c30fbbd01e19e85715f2cac0
Parents: 3ff55a2
Author: ravipesala <ra...@gmail.com>
Authored: Sun Dec 17 11:09:34 2017 +0530
Committer: Jacky Li <ja...@qq.com>
Committed: Thu Dec 21 08:59:15 2017 +0800

----------------------------------------------------------------------
 .../core/metadata/PartitionMapFileStore.java    |   4 +
 .../core/metadata/schema/table/CarbonTable.java |   3 +-
 ...andardPartitionTableCompactionTestCase.scala | 178 +++++++++++++++++++
 .../StandardPartitionTableQueryTestCase.scala   |   4 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  72 ++++++--
 .../spark/rdd/CarbonSparkPartition.scala        |   3 +-
 .../command/carbonTableSchemaCommon.scala       |  13 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  15 +-
 .../spark/rdd/CarbonTableCompactor.scala        |  38 +++-
 .../CarbonAlterTableCompactionCommand.scala     |   9 +-
 .../spark/sql/optimizer/CarbonFilters.scala     |  13 ++
 .../merger/CompactionResultSortProcessor.java   |  29 ++-
 .../merger/RowResultMergerProcessor.java        |  19 +-
 .../carbondata/streaming/StreamHandoffRDD.scala |   3 +-
 14 files changed, 353 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ed39de1/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
index 8578cfe..f60b69f 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
@@ -276,6 +276,10 @@ public class PartitionMapFileStore {
     return partitionMap.get(indexFileName);
   }
 
+  public Map<String, List<String>> getPartitionMap() {
+    return partitionMap;
+  }
+
   public static class PartitionMapper implements Serializable {
 
     private static final long serialVersionUID = 3582245668420401089L;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ed39de1/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 7732a50..8c4b08b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -571,7 +571,8 @@ public class CarbonTable implements Serializable {
   }
 
   public boolean isPartitionTable() {
-    return null != tablePartitionMap.get(getTableName());
+    return null != tablePartitionMap.get(getTableName())
+        && tablePartitionMap.get(getTableName()).getPartitionType() != PartitionType.NATIVE_HIVE;
   }
 
   public boolean isHivePartitionTable() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ed39de1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
new file mode 100644
index 0000000..9056fea
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.standardpartition
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    dropTable
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    sql(
+      """
+        | CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+  }
+
+  def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Int): Unit = {
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+    val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
+      carbonTable.getTablePath)
+    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+    val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
+    val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
+      override def accept(file: CarbonFile): Boolean = {
+        return file.getName.endsWith(".partitionmap")
+      }
+    })
+    assert(dataFiles.length == partitions)
+  }
+
+  test("data compaction for partition table for one partition column") {
+    sql(
+      """
+        | CREATE TABLE partitionone (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    sql("ALTER TABLE partitionone COMPACT 'MINOR'").collect()
+
+    validateDataFiles("default_partitionone", "0.1", 1)
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionone where empno=11 order by empno"),
+      sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where empno=11 order by empno"))
+
+  }
+
+
+  test("data compaction for partition table for three partition column") {
+    sql(
+      """
+        | CREATE TABLE partitionthree (empno int, doj Timestamp,
+        |  workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (workgroupcategory int, empname String, designation String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql("ALTER TABLE partitionthree COMPACT 'MINOR'").collect()
+
+    validateDataFiles("default_partitionthree", "0.1", 1)
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionthree where workgroupcategory=1 and empname='arvind' and designation='SE' order by empno"),
+      sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where workgroupcategory=1 and empname='arvind' and designation='SE' order by empno"))
+  }
+
+  test("data major compaction for partition table") {
+    sql(
+      """
+        | CREATE TABLE partitionmajor (empno int, doj Timestamp,
+        |  workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (workgroupcategory int, empname String, designation String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql("ALTER TABLE partitionmajor COMPACT 'MINOR'").collect()
+    sql(s"""ALTER TABLE partitionmajor DROP PARTITION(workgroupcategory='1')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    val rows = sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionmajor where workgroupcategory=1 and empname='arvind' and designation='SE' order by empno").collect()
+    sql("ALTER TABLE partitionmajor COMPACT 'MAJOR'").collect()
+    validateDataFiles("default_partitionmajor", "0.2", 1)
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionmajor where workgroupcategory=1 and empname='arvind' and designation='SE' order by empno"),
+      rows)
+  }
+
+  test("data compaction for static partition table") {
+    sql(
+      """
+        | CREATE TABLE staticpartition (empno int, doj Timestamp,
+        |  workgroupcategoryname String, deptno int,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int,workgroupcategory int, empname String, designation String)
+        | PARTITIONED BY (deptname String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""insert into staticpartition PARTITION(deptname='software') select empno,doj,workgroupcategoryname,deptno,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""")
+    sql(s"""insert into staticpartition PARTITION(deptname='software') select empno,doj,workgroupcategoryname,deptno,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""")
+    sql(s"""insert into staticpartition PARTITION(deptname='finance') select empno,doj,workgroupcategoryname,deptno,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""")
+    sql(s"""insert into staticpartition PARTITION(deptname='finance') select empno,doj,workgroupcategoryname,deptno,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""")
+    val p1 = sql(s"""select count(*) from staticpartition where deptname='software'""").collect()
+    val p2 = sql(s"""select count(*) from staticpartition where deptname='finance'""").collect()
+    sql("ALTER TABLE staticpartition COMPACT 'MINOR'").collect()
+
+    validateDataFiles("default_staticpartition", "0.1", 1)
+
+    checkAnswer(sql(s"""select count(*) from staticpartition where deptname='software'"""), p1)
+    checkAnswer(sql(s"""select count(*) from staticpartition where deptname='finance'"""), p2)
+  }
+
+  override def afterAll = {
+    dropTable
+  }
+
+  def dropTable = {
+    sql("drop table if exists originTable")
+    sql("drop table if exists originMultiLoads")
+    sql("drop table if exists partitionone")
+    sql("drop table if exists partitiontwo")
+    sql("drop table if exists partitionthree")
+    sql("drop table if exists partitionmajor")
+    sql("drop table if exists staticpartition")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ed39de1/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index 570951a..8670162 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -182,7 +182,7 @@ class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterA
            "doj=2014-08-15 00:00:00",
            "projectenddate=2016-12-30"))
     checkAnswer(frame1,
-      sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where doj>'2006-01-17 00:00:00'"))
+      sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where doj>cast('2006-01-17 00:00:00' as Timestamp)"))
 
   }
 
@@ -209,8 +209,6 @@ class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterA
     sql("drop table if exists partitionmany")
     sql("drop table if exists partitiondate")
     sql("drop table if exists partitiondateinsert")
-    sql("drop table if exists staticpartitionone")
-    sql("drop table if exists singlepasspartitionone")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ed39de1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index fb4634e..59e5d30 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -43,6 +43,7 @@ import org.apache.carbondata.core.mutate.UpdateVO
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
@@ -85,6 +86,11 @@ class CarbonMergerRDD[K, V](
       } else {
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
       }
+      val partitionNames = if (carbonTable.isHivePartitionTable) {
+        carbonSparkPartition.partitionNames.get.asJava
+      } else {
+        null
+      }
       // this property is used to determine whether temp location for carbon is inside
       // container temp dir or is yarn application directory.
       val carbonUseLocalDir = CarbonProperties.getInstance()
@@ -207,11 +213,13 @@ class CarbonMergerRDD[K, V](
         carbonLoadModel.setPartitionId("0")
         var processor: AbstractResultProcessor = null
         if (restructuredBlockExists) {
-          processor = new CompactionResultSortProcessor(carbonLoadModel, carbonTable,
+          processor = new CompactionResultSortProcessor(
+            carbonLoadModel,
+            carbonTable,
             segmentProperties,
             carbonMergerMapping.campactionType,
-            factTableName
-          )
+            factTableName,
+            partitionNames)
         } else {
           processor =
             new RowResultMergerProcessor(
@@ -220,8 +228,8 @@ class CarbonMergerRDD[K, V](
               segmentProperties,
               tempStoreLoc,
               carbonLoadModel,
-              carbonMergerMapping.campactionType
-            )
+              carbonMergerMapping.campactionType,
+              partitionNames)
         }
         mergeStatus = processor.execute(result2)
         mergeResult = tableBlockInfoList.get(0).getSegmentId + ',' + mergeNumber
@@ -269,6 +277,9 @@ class CarbonMergerRDD[K, V](
     val jobConf: JobConf = new JobConf(new Configuration)
     val job: Job = new Job(jobConf)
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
+    CarbonTableInputFormat.setPartitionsToPrune(
+      job.getConfiguration,
+      carbonMergerMapping.currentPartitions.asJava)
     CarbonTableInputFormat.setTableInfo(job.getConfiguration,
       carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
     var updateDetails: UpdateVO = null
@@ -308,7 +319,7 @@ class CarbonMergerRDD[K, V](
           .map(_.asInstanceOf[CarbonInputSplit])
           .filter { split => FileFormat.COLUMNAR_V3.equals(split.getFileFormat) }.toList.asJava
 
-      carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter(entry => {
+      carbonInputSplits ++:= splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).filter{ entry =>
         val blockInfo = new TableBlockInfo(entry.getPath.toString,
           entry.getStart, entry.getSegmentId,
           entry.getLocations, entry.getLength, entry.getVersion,
@@ -318,7 +329,7 @@ class CarbonMergerRDD[K, V](
           .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
             updateDetails, updateStatusManager)))) &&
         FileFormat.COLUMNAR_V3.equals(entry.getFileFormat)
-      })
+      }
     }
 
     // prepare the details required to extract the segment properties using last segment.
@@ -337,25 +348,25 @@ class CarbonMergerRDD[K, V](
     }
 
     val columnToCardinalityMap = new util.HashMap[java.lang.String, Integer]()
-
-    carbonInputSplits.foreach(splits => {
-      val taskNo = splits.taskId
+    val partitionTaskMap = new util.HashMap[util.List[String], String]()
+    carbonInputSplits.foreach { split =>
+      val taskNo = getTaskNo(split, partitionTaskMap)
       var dataFileFooter: DataFileFooter = null
 
       val splitList = taskIdMapping.get(taskNo)
       noOfBlocks += 1
       if (null == splitList) {
         val splitTempList = new util.ArrayList[CarbonInputSplit]()
-        splitTempList.add(splits)
+        splitTempList.add(split)
         taskIdMapping.put(taskNo, splitTempList)
       } else {
-        splitList.add(splits)
+        splitList.add(split)
       }
 
       // Check the cardinality of each columns and set the highest.
       try {
         dataFileFooter = CarbonUtil.readMetadatFile(
-          CarbonInputSplit.getTableBlockInfo(splits))
+          CarbonInputSplit.getTableBlockInfo(split))
       } catch {
         case e: IOException =>
           logError("Exception in preparing the data file footer for compaction " + e.getMessage)
@@ -367,7 +378,6 @@ class CarbonMergerRDD[K, V](
           dataFileFooter.getColumnInTable,
           dataFileFooter.getSegmentInfo.getColumnCardinality)
     }
-    )
     val updatedMaxSegmentColumnList = new util.ArrayList[ColumnSchema]()
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     // update cardinality and column schema list according to master schema
@@ -428,7 +438,12 @@ class CarbonMergerRDD[K, V](
             carbonPartitionId = Integer.parseInt(taskInfo.getTaskId)
           }
           result.add(
-            new CarbonSparkPartition(id, taskPartitionNo, multiBlockSplit, carbonPartitionId))
+            new CarbonSparkPartition(
+              id,
+              taskPartitionNo,
+              multiBlockSplit,
+              carbonPartitionId,
+              getPartitionNamesFromTask(taskInfo.getTaskId, partitionTaskMap)))
           taskPartitionNo += 1
         }
       }
@@ -456,6 +471,33 @@ class CarbonMergerRDD[K, V](
     result.toArray(new Array[Partition](result.size))
   }
 
+  private def getTaskNo(
+      split: CarbonInputSplit,
+      partitionTaskMap: util.Map[List[String], String]): String = {
+    if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
+      val partitions =
+        carbonMergerMapping.partitionMapper.getPartitionMap.get(
+          CarbonTablePath.getCarbonIndexFileName(split.getBlockPath))
+      var task = partitionTaskMap.get(partitions)
+      if (task == null) {
+        task = split.taskId
+        partitionTaskMap.put(partitions, task)
+      }
+      task
+    } else {
+      split.taskId
+    }
+  }
+
+  private def getPartitionNamesFromTask(taskId: String,
+      partitionTaskMap: util.Map[List[String], String]): Option[Seq[String]] = {
+    if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
+      Some(partitionTaskMap.asScala.find(f => f._2.equals(taskId)).get._1.asScala)
+    } else {
+      None
+    }
+  }
+
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val theSplit = split.asInstanceOf[CarbonSparkPartition]
     theSplit.split.value.getLocations.filter(_ != "localhost")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ed39de1/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
index cf539ba..036f1d1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
@@ -25,7 +25,8 @@ class CarbonSparkPartition(
     val rddId: Int,
     val idx: Int,
     @transient val multiBlockSplit: CarbonMultiBlockSplit,
-    val partitionId: Int = 0)
+    val partitionId: Int = 0,
+    val partitionNames: Option[Seq[String]] = None)
     extends Partition {
 
   val split = new SerializableWritable[CarbonMultiBlockSplit](multiBlockSplit)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ed39de1/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index ad6d876..c7a7b69 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -29,7 +29,8 @@ import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.PartitionMapFileStore.PartitionMapper
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema._
@@ -114,7 +115,9 @@ case class CarbonMergerMapping(
     // maxSegmentColCardinality is Cardinality of last segment of compaction
     var maxSegmentColCardinality: Array[Int],
     // maxSegmentColumnSchemaList is list of column schema of last segment of compaction
-    var maxSegmentColumnSchemaList: List[ColumnSchema])
+    var maxSegmentColumnSchemaList: List[ColumnSchema],
+    currentPartitions: Seq[String],
+    @transient partitionMapper: PartitionMapper)
 
 case class NodeInfo(TaskId: String, noOfBlocks: Int)
 
@@ -134,13 +137,15 @@ case class UpdateTableModel(
 case class CompactionModel(compactionSize: Long,
     compactionType: CompactionType,
     carbonTable: CarbonTable,
-    isDDLTrigger: Boolean)
+    isDDLTrigger: Boolean,
+    currentPartitions: Seq[String])
 
 case class CompactionCallableModel(carbonLoadModel: CarbonLoadModel,
     carbonTable: CarbonTable,
     loadsToMerge: util.List[LoadMetadataDetails],
     sqlContext: SQLContext,
-    compactionType: CompactionType)
+    compactionType: CompactionType,
+    currentPartitions: Seq[String])
 
 case class AlterPartitionModel(carbonLoadModel: CarbonLoadModel,
     segmentId: String,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ed39de1/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 72c979a..8c9700c 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -40,6 +40,7 @@ import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, N
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
 import org.apache.spark.sql.execution.command.{CompactionModel, ExecutionErrors, UpdateTableModel}
 import org.apache.spark.sql.hive.DistributionUtil
+import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.constants.LoggerAction
@@ -209,11 +210,12 @@ object CarbonDataRDDFactory {
 
               val compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MAJOR)
 
-              val newcompactionModel = CompactionModel(compactionSize,
+              val newcompactionModel = CompactionModel(
+                compactionSize,
                 compactionType,
                 table,
-                compactionModel.isDDLTrigger
-              )
+                compactionModel.isDDLTrigger,
+                CarbonFilters.getCurrentPartitions(sqlContext.sparkSession, table))
               // proceed for compaction
               try {
                 CompactionFactory.getCompactor(
@@ -724,11 +726,12 @@ object CarbonDataRDDFactory {
                    s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       val compactionSize = 0
       val isCompactionTriggerByDDl = false
-      val compactionModel = CompactionModel(compactionSize,
+      val compactionModel = CompactionModel(
+        compactionSize,
         CompactionType.MINOR,
         carbonTable,
-        isCompactionTriggerByDDl
-      )
+        isCompactionTriggerByDDl,
+        CarbonFilters.getCurrentPartitions(sqlContext.sparkSession, carbonTable))
       var storeLocation = ""
       val configuredStore = Util.getConfiguredLocalDirs(SparkEnv.get.conf)
       if (null != configuredStore && configuredStore.nonEmpty) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ed39de1/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index 5f5a3d1..9768625 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -18,6 +18,7 @@
 package org.apache.carbondata.spark.rdd
 
 import java.util
+import java.util.{List, Map}
 import java.util.concurrent.ExecutorService
 
 import scala.collection.JavaConverters._
@@ -26,8 +27,11 @@ import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutor, Futu
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel, CompactionModel}
 
+import org.apache.carbondata.core.metadata.PartitionMapFileStore
+import org.apache.carbondata.core.metadata.PartitionMapFileStore.PartitionMapper
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{AlterTableCompactionPostEvent, AlterTableCompactionPreEvent, AlterTableCompactionPreStatusUpdateEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
@@ -111,7 +115,8 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       compactionModel.carbonTable,
       loadsToMerge,
       sqlContext,
-      compactionModel.compactionType)
+      compactionModel.compactionType,
+      compactionModel.currentPartitions)
     triggerCompaction(compactionCallableModel)
   }
 
@@ -121,6 +126,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
     val sc = compactionCallableModel.sqlContext
     val carbonLoadModel = compactionCallableModel.carbonLoadModel
     val compactionType = compactionCallableModel.compactionType
+    val partitions = compactionCallableModel.currentPartitions
     val tablePath = carbonLoadModel.getTablePath
     val startTime = System.nanoTime()
     val mergedLoadName = CarbonDataMergerUtil.getMergedLoadName(loadsToMerge)
@@ -130,7 +136,26 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
     val validSegments: Array[String] = CarbonDataMergerUtil
       .getValidSegments(loadsToMerge).split(',')
     val mergeLoadStartTime = CarbonUpdateUtil.readCurrentTime()
-    val carbonMergerMapping = CarbonMergerMapping(tablePath,
+    val partitionMapper = if (carbonTable.isHivePartitionTable) {
+      var partitionMap: util.Map[String, util.List[String]] = null
+      validSegments.foreach { segmentId =>
+        val localMapper = new PartitionMapFileStore()
+        localMapper.readAllPartitionsOfSegment(
+          CarbonTablePath.getSegmentPath(carbonLoadModel.getTablePath, segmentId))
+        if (partitionMap == null) {
+          partitionMap = localMapper.getPartitionMap
+        } else {
+          partitionMap.putAll(localMapper.getPartitionMap)
+        }
+      }
+      val mapper = new PartitionMapper()
+      mapper.setPartitionMap(partitionMap)
+      mapper
+    } else {
+      null
+    }
+    val carbonMergerMapping = CarbonMergerMapping(
+      tablePath,
       carbonTable.getMetaDataFilepath,
       mergedLoadName,
       databaseName,
@@ -139,8 +164,9 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
       compactionType,
       maxSegmentColCardinality = null,
-      maxSegmentColumnSchemaList = null
-    )
+      maxSegmentColumnSchemaList = null,
+      currentPartitions = partitions,
+      partitionMapper)
     carbonLoadModel.setTablePath(carbonMergerMapping.hdfsStoreLocation)
     carbonLoadModel.setLoadMetadataDetails(
       SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath).toList.asJava)
@@ -196,7 +222,9 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
       val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
       CommonUtil.mergeIndexFiles(
         sc.sparkContext, Seq(mergedLoadNumber), tablePath, carbonTable, false)
-
+      new PartitionMapFileStore().mergePartitionMapFiles(
+        CarbonTablePath.getSegmentPath(tablePath, mergedLoadNumber),
+        carbonLoadModel.getFactTimeStamp + "")
       // trigger event for compaction
       val alterTableCompactionPreStatusUpdateEvent: AlterTableCompactionPreStatusUpdateEvent =
       AlterTableCompactionPreStatusUpdateEvent(sc.sparkSession,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ed39de1/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 930dd57..cb0ff2d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.command.{AlterTableModel, CarbonMergerMapping, CompactionModel, DataCommand}
 import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -159,7 +160,8 @@ case class CarbonAlterTableCompactionCommand(
     val compactionModel = CompactionModel(compactionSize,
       compactionType,
       carbonTable,
-      isCompactionTriggerByDDl
+      isCompactionTriggerByDDl,
+      CarbonFilters.getCurrentPartitions(sqlContext.sparkSession, carbonTable)
     )
 
     val isConcurrentCompactionAllowed = CarbonProperties.getInstance()
@@ -209,8 +211,9 @@ case class CarbonAlterTableCompactionCommand(
               carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableId,
               compactionType,
               maxSegmentColCardinality = null,
-              maxSegmentColumnSchemaList = null
-            )
+              maxSegmentColumnSchemaList = null,
+              compactionModel.currentPartitions,
+              null)
 
             // trigger event for merge index
             val operationContext = new OperationContext

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ed39de1/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index a63b358..24fd732 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
 import org.apache.spark.sql.catalyst.TableIdentifier
 
 import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
 import org.apache.carbondata.core.scan.expression.conditional._
 import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
@@ -392,6 +393,18 @@ object CarbonFilters {
     }
   }
 
+  def getCurrentPartitions(sparkSession: SparkSession,
+      carbonTable: CarbonTable): Seq[String] = {
+    if (carbonTable.isHivePartitionTable) {
+      CarbonFilters.getPartitions(
+        Seq.empty,
+        sparkSession,
+        TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName)))
+    } else {
+      Seq.empty
+    }
+  }
+
   def getPartitions(partitionFilters: Seq[Expression],
       sparkSession: SparkSession,
       identifier: TableIdentifier): Seq[String] = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ed39de1/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index d115a7a..eece8f2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -26,6 +26,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.metadata.PartitionMapFileStore;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -34,6 +35,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
 import org.apache.carbondata.processing.sort.sortdata.SingleThreadFinalSortFilesMerger;
@@ -127,21 +129,19 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    */
   private SortIntermediateFileMerger intermediateFileMerger;
 
-  /**
-   * @param carbonLoadModel
-   * @param carbonTable
-   * @param segmentProperties
-   * @param compactionType
-   * @param tableName
-   */
+  private List<String> partitionNames;
+
+
   public CompactionResultSortProcessor(CarbonLoadModel carbonLoadModel, CarbonTable carbonTable,
-      SegmentProperties segmentProperties, CompactionType compactionType, String tableName) {
+      SegmentProperties segmentProperties, CompactionType compactionType, String tableName,
+      List<String> partitionNames) {
     this.carbonLoadModel = carbonLoadModel;
     this.carbonTable = carbonTable;
     this.segmentProperties = segmentProperties;
     this.segmentId = carbonLoadModel.getSegmentId();
     this.compactionType = compactionType;
     this.tableName = tableName;
+    this.partitionNames = partitionNames;
   }
 
   /**
@@ -168,6 +168,19 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
     } catch (Exception e) {
       LOGGER.error(e, "Compaction failed: " + e.getMessage());
     } finally {
+      if (partitionNames != null) {
+        try {
+          new PartitionMapFileStore().writePartitionMapFile(
+              CarbonTablePath.getSegmentPath(
+                  carbonLoadModel.getTablePath(),
+                  carbonLoadModel.getSegmentId()),
+              carbonLoadModel.getTaskNo(),
+              partitionNames);
+        } catch (IOException e) {
+          LOGGER.error(e, "Compaction failed: " + e.getMessage());
+          isCompactionSuccess = false;
+        }
+      }
       // clear temp files and folders created during compaction
       deleteTempStoreLocation();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ed39de1/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index fbe2c85..3d0700b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.processing.merger;
 
+import java.io.IOException;
 import java.util.AbstractQueue;
 import java.util.Comparator;
 import java.util.List;
@@ -29,10 +30,12 @@ import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.PartitionMapFileStore;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
 import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
 import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.processing.exception.SliceMergerException;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
 import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
@@ -47,6 +50,8 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
 
   private CarbonFactHandler dataHandler;
   private SegmentProperties segprop;
+  private CarbonLoadModel loadModel;
+  private List<String> partitionNames;
   /**
    * record holder heap
    */
@@ -57,8 +62,10 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
 
   public RowResultMergerProcessor(String databaseName,
       String tableName, SegmentProperties segProp, String[] tempStoreLocation,
-      CarbonLoadModel loadModel, CompactionType compactionType) {
+      CarbonLoadModel loadModel, CompactionType compactionType, List<String> partitionNames) {
     this.segprop = segProp;
+    this.partitionNames = partitionNames;
+    this.loadModel = loadModel;
     CarbonDataProcessorUtil.createLocations(tempStoreLocation);
 
     CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
@@ -150,8 +157,14 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
         if (isDataPresent) {
           this.dataHandler.closeHandler();
         }
-      } catch (CarbonDataWriterException e) {
-        LOGGER.error("Exception while closing the handler in compaction merger " + e.getMessage());
+        if (partitionNames != null) {
+          new PartitionMapFileStore().writePartitionMapFile(
+              CarbonTablePath.getSegmentPath(loadModel.getTablePath(), loadModel.getSegmentId()),
+              loadModel.getTaskNo(),
+              partitionNames);
+        }
+      } catch (CarbonDataWriterException | IOException e) {
+        LOGGER.error(e,"Exception in compaction merger");
         mergeStatus = false;
       }
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/5ed39de1/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 37aaea5..765a88b 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -177,7 +177,8 @@ class StreamHandoffRDD[K, V](
       carbonTable,
       segmentProperties,
       CompactionType.STREAMING,
-      carbonTable.getTableName
+      carbonTable.getTableName,
+      null
     )
   }