Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/02/26 12:19:06 UTC

[6/9] carbondata git commit: [CARBONDATA-2187][PARTITION] Partition restructure for new folder structure and supporting partition location feature

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
index 2b0dd09..f238d2b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
@@ -17,13 +17,14 @@
 package org.apache.carbondata.spark.testsuite.standardpartition
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
@@ -49,24 +50,19 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA
 
   }
 
-  def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Int, partitionMapFiles: Int): Unit = {
+  def validateDataFiles(tableUniqueName: String, segmentId: String, partition: Int, indexes: Int): Unit = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
     val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
       carbonTable.getTablePath)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
-    val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
-    val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
-      override def accept(file: CarbonFile): Boolean = {
-        return file.getName.endsWith(".carbondata")
-      }
-    })
-    assert(dataFiles.length == partitions)
-    val partitionFile = carbonFile.listFiles(new CarbonFileFilter() {
-      override def accept(file: CarbonFile): Boolean = {
-        return file.getName.endsWith(".partitionmap")
-      }
-    })
-    assert(partitionFile.length == partitionMapFiles)
+    val partitions = CarbonFilters
+      .getPartitions(Seq.empty,
+        sqlContext.sparkSession,
+        TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName)))
+    assert(partitions.get.length == partition)
+    val details = SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath)
+    val segLoad = details.find(_.getLoadName.equals(segmentId)).get
+    val seg = new SegmentFileStore(carbonTable.getTablePath, segLoad.getSegmentFile)
+    assert(seg.getIndexFiles.size == indexes)
   }
 
   test("clean up partition table for int partition column") {
@@ -89,11 +85,10 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA
       sql(s"""select count (*) from originTable where empno=11"""))
 
     sql(s"""ALTER TABLE partitionone DROP PARTITION(empno='11')""")
-    validateDataFiles("default_partitionone", "0", 10, 2)
+    validateDataFiles("default_partitionone", "0", 9, 9)
     sql(s"CLEAN FILES FOR TABLE partitionone").show()
-
+    validateDataFiles("default_partitionone", "0", 9, 9)
     checkExistence(sql(s"""SHOW PARTITIONS partitionone"""), false, "empno=11")
-    validateDataFiles("default_partitionone", "0", 9, 1)
     checkAnswer(
       sql(s"""select count (*) from partitionone where empno=11"""),
       Seq(Row(0)))
@@ -113,11 +108,11 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA
       sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmany OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
       sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmany OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
       sql(s"""ALTER TABLE partitionmany DROP PARTITION(deptname='Learning')""")
-      validateDataFiles("default_partitionmany", "0", 10, 2)
-      validateDataFiles("default_partitionmany", "1", 10, 2)
+      validateDataFiles("default_partitionmany", "0", 8, 8)
+      validateDataFiles("default_partitionmany", "1", 8, 8)
       sql(s"CLEAN FILES FOR TABLE partitionmany").show()
-      validateDataFiles("default_partitionmany", "0", 8, 1)
-      validateDataFiles("default_partitionmany", "1", 8, 1)
+      validateDataFiles("default_partitionmany", "0", 8, 8)
+      validateDataFiles("default_partitionmany", "1", 8, 8)
       checkExistence(sql(s"""SHOW PARTITIONS partitionmany"""), false, "deptname=Learning", "projectcode=928479")
       checkAnswer(
         sql(s"""select count (*) from partitionmany where deptname='Learning'"""),
@@ -142,7 +137,7 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA
     sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='protocol')""")
     sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='security')""")
     assert(sql(s"""SHOW PARTITIONS partitionall""").collect().length == 0)
-    validateDataFiles("default_partitionall", "0", 10, 6)
+    validateDataFiles("default_partitionall", "0", 0, 0)
     sql(s"CLEAN FILES FOR TABLE partitionall").show()
     validateDataFiles("default_partitionall", "0", 0, 0)
     checkAnswer(
@@ -150,6 +145,30 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA
       Seq(Row(0)))
   }
 
+  test("clean up after deleting segments on table") {
+    sql(
+      """
+        | CREATE TABLE partitionalldeleteseg (empno int, empname String, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (deptname String,doj Timestamp,projectcode int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionalldeleteseg OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionalldeleteseg OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionalldeleteseg OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionalldeleteseg OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    assert(sql(s"show segments for table partitionalldeleteseg").count == 4)
+    checkAnswer(sql(s"Select count(*) from partitionalldeleteseg"), Seq(Row(40)))
+    sql(s"delete from table partitionalldeleteseg where segment.id in (1)").show()
+    checkExistence(sql(s"show segments for table partitionalldeleteseg"), true, "Marked for Delete")
+    checkAnswer(sql(s"Select count(*) from partitionalldeleteseg"), Seq(Row(30)))
+    sql(s"CLEAN FILES FOR TABLE partitionalldeleteseg").show()
+    assert(sql(s"show segments for table partitionalldeleteseg").count == 3)
+  }
+
+
   override def afterAll = {
     dropTable
   }
@@ -162,6 +181,7 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA
     sql("drop table if exists partitionmany")
     sql("drop table if exists partitionshow")
     sql("drop table if exists staticpartition")
+    sql("drop table if exists partitionalldeleteseg")
   }
 
 }

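The clean-file tests above no longer count .carbondata and .partitionmap files on disk; with the new folder structure they resolve partitions through the session catalog and index files through the segment file recorded in the table status. Below is a minimal sketch of that validation flow, reusing only the calls visible in the diff (CarbonFilters.getPartitions, SegmentStatusManager.readLoadMetadata, SegmentFileStore) and assuming their signatures match this revision; the helper name is hypothetical.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.optimizer.CarbonFilters

import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.path.CarbonTablePath

// Hypothetical helper mirroring the rewritten validateDataFiles above.
def countPartitionsAndIndexes(
    spark: SparkSession,
    tableUniqueName: String,
    segmentId: String): (Int, Int) = {
  val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
  // Partitions come from the catalog, not from listing segment directories.
  val partitions = CarbonFilters.getPartitions(
    Seq.empty,
    spark,
    TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName)))
  // Index files come from the segment file referenced in the table status.
  val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier, carbonTable.getTablePath)
  val details = SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath)
  val segLoad = details.find(_.getLoadName.equals(segmentId)).get
  val segmentFile = new SegmentFileStore(carbonTable.getTablePath, segLoad.getSegmentFile)
  (partitions.get.length, segmentFile.getIndexFiles.size)
}

Note how the expected values in the tests change accordingly: after DROP PARTITION the partition count and index-file count drop together (10 to 9 above), and CLEAN FILES no longer changes either number because stale files are already excluded from the segment file.
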
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
index 22ebd80..33e761f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
@@ -20,11 +20,7 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.{CarbonMetadata, PartitionMapFileStore}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndAfterAll {
 
@@ -49,23 +45,6 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
 
   }
 
-  def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Int): Unit = {
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-    val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
-      carbonTable.getTablePath)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
-    val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
-    val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
-      override def accept(file: CarbonFile): Boolean = {
-        return CarbonTablePath.isCarbonDataFile(file.getName) ||
-               CarbonTablePath.isCarbonIndexFile(file.getName)
-      }
-    })
-    assert(dataFiles.length > 1)
-    val pstore = new PartitionMapFileStore()
-    pstore.readAllPartitionsOfSegment(segmentDir)
-    println(pstore.getPartitionMap)
-  }
 
   test("data compaction for partition table for one partition column") {
     sql(
@@ -83,9 +62,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
 
     sql("ALTER TABLE partitionone COMPACT 'MINOR'").collect()
-
-    validateDataFiles("default_partitionone", "0.1", 1)
-
+    checkExistence(sql("show segments for table partitionone"), true, "0.1")
     checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionone where empno=11 order by empno"),
       sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where empno=11 order by empno"))
 
@@ -107,9 +84,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
     sql("ALTER TABLE partitionthree COMPACT 'MINOR'").collect()
-
-    validateDataFiles("default_partitionthree", "0.1", 1)
-
+    checkExistence(sql("show segments for table partitionthree"), true, "0.1")
     checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionthree where workgroupcategory=1 and empname='arvind' and designation='SE' order by empno"),
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable where workgroupcategory=1 and empname='arvind' and designation='SE' order by empno"))
   }
@@ -129,6 +104,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
     sql("ALTER TABLE partitionmajor COMPACT 'MINOR'").collect()
+    checkExistence(sql("show segments for table partitionmajor"), true, "0.1")
     sql(s"""ALTER TABLE partitionmajor DROP PARTITION(workgroupcategory='1')""")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
@@ -136,7 +112,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmajor OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
     val rows = sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionmajor where workgroupcategory=1 and empname='arvind' and designation='SE' order by empno").collect()
     sql("ALTER TABLE partitionmajor COMPACT 'MAJOR'").collect()
-    validateDataFiles("default_partitionmajor", "0.2", 1)
+    checkExistence(sql("show segments for table partitionmajor"), true, "0.2")
     checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionmajor where workgroupcategory=1 and empname='arvind' and designation='SE' order by empno"),
       rows)
   }
@@ -158,9 +134,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
     val p1 = sql(s"""select count(*) from staticpartition where deptname='software'""").collect()
     val p2 = sql(s"""select count(*) from staticpartition where deptname='finance'""").collect()
     sql("ALTER TABLE staticpartition COMPACT 'MINOR'").collect()
-
-    validateDataFiles("default_staticpartition", "0.1", 1)
-
+    checkExistence(sql("show segments for table staticpartition"), true, "0.1")
     checkAnswer(sql(s"""select count(*) from staticpartition where deptname='software'"""), p1)
     checkAnswer(sql(s"""select count(*) from staticpartition where deptname='finance'"""), p2)
   }

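With the partition-map validation removed, the compaction tests above only assert that the merged segment id appears in SHOW SEGMENTS: MINOR compaction of loads 0 through 3 produces segment 0.1, and a later MAJOR compaction in the partitionmajor test produces 0.2. A compressed sketch of the pattern, using the same sql/checkExistence helpers these suites already rely on:

// MINOR compaction folds loads 0..3 into a merged segment named 0.1;
// the test now checks the segment list instead of the partition-map files.
sql("ALTER TABLE partitionone COMPACT 'MINOR'").collect()
checkExistence(sql("show segments for table partitionone"), true, "0.1")
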
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
index aac823a..6e36623 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
@@ -204,6 +204,7 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl
       sql(s"""select count (*) from partitionone1 where empno=11"""),
       sql(s"""select count (*) from originTable where empno=11"""))
     sql(s"""ALTER TABLE partitionone1 DROP PARTITION(empno='11')""")
+    sql(s"CLEAN FILES FOR TABLE partitionone1").show()
     assert(Files.notExists(Paths.get(TestQueryExecutor.warehouse + "/partitionone1/" + "empno=11"), LinkOption.NOFOLLOW_LINKS))
     sql("drop table if exists partitionone1")
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 10da906..c8f7be3 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -16,20 +16,24 @@
  */
 package org.apache.carbondata.spark.testsuite.standardpartition
 
+import scala.collection.JavaConverters._
 import java.io.{File, FileWriter, IOException}
 import java.util
 import java.util.concurrent.{Callable, ExecutorService, Executors}
 
 import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.BatchedDataSourceScanExec
+import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, CarbonSession, Row}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
@@ -66,18 +70,15 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originMultiLoads OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
   }
 
-  def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Int): Unit = {
+  def validateDataFiles(tableUniqueName: String, segmentId: String, partition: Int): Unit = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
     val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
       carbonTable.getTablePath)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
-    val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
-    val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
-      override def accept(file: CarbonFile): Boolean = {
-        return file.getName.endsWith(".partitionmap")
-      }
-    })
-    assert(dataFiles.length == partitions)
+    val partitions = CarbonFilters
+      .getPartitions(Seq.empty,
+        sqlContext.sparkSession,
+        TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName)))
+    assert(partitions.get.length == partition)
   }
 
   test("data loading for partition table for one partition column") {
@@ -92,7 +93,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
       """.stripMargin)
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
 
-    validateDataFiles("default_partitionone", "0", 1)
+    validateDataFiles("default_partitionone", "0", 10)
 
     checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionone order by empno"),
       sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
@@ -111,7 +112,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
       """.stripMargin)
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitiontwo OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
 
-    validateDataFiles("default_partitiontwo", "0", 1)
+    validateDataFiles("default_partitiontwo", "0", 10)
 
     checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitiontwo order by empno"),
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
@@ -130,7 +131,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
       """.stripMargin)
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionthree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
 
-    validateDataFiles("default_partitionthree", "0", 1)
+    validateDataFiles("default_partitionthree", "0", 10)
 
     checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionthree order by empno"),
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno"))
@@ -146,14 +147,14 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
         |  utilization int,salary int)
         | PARTITIONED BY (workgroupcategory int, empname String, designation String)
         | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('DICTIONARY_INCLUDE'='empname,designation,deptname')
+        | TBLPROPERTIES('DICTIONARY_INCLUDE'='deptname')
       """.stripMargin)
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmultiplethree OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
 
-    validateDataFiles("default_partitionmultiplethree", "1", 1)
-    validateDataFiles("default_partitionmultiplethree", "2", 1)
+    validateDataFiles("default_partitionmultiplethree", "1", 10)
+    validateDataFiles("default_partitionmultiplethree", "2", 10)
     checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from partitionmultiplethree order by empno"),
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originMultiLoads order by empno"))
   }
@@ -172,7 +173,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     sql(s"""insert into insertpartitionthree select empno,doj,workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""")
     sql(s"""insert into insertpartitionthree select empno,doj,workgroupcategoryname,deptno,deptname,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation from originTable""")
 
-    validateDataFiles("default_insertpartitionthree", "0", 1)
+    validateDataFiles("default_insertpartitionthree", "0", 10)
 
     checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from insertpartitionthree order by empno"),
       sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originMultiLoads order by empno"))
@@ -205,7 +206,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
       """.stripMargin)
     sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE singlepasspartitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"', 'SINGLE_PASS'='true')""")
 
-    validateDataFiles("default_singlepasspartitionone", "0", 1)
+    validateDataFiles("default_singlepasspartitionone", "0", 8)
   }
 
   test("data loading for partition table for one static partition column with load syntax") {
@@ -289,7 +290,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
         |  utilization int,salary int)
         | PARTITIONED BY (workgroupcategory int, empname String, designation String)
         | STORED BY 'org.apache.carbondata.format'
-        | TBLPROPERTIES('DICTIONARY_INCLUDE'='empname,designation,deptname')
+        | TBLPROPERTIES('DICTIONARY_INCLUDE'='deptname')
       """.stripMargin)
 
     val tasks = new util.ArrayList[Callable[String]]()
@@ -334,13 +335,12 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
 
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_mergeindexpartitionthree")
     val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
-      carbonTable.getTablePath)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", "0")
-    val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
-    val files = carbonFile.listFiles(new CarbonFileFilter {
-      override def accept(file: CarbonFile): Boolean = CarbonTablePath.isCarbonIndexFile(file.getName)
-    })
-    assert(files.length == 10)
+        carbonTable.getTablePath)
+    val details = SegmentStatusManager.readTableStatusFile(tablePath.getTableStatusFilePath)
+    val store = new SegmentFileStore(carbonTable.getTablePath, details(0).getSegmentFile)
+    store.readIndexFiles()
+    store.getIndexFiles
+    assert(store.getIndexFiles.size() == 10)
   }
 
   test("load static partition table for one static partition column with load syntax issue") {
@@ -433,10 +433,10 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     }
     sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE smallpartitionfiles")
     FileUtils.deleteDirectory(folder)
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "smallpartitionfiles")
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val segmentDir = carbonTablePath.getSegmentDir("0", "0")
-    assert(new File(segmentDir).listFiles().length < 50)
+    val specs = CarbonFilters.getPartitions(Seq.empty, sqlContext.sparkSession, TableIdentifier("smallpartitionfiles"))
+    specs.get.foreach{s =>
+      assert(new File(s.getLocation.toString).listFiles().length < 10)
+    }
   }
 
   test("verify partition read with small files") {

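Two loading checks change shape here: the merge-index test now reads the table status file and opens the recorded segment file through SegmentFileStore instead of listing .carbonindex files, and the small-files test resolves each partition's location from the catalog. A sketch of the merge-index check, assuming the readTableStatusFile and readIndexFiles signatures shown in the diff:

import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
import org.apache.carbondata.core.statusmanager.SegmentStatusManager
import org.apache.carbondata.core.util.path.CarbonTablePath

val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_mergeindexpartitionthree")
val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier, carbonTable.getTablePath)
// The table status file lists every load; each entry now carries the name of its segment file.
val details = SegmentStatusManager.readTableStatusFile(tablePath.getTableStatusFilePath)
val store = new SegmentFileStore(carbonTable.getTablePath, details(0).getSegmentFile)
// readIndexFiles populates the index-file map from the segment file, one entry per index file.
store.readIndexFiles()
assert(store.getIndexFiles.size() == 10)
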
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
index 841185b..c24a277 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableOverwriteTestCase.scala
@@ -61,7 +61,6 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf
     sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""")
     sql(s"""insert into staticpartitiondateinsert select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""")
     sql(s"""insert overwrite table staticpartitiondateinsert PARTITION(projectenddate='2016-06-29',doj='2010-12-29 00:00:00') select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary from originTable where projectenddate=cast('2016-06-29' as Date)""")
-//    sql(s"""insert overwrite table partitiondateinsert  select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable""")
     checkAnswer(sql("select * from staticpartitiondateinsert where projectenddate=cast('2016-06-29' as Date)"),
       sql("select empno, empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from originTable where projectenddate=cast('2016-06-29' as Date)"))
   }
@@ -119,6 +118,7 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf
   }
 
   test("dynamic and static partition table with overwrite ") {
+    sql("drop table if exists insertstaticpartitiondynamic")
     sql(
       """
         | CREATE TABLE insertstaticpartitiondynamic (designation String, doj Timestamp,salary int)
@@ -137,6 +137,80 @@ class StandardPartitionTableOverwriteTestCase extends QueryTest with BeforeAndAf
 
   }
 
+  test("dynamic and static partition table with many partition cols overwrite ") {
+    sql("drop table if exists insertstaticpartitiondynamic")
+    sql(
+      """
+        | CREATE TABLE insertstaticpartitiondynamic (designation String,salary int)
+        | PARTITIONED BY (empno int, empname String, doj Timestamp)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE insertstaticpartitiondynamic PARTITION(empno, empname, doj) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    val rows = sql(s"select count(*) from insertstaticpartitiondynamic").collect()
+    sql("""insert overwrite table insertstaticpartitiondynamic PARTITION(empno='1', empname='ravi', doj) select designation, salary, doj from insertstaticpartitiondynamic""")
+
+    checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic where empno=1 and empname='ravi'"), rows)
+  }
+
+  test("dynamic and static partition table with many partition cols overwrite with diffrent order") {
+    sql("drop table if exists insertstaticpartitiondynamic")
+    sql(
+      """
+        | CREATE TABLE insertstaticpartitiondynamic (designation String,salary int)
+        | PARTITIONED BY (empno int, empname String, doj Timestamp)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE insertstaticpartitiondynamic PARTITION(empno, empname, doj) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    val rows = sql(s"select count(*) from insertstaticpartitiondynamic").collect()
+    sql("""insert overwrite table insertstaticpartitiondynamic PARTITION(empno='1', empname, doj) select designation, salary,empname, doj from insertstaticpartitiondynamic""")
+
+    checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic where empno=1"), rows)
+  }
+
+  test("dynamic and static partition table with many partition cols load overwrite ") {
+    sql("drop table if exists insertstaticpartitiondynamic")
+    sql(
+      """
+        | CREATE TABLE insertstaticpartitiondynamic (designation String,salary int)
+        | PARTITIONED BY (empno1 int, empname1 String, doj Timestamp)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE insertstaticpartitiondynamic PARTITION(empno1='1', empname1='ravi', doj) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic where empno1=1 and empname1='ravi'"), Seq(Row(10)))
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE insertstaticpartitiondynamic PARTITION(empno1='1', empname1='ravi', doj) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic where empno1=1 and empname1='ravi'"), Seq(Row(10)))
+    checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic"), Seq(Row(10)))
+
+    intercept[Exception] {
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE insertstaticpartitiondynamic PARTITION(empno1='1', empname1, doj) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    }
+  }
+
+  test("dynamic and static partition table with many partition cols load differnt combinations ") {
+    sql("drop table if exists insertstaticpartitiondynamic")
+    sql(
+      """
+        | CREATE TABLE insertstaticpartitiondynamic (designation String,salary int)
+        | PARTITIONED BY (empno1 int, empname String, doj Timestamp)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE insertstaticpartitiondynamic PARTITION(empno1='1', empname, doj) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic where empno1=1"), Seq(Row(10)))
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' OVERWRITE INTO TABLE insertstaticpartitiondynamic PARTITION(empno1='1', empname, doj) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic where empno1=1"), Seq(Row(10)))
+    checkAnswer(sql(s"select count(*) from insertstaticpartitiondynamic"), Seq(Row(10)))
+
+    intercept[Exception] {
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE insertstaticpartitiondynamic PARTITION(empno1, empname, doj) OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    }
+  }
+
   test("overwriting all partition on table and do compaction") {
     sql(
       """

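The new overwrite tests exercise mixed static and dynamic partition specs: static values are fixed in the PARTITION clause, while the dynamic partition columns are fed from the tail of the select list in PARTITION-clause order. A sketch of the general SQL shape, reusing the insertstaticpartitiondynamic table defined above:

// Static value for empno, dynamic values for empname and doj: the dynamic partition
// columns are taken from the end of the select list, in PARTITION-clause order.
sql("""insert overwrite table insertstaticpartitiondynamic PARTITION(empno='1', empname, doj)
       select designation, salary, empname, doj from insertstaticpartitiondynamic""")
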
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index 95345de..918bbff 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -22,6 +22,7 @@ import org.apache.spark.sql.{DataFrame, Row}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
@@ -260,16 +261,81 @@ test("Creation of partition table should fail if the colname in table schema and
     }
   }
 
-  test("add partition based on location on partition table should fail"){
+
+  test("add partition based on location on partition table"){
     sql("drop table if exists partitionTable")
     sql(
       """create table partitionTable (id int,name String) partitioned by(email string) stored by 'carbondata'
       """.stripMargin)
     sql("insert into partitionTable select 1,'huawei','abc'")
+    val location = metastoredb +"/" +"def"
     checkAnswer(sql("show partitions partitionTable"), Seq(Row("email=abc")))
-    intercept[Exception]{
-      sql("alter table partitionTable add partition (email='def') location 'abc/part1'")
-    }
+    sql(s"""alter table partitionTable add partition (email='def') location '$location'""")
+    sql("insert into partitionTable select 1,'huawei','def'")
+    checkAnswer(sql("select email from partitionTable"), Seq(Row("def"), Row("abc")))
+    FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(location))
+  }
+
+  test("add partition with static column partition with load command") {
+    sql(
+      """
+        | CREATE TABLE staticpartitionlocload (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp,attendance int,
+        |  deptname String,projectcode int,
+        |  utilization int,salary int,projectenddate Date,doj Timestamp)
+        | PARTITIONED BY (empname String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    val location = metastoredb +"/" +"ravi"
+    sql(s"""alter table staticpartitionlocload add partition (empname='ravi') location '$location'""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocload partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    val frame = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionlocload")
+    verifyPartitionInfo(frame, Seq("empname=ravi"))
+    assert(frame.count() == 10)
+    val file = FileFactory.getCarbonFile(location)
+    assert(file.exists())
+    FileFactory.deleteAllCarbonFilesOfDir(file)
+  }
+
+  test("add external partition with static column partition with load command") {
+
+    sql(
+      """
+        | CREATE TABLE staticpartitionlocloadother (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp,attendance int,
+        |  deptname String,projectcode int,
+        |  utilization int,salary int,projectenddate Date,doj Timestamp)
+        | PARTITIONED BY (empname String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    val location = metastoredb +"/" +"ravi"
+    sql(s"""alter table staticpartitionlocloadother add partition (empname='ravi') location '$location'""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother partition(empname='ravi') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE staticpartitionlocloadother partition(empname='indra') OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(
+      """
+        | CREATE TABLE staticpartitionextlocload (empno int, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp,attendance int,
+        |  deptname String,projectcode int,
+        |  utilization int,salary int,projectenddate Date,doj Timestamp)
+        | PARTITIONED BY (empname String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""alter table staticpartitionextlocload add partition (empname='ravi') location '$location'""")
+    val frame = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionextlocload")
+    verifyPartitionInfo(frame, Seq("empname=ravi"))
+    assert(frame.count() == 10)
+    val location2 = storeLocation +"/staticpartitionlocloadother/empname=indra"
+    sql(s"""alter table staticpartitionextlocload add partition (empname='indra') location '$location2'""")
+    val frame1 = sql("select empno,empname,designation,workgroupcategory,workgroupcategoryname,deptno,projectjoindate,attendance,deptname,projectcode,utilization,salary,projectenddate,doj from staticpartitionextlocload")
+    verifyPartitionInfo(frame1, Seq("empname=indra"))
+    assert(frame1.count() == 20)
+    val file = FileFactory.getCarbonFile(location)
+    assert(file.exists())
+    FileFactory.deleteAllCarbonFilesOfDir(file)
   }
 
   test("drop partition on preAggregate table should fail"){
@@ -295,7 +361,7 @@ test("Creation of partition table should fail if the colname in table schema and
         .asInstanceOf[CarbonScanRDD]
     }
     assert(scanRDD.nonEmpty)
-    assert(!partitionNames.map(f => scanRDD.head.partitionNames.exists(_.equals(f))).exists(!_))
+    assert(!partitionNames.map(f => scanRDD.head.partitionNames.exists(_.getPartitions.contains(f))).exists(!_))
   }
 
   override def afterAll = {
@@ -318,6 +384,9 @@ test("Creation of partition table should fail if the colname in table schema and
     sql("drop table if exists badrecordsPartitionintnull")
     sql("drop table if exists badrecordsPartitionintnullalt")
     sql("drop table if exists partitionTable")
+    sql("drop table if exists staticpartitionlocload")
+    sql("drop table if exists staticpartitionextlocload")
+    sql("drop table if exists staticpartitionlocloadother")
   }
 
 }

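The query-test changes above cover the second half of the commit title, the partition location feature: ALTER TABLE ... ADD PARTITION ... LOCATION now succeeds instead of being rejected, and data loaded or inserted into that partition lands under the supplied directory. A sketch of the end-to-end flow, using a hypothetical /tmp path in place of the test's metastoredb-based location:

// Register an external location for a new partition value, then write into it.
sql("drop table if exists partitionTable")
sql("create table partitionTable (id int, name String) partitioned by (email string) stored by 'carbondata'")
sql("alter table partitionTable add partition (email='def') location '/tmp/partition_def'")
sql("insert into partitionTable select 1, 'huawei', 'def'")
// Rows for email='def' are written under /tmp/partition_def rather than the table path.
sql("select email from partitionTable where email='def'").show()
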
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
index be459ac..3670e11 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/SparkDataTypeConverterImpl.java
@@ -31,19 +31,31 @@ public final class SparkDataTypeConverterImpl implements DataTypeConverter, Seri
   private static final long serialVersionUID = -4379212832935070583L;
 
   public Object convertToDecimal(Object data) {
+    if (null == data) {
+      return null;
+    }
     java.math.BigDecimal javaDecVal = new java.math.BigDecimal(data.toString());
     return org.apache.spark.sql.types.Decimal.apply(javaDecVal);
   }
 
   public byte[] convertFromStringToByte(Object data) {
+    if (null == data) {
+      return null;
+    }
     return UTF8String.fromString((String) data).getBytes();
   }
 
   public Object convertFromByteToUTF8String(Object data) {
+    if (null == data) {
+      return null;
+    }
     return UTF8String.fromBytes((byte[]) data);
   }
 
   public Object convertFromStringToUTF8String(Object data) {
+    if (null == data) {
+      return null;
+    }
     return UTF8String.fromString((String) data);
   }
 }

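The converter changes are a straight null guard: every convert* entry point now returns null for null input instead of throwing a NullPointerException. A minimal illustration of the contract (written in Scala, since the converter is exercised from the Spark integration; the behaviour shown is exactly the guard added above):

import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl

val converter = new SparkDataTypeConverterImpl()
// Null flows through unchanged instead of raising a NullPointerException.
assert(converter.convertToDecimal(null) == null)
assert(converter.convertFromStringToUTF8String(null) == null)
// Non-null values still convert as before.
assert(converter.convertToDecimal("12.5").toString == "12.5")
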
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index c02ba0a..5fc7e3d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -29,8 +29,9 @@ import org.apache.spark.unsafe.types.UTF8String
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, PartitionMapFileStore}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFileStore}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
@@ -116,7 +117,7 @@ object CarbonStore {
       tablePath: String,
       carbonTable: CarbonTable,
       forceTableClean: Boolean,
-      currentTablePartitions: Option[Seq[String]] = None): Unit = {
+      currentTablePartitions: Option[Seq[PartitionSpec]] = None): Unit = {
     LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
     var carbonCleanFilesLock: ICarbonLock = null
     val absoluteTableIdentifier = if (forceTableClean) {
@@ -139,11 +140,14 @@ object CarbonStore {
           CarbonLockUtil
             .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg)
         DataLoadingUtil.deleteLoadsAndUpdateMetadata(
-          isForceDeletion = true, carbonTable)
+          isForceDeletion = true, carbonTable, currentTablePartitions.map(_.asJava).orNull)
         CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
         currentTablePartitions match {
           case Some(partitions) =>
-            new PartitionMapFileStore().cleanSegments(carbonTable, partitions.asJava, true)
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              true)
           case _ =>
         }
       }

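In CarbonStore the clean-files path now works on PartitionSpec objects rather than raw partition-name strings, and stale data is removed through SegmentFileStore.cleanSegments. A sketch of the call, assuming the signature shown in the diff (table, Java list of PartitionSpec or null, force-delete flag); the wrapper name is hypothetical:

import scala.collection.JavaConverters._

import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.core.metadata.SegmentFileStore
import org.apache.carbondata.core.metadata.schema.table.CarbonTable

// Clean only the given partitions of a table; a missing partition list (null)
// means the whole table is considered.
def cleanPartitions(
    carbonTable: CarbonTable,
    currentTablePartitions: Option[Seq[PartitionSpec]]): Unit = {
  SegmentFileStore.cleanSegments(
    carbonTable,
    currentTablePartitions.map(_.asJava).orNull,
    true)
}
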
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
index 9ea58a9..07a2e57 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDropPartitionRDD.scala
@@ -23,10 +23,11 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.{Partition, SparkContext, TaskContext}
 
-import org.apache.carbondata.core.metadata.PartitionMapFileStore
-import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.datamap.Segment
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.metadata.SegmentFileStore
 
-case class CarbonDropPartition(rddId: Int, val idx: Int, segmentPath: String)
+case class CarbonDropPartition(rddId: Int, val idx: Int, segment: Segment)
   extends Partition {
 
   override val index: Int = idx
@@ -35,115 +36,52 @@ case class CarbonDropPartition(rddId: Int, val idx: Int, segmentPath: String)
 }
 
 /**
- * RDD to drop the partitions from partition mapper files of all segments.
+ * RDD to drop the partitions from segment files of all segments.
  * @param sc
  * @param tablePath
- * @param segments segments to be merged
- * @param partialMatch If it is true then even the partial partition spec matches also can be
- *                      dropped
+ * @param segments segments to be cleaned
  */
 class CarbonDropPartitionRDD(
     sc: SparkContext,
     tablePath: String,
-    segments: Seq[String],
-    partitions: Seq[String],
-    uniqueId: String,
-    partialMatch: Boolean)
-  extends CarbonRDD[String](sc, Nil) {
+    segments: Seq[Segment],
+    partitions: util.List[PartitionSpec],
+    uniqueId: String)
+  extends CarbonRDD[(String, String)](sc, Nil) {
 
   override def getPartitions: Array[Partition] = {
     segments.zipWithIndex.map {s =>
-      CarbonDropPartition(id, s._2, CarbonTablePath.getSegmentPath(tablePath, s._1))
+      CarbonDropPartition(id, s._2, s._1)
     }.toArray
   }
 
-  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = {
-    val iter = new Iterator[String] {
+  override def internalCompute(
+      theSplit: Partition,
+      context: TaskContext): Iterator[(String, String)] = {
+    val iter = new Iterator[(String, String)] {
       val split = theSplit.asInstanceOf[CarbonDropPartition]
-      logInfo("Dropping partition information from : " + split.segmentPath)
-      partitions.toList.asJava
-      val partitionList = new util.ArrayList[util.List[String]]()
-      partitionList.add(partitions.toList.asJava)
-      new PartitionMapFileStore().dropPartitions(
-        split.segmentPath,
-        partitionList,
+      logInfo("Dropping partition information from : " + split.segment)
+      val toBeDeletedSegments = new util.ArrayList[String]()
+      val toBeUpdateSegments = new util.ArrayList[String]()
+      new SegmentFileStore(
+        tablePath,
+        split.segment.getSegmentFileName).dropPartitions(
+        split.segment,
+        partitions,
         uniqueId,
-        partialMatch)
+        toBeDeletedSegments,
+        toBeUpdateSegments)
 
-      var havePair = false
       var finished = false
 
       override def hasNext: Boolean = {
-        if (!finished && !havePair) {
-          finished = true
-          havePair = !finished
-        }
         !finished
       }
 
-      override def next(): String = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        ""
+      override def next(): (String, String) = {
+        finished = true
+        (toBeUpdateSegments.asScala.mkString(","), toBeDeletedSegments.asScala.mkString(","))
       }
-
-    }
-    iter
-  }
-
-}
-
-/**
- * This RDD is used for committing the partitions which were removed in before step. It just removes
- * old mapper files and related data files.
- * @param sc
- * @param tablePath
- * @param segments segments to be merged
- */
-class CarbonDropPartitionCommitRDD(
-    sc: SparkContext,
-    tablePath: String,
-    segments: Seq[String],
-    success: Boolean,
-    uniqueId: String,
-    partitions: Seq[String])
-  extends CarbonRDD[String](sc, Nil) {
-
-  override def getPartitions: Array[Partition] = {
-    segments.zipWithIndex.map {s =>
-      CarbonDropPartition(id, s._2, CarbonTablePath.getSegmentPath(tablePath, s._1))
-    }.toArray
-  }
-
-  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = {
-    val iter = new Iterator[String] {
-      val split = theSplit.asInstanceOf[CarbonDropPartition]
-      logInfo("Commit partition information from : " + split.segmentPath)
-
-      new PartitionMapFileStore().commitPartitions(split.segmentPath, uniqueId, success, tablePath,
-        partitions.toList.asJava)
-
-      var havePair = false
-      var finished = false
-
-      override def hasNext: Boolean = {
-        if (!finished && !havePair) {
-          finished = true
-          havePair = !finished
-        }
-        !finished
-      }
-
-      override def next(): String = {
-        if (!hasNext) {
-          throw new java.util.NoSuchElementException("End of stream")
-        }
-        havePair = false
-        ""
-      }
-
     }
     iter
   }

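The drop-partition flow is also reshaped: the separate commit RDD is gone, and CarbonDropPartitionRDD now emits, per segment, a pair of comma-separated lists naming the segments to be updated and the segments to be deleted, which the driver is expected to collect and act on. A sketch of how the driver side could consume that output, assuming the constructor shown above; the helper name and the derivation of uniqueId are hypothetical:

import java.util

import org.apache.spark.SparkContext

import org.apache.carbondata.core.datamap.Segment
import org.apache.carbondata.core.indexstore.PartitionSpec
import org.apache.carbondata.spark.rdd.CarbonDropPartitionRDD

// Collect the per-segment results of dropping partitions; uniqueId would normally
// be a timestamp-based token shared with the subsequent commit step.
def dropPartitionsFromSegments(
    sc: SparkContext,
    tablePath: String,
    segments: Seq[Segment],
    partitions: util.List[PartitionSpec],
    uniqueId: String): (Set[String], Set[String]) = {
  val results = new CarbonDropPartitionRDD(sc, tablePath, segments, partitions, uniqueId).collect()
  // Each element is (segments to update, segments to delete), both comma separated.
  val toUpdate = results.flatMap(_._1.split(",")).filter(_.nonEmpty).toSet
  val toDelete = results.flatMap(_._2.split(",")).filter(_.nonEmpty).toSet
  (toUpdate, toDelete)
}
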
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 0859f2e..e0dcffd 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -26,6 +26,7 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.spark._
@@ -37,6 +38,7 @@ import org.apache.spark.sql.util.CarbonException
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.block._
+import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
@@ -54,7 +56,7 @@ import org.apache.carbondata.processing.merger._
 import org.apache.carbondata.processing.splits.TableSplit
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.MergeResult
-import org.apache.carbondata.spark.util.{CommonUtil, SparkDataTypeConverterImpl, Util}
+import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, SparkDataTypeConverterImpl, Util}
 
 class CarbonMergerRDD[K, V](
     sc: SparkContext,
@@ -87,8 +89,8 @@ class CarbonMergerRDD[K, V](
       } else {
         carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
       }
-      val partitionNames = if (carbonTable.isHivePartitionTable) {
-        carbonSparkPartition.partitionNames.get.asJava
+      val partitionSpec = if (carbonTable.isHivePartitionTable) {
+        carbonSparkPartition.partitionSpec.get
       } else {
         null
       }
@@ -138,6 +140,14 @@ class CarbonMergerRDD[K, V](
           )
         }
         carbonLoadModel.setSegmentId(mergeNumber)
+
+        if(carbonTable.isHivePartitionTable) {
+          carbonLoadModel.setTaskNo(
+              CarbonScalaUtil.generateUniqueNumber(
+                  theSplit.index,
+                  mergeNumber.replace(".", ""), 0L))
+        }
+
         CommonUtil.setTempStoreLocation(theSplit.index, carbonLoadModel, true, false)
 
         // get destination segment properties as sent from driver which is of last segment.
@@ -198,7 +208,7 @@ class CarbonMergerRDD[K, V](
             segmentProperties,
             carbonMergerMapping.campactionType,
             factTableName,
-            partitionNames)
+            partitionSpec)
         } else {
           LOGGER.info("RowResultMergerProcessor flow is selected")
           processor =
@@ -209,7 +219,7 @@ class CarbonMergerRDD[K, V](
               tempStoreLoc,
               carbonLoadModel,
               carbonMergerMapping.campactionType,
-              partitionNames)
+              partitionSpec)
         }
         mergeStatus = processor.execute(result2)
         mergeResult = tableBlockInfoList.get(0).getSegmentId + ',' + mergeNumber
@@ -260,7 +270,7 @@ class CarbonMergerRDD[K, V](
     val format = CarbonInputFormatUtil.createCarbonInputFormat(absoluteTableIdentifier, job)
     CarbonTableInputFormat.setPartitionsToPrune(
       job.getConfiguration,
-      carbonMergerMapping.currentPartitions.asJava)
+      carbonMergerMapping.currentPartitions.map(_.asJava).orNull)
     CarbonTableInputFormat.setTableInfo(job.getConfiguration,
       carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getTableInfo)
     var updateDetails: UpdateVO = null
@@ -284,10 +294,10 @@ class CarbonMergerRDD[K, V](
     for (eachSeg <- carbonMergerMapping.validSegments) {
 
       // map for keeping the relation of a task and its blocks.
-      job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg)
+      job.getConfiguration.set(CarbonTableInputFormat.INPUT_SEGMENT_NUMBERS, eachSeg.getSegmentNo)
 
       if (updateStatusManager.getUpdateStatusDetails.length != 0) {
-         updateDetails = updateStatusManager.getInvalidTimestampRange(eachSeg)
+         updateDetails = updateStatusManager.getInvalidTimestampRange(eachSeg.getSegmentNo)
       }
 
       val updated: Boolean = updateStatusManager.getUpdateStatusDetails.length != 0
@@ -304,7 +314,7 @@ class CarbonMergerRDD[K, V](
         val blockInfo = new TableBlockInfo(entry.getPath.toString,
           entry.getStart, entry.getSegmentId,
           entry.getLocations, entry.getLength, entry.getVersion,
-          updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString)
+          updateStatusManager.getDeleteDeltaFilePath(entry.getPath.toString, entry.getSegmentId)
         )
         (!updated || (updated && (!CarbonUtil
           .isInvalidTableBlock(blockInfo.getSegmentId, blockInfo.getFilePath,
@@ -329,7 +339,7 @@ class CarbonMergerRDD[K, V](
     }
 
     val columnToCardinalityMap = new util.HashMap[java.lang.String, Integer]()
-    val partitionTaskMap = new util.HashMap[util.List[String], String]()
+    val partitionTaskMap = new util.HashMap[PartitionSpec, String]()
     val counter = new AtomicInteger()
     carbonInputSplits.foreach { split =>
       val taskNo = getTaskNo(split, partitionTaskMap, counter)
@@ -455,16 +465,20 @@ class CarbonMergerRDD[K, V](
 
   private def getTaskNo(
       split: CarbonInputSplit,
-      partitionTaskMap: util.Map[List[String], String],
+      partitionTaskMap: util.Map[PartitionSpec, String],
       counter: AtomicInteger): String = {
     if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
-      val partitions =
-        carbonMergerMapping.partitionMapper.getPartitionMap.get(
-          CarbonTablePath.getCarbonIndexFileName(split.getBlockPath))
-      var task = partitionTaskMap.get(partitions)
+      val path = split.getPath.getParent
+      val partTask =
+        carbonMergerMapping.currentPartitions.get.find(p => p.getLocation.equals(path)) match {
+        case Some(part) => part
+        case None =>
+          throw new UnsupportedOperationException("Cannot do compaction on dropped partition")
+      }
+      var task = partitionTaskMap.get(partTask)
       if (task == null) {
         task = counter.incrementAndGet().toString
-        partitionTaskMap.put(partitions, task)
+        partitionTaskMap.put(partTask, task)
       }
       task
     } else {
@@ -472,10 +486,12 @@ class CarbonMergerRDD[K, V](
     }
   }
 
+
+
   private def getPartitionNamesFromTask(taskId: String,
-      partitionTaskMap: util.Map[List[String], String]): Option[Seq[String]] = {
+      partitionTaskMap: util.Map[PartitionSpec, String]): Option[PartitionSpec] = {
     if (carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isHivePartitionTable) {
-      Some(partitionTaskMap.asScala.find(f => f._2.equals(taskId)).get._1.asScala)
+      Some(partitionTaskMap.asScala.find(f => f._2.equals(taskId)).get._1)
     } else {
       None
     }

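As a rough standalone sketch of how the new getTaskNo groups compaction splits (Spec and Split below are simplified stand-ins for PartitionSpec and CarbonInputSplit, and are not part of the patch): each split is mapped to the partition whose location matches its parent directory, every partition directory gets one task number from the shared counter, and a split whose directory no longer belongs to a live partition fails fast.

import java.util
import java.util.concurrent.atomic.AtomicInteger

object PartitionTaskGroupingSketch {
  // Simplified stand-ins for PartitionSpec and CarbonInputSplit (illustration only).
  case class Spec(location: String)
  case class Split(parentDir: String)

  def taskFor(split: Split,
      currentPartitions: Seq[Spec],
      partitionTaskMap: util.Map[Spec, String],
      counter: AtomicInteger): String = {
    // Resolve the partition whose location matches the split's parent directory;
    // splits of dropped partitions are rejected, as in getTaskNo.
    val spec = currentPartitions.find(_.location == split.parentDir).getOrElse(
      throw new UnsupportedOperationException("Cannot do compaction on dropped partition"))
    var task = partitionTaskMap.get(spec)
    if (task == null) {
      task = counter.incrementAndGet().toString
      partitionTaskMap.put(spec, task)
    }
    task
  }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Spec("/t/c=us"), Spec("/t/c=uk"))
    val taskMap = new util.HashMap[Spec, String]()
    val counter = new AtomicInteger()
    val splits = Seq(Split("/t/c=us"), Split("/t/c=uk"), Split("/t/c=us"))
    // the first and third splits share a task number because they share a partition directory
    splits.foreach(s => println(s"$s -> task ${taskFor(s, parts, taskMap, counter)}"))
  }
}
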
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 8c29c2a..772f702 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -40,6 +40,7 @@ import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.block.Distributable
+import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.scan.expression.Expression
@@ -68,7 +69,7 @@ class CarbonScanRDD(
     @transient serializedTableInfo: Array[Byte],
     @transient tableInfo: TableInfo,
     inputMetricsStats: InitInputMetrics,
-    @transient val partitionNames: Seq[String])
+    @transient val partitionNames: Seq[PartitionSpec])
   extends CarbonRDDWithTableInfo[InternalRow](spark.sparkContext, Nil, serializedTableInfo) {
 
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
index 036f1d1..b473d35 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonSparkPartition.scala
@@ -19,6 +19,7 @@ package org.apache.carbondata.spark.rdd
 
 import org.apache.spark.{Partition, SerializableWritable}
 
+import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.hadoop.CarbonMultiBlockSplit
 
 class CarbonSparkPartition(
@@ -26,7 +27,7 @@ class CarbonSparkPartition(
     val idx: Int,
     @transient val multiBlockSplit: CarbonMultiBlockSplit,
     val partitionId: Int = 0,
-    val partitionNames: Option[Seq[String]] = None)
+    val partitionSpec: Option[PartitionSpec] = None)
     extends Partition {
 
   val split = new SerializableWritable[CarbonMultiBlockSplit](multiBlockSplit)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
index 2aa5610..c73065d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala
@@ -32,7 +32,7 @@ object PartitionDropper {
 
   def triggerPartitionDrop(dropPartitionCallableModel: DropPartitionCallableModel): Unit = {
     val alterPartitionModel = new AlterPartitionModel(dropPartitionCallableModel.carbonLoadModel,
-      dropPartitionCallableModel.segmentId,
+      dropPartitionCallableModel.segmentId.getSegmentNo,
       dropPartitionCallableModel.oldPartitionIds,
       dropPartitionCallableModel.sqlContext
     )

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index 73be3c8..33263d6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -17,27 +17,35 @@
 
 package org.apache.carbondata.spark.util
 
+import java.{lang, util}
 import java.nio.charset.Charset
 import java.text.SimpleDateFormat
-import java.util
+import java.util.Date
 
+import com.univocity.parsers.common.TextParsingException
+import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.catalog.CatalogTablePartition
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.command.DataTypeInfo
+import org.apache.spark.sql.execution.command.{DataTypeInfo, UpdateTableModel}
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
 
-import org.apache.carbondata.common.constants.LoggerAction
+import org.apache.carbondata.common.logging.LogService
 import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType}
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory
+import org.apache.carbondata.core.metadata.ColumnIdentifier
 import org.apache.carbondata.core.metadata.datatype.{DataType => CarbonDataType, DataTypes => CarbonDataTypes, StructField => CarbonStructField}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, ColumnSchema}
-import org.apache.carbondata.core.util.{CarbonSessionInfo, DataTypeUtil}
+import org.apache.carbondata.core.util.DataTypeUtil
+import org.apache.carbondata.processing.exception.DataLoadingException
+import org.apache.carbondata.processing.loading.FailureCauses
+import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
+import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 object CarbonScalaUtil {
   def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = {
@@ -167,33 +175,48 @@ object CarbonScalaUtil {
    * @param dataType Datatype to convert and then convert to String
    * @param timeStampFormat Timestamp format to convert in case of timestamp datatypes
    * @param dateFormat DataFormat to convert in case of DateType datatype
-   * @param serializationNullFormat if this encounters in input data then data will
-   *                                be treated as null
    * @return converted String
    */
-  def convertToString(
+  def convertToDateAndTimeFormats(
       value: String,
       dataType: DataType,
       timeStampFormat: SimpleDateFormat,
-      dateFormat: SimpleDateFormat,
-      serializationNullFormat: String): String = {
-    if (value == null || serializationNullFormat.equals(value)) {
-      return null
-    }
-    dataType match {
-      case TimestampType if timeStampFormat != null =>
-        DateTimeUtils.timestampToString(timeStampFormat.parse(value).getTime * 1000)
-      case DateType if dateFormat != null =>
-        DateTimeUtils.dateToString(
-          (dateFormat.parse(value).getTime / DateTimeUtils.MILLIS_PER_DAY).toInt)
-      case ShortType => value.toShort.toString
-      case IntegerType => value.toInt.toString
-      case LongType => value.toLong.toString
-      case DoubleType => value.toDouble.toString
-      case FloatType => value.toFloat.toString
-      case d: DecimalType => new java.math.BigDecimal(value).toPlainString
-      case BooleanType => value.toBoolean.toString
-      case _ => value
+      dateFormat: SimpleDateFormat): String = {
+    val defaultValue = value != null && value.equalsIgnoreCase(hivedefaultpartition)
+    try {
+      dataType match {
+        case TimestampType if timeStampFormat != null =>
+          if (defaultValue) {
+            timeStampFormat.format(new Date())
+          } else {
+            timeStampFormat.format(DateTimeUtils.stringToTime(value))
+          }
+        case DateType if dateFormat != null =>
+          if (defaultValue) {
+            dateFormat.format(new Date())
+          } else {
+            dateFormat.format(DateTimeUtils.stringToTime(value))
+          }
+        case _ =>
+          val convertedValue =
+            DataTypeUtil
+              .getDataBasedOnDataType(value, convertSparkToCarbonDataType(dataType))
+          if (convertedValue == null) {
+            if (defaultValue) {
+              return dataType match {
+                case BooleanType => "false"
+                case _ => "0"
+              }
+            }
+            throw new MalformedCarbonCommandException(
+              s"Value $value with datatype $dataType on static partition is not correct")
+          }
+          value
+      }
+    } catch {
+      case e: Exception =>
+        throw new MalformedCarbonCommandException(
+          s"Value $value with datatype $dataType on static partition is not correct")
     }
   }
 
@@ -214,14 +237,20 @@ object CarbonScalaUtil {
           val time = DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(
             column.getDataType,
             CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
-          ).getValueFromSurrogate(value.toInt).toString
-          return DateTimeUtils.timestampToString(time.toLong * 1000)
+          ).getValueFromSurrogate(value.toInt)
+          if (time == null) {
+            return null
+          }
+          return DateTimeUtils.timestampToString(time.toString.toLong * 1000)
         } else if (column.getDataType.equals(CarbonDataTypes.DATE)) {
           val date = DirectDictionaryKeyGeneratorFactory.getDirectDictionaryGenerator(
             column.getDataType,
             CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
-          ).getValueFromSurrogate(value.toInt).toString
-          return DateTimeUtils.dateToString(date.toInt)
+          ).getValueFromSurrogate(value.toInt)
+          if (date == null) {
+            return null
+          }
+          return DateTimeUtils.dateToString(date.toString.toInt)
         }
       }
       val dictionaryPath =
@@ -271,14 +300,28 @@ object CarbonScalaUtil {
             CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT
           ).generateDirectSurrogateKey(value).toString
         }
+      } else if (column.hasEncoding(Encoding.DICTIONARY)) {
+        val cacheProvider: CacheProvider = CacheProvider.getInstance
+        val reverseCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
+          cacheProvider.createCache(CacheType.REVERSE_DICTIONARY)
+        val dictionaryPath =
+          table.getTableInfo.getFactTable.getTableProperties.get(
+            CarbonCommonConstants.DICTIONARY_PATH)
+        val dictionaryColumnUniqueIdentifier = new DictionaryColumnUniqueIdentifier(
+          table.getAbsoluteTableIdentifier,
+          new ColumnIdentifier(
+            column.getColumnUniqueId,
+            column.getColumnProperties,
+            column.getDataType),
+          column.getDataType,
+          dictionaryPath)
+        return reverseCache.get(dictionaryColumnUniqueIdentifier).getSurrogateKey(value).toString
       }
       column.getDataType match {
         case CarbonDataTypes.TIMESTAMP =>
-          DataTypeUtil.getDataDataTypeForNoDictionaryColumn(value,
-            column.getDataType,
-            CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT).toString
+          DateTimeUtils.stringToTime(value).getTime.toString
         case CarbonDataTypes.DATE =>
-          DateTimeUtils.stringToDate(UTF8String.fromString(value)).get.toString
+          DateTimeUtils.stringToTime(value).getTime.toString
         case _ => value
       }
     } catch {
@@ -287,7 +330,7 @@ object CarbonScalaUtil {
     }
   }
 
-  private val hiveignorepartition = "__HIVE_IGNORE_PARTITION__"
+  private val hivedefaultpartition = "__HIVE_DEFAULT_PARTITION__"
 
   /**
    * Update partition values as per the right date and time format
@@ -295,10 +338,7 @@ object CarbonScalaUtil {
    */
   def updatePartitions(
       partitionSpec: Map[String, String],
-      table: CarbonTable,
-      timeFormat: SimpleDateFormat,
-      dateFormat: SimpleDateFormat): Map[String, String] = {
-    val hivedefaultpartition = "__HIVE_DEFAULT_PARTITION__"
+  table: CarbonTable): Map[String, String] = {
     val cacheProvider: CacheProvider = CacheProvider.getInstance
     val forwardDictionaryCache: Cache[DictionaryColumnUniqueIdentifier, Dictionary] =
       cacheProvider.createCache(CacheType.FORWARD_DICTIONARY)
@@ -340,37 +380,14 @@ object CarbonScalaUtil {
    * Update partition values as per the right date and time format
    */
   def updatePartitions(
-      carbonSessionInfo: CarbonSessionInfo,
       parts: Seq[CatalogTablePartition],
       table: CarbonTable): Seq[CatalogTablePartition] = {
-    val dateFormatStr = carbonSessionInfo.getThreadParams.getProperty(
-      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_DATEFORMAT_DEFAULT)
-    val dateFormat = new SimpleDateFormat(dateFormatStr)
-    val timeFormatStr = carbonSessionInfo.getThreadParams.getProperty(
-      CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_TIMESTAMPFORMAT_DEFAULT)
-    val timeFormat = new SimpleDateFormat(timeFormatStr)
-    val serializeFormat = carbonSessionInfo.getThreadParams.getProperty(
-      CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_SERIALIZATION_NULL_FORMAT_DEFAULT)
-    val isEmptyBadRecord = carbonSessionInfo.getThreadParams.getProperty(
-      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD,
-      CarbonLoadOptionConstants.CARBON_OPTIONS_IS_EMPTY_DATA_BAD_RECORD_DEFAULT).toBoolean
-    val badRecordAction = carbonSessionInfo.getThreadParams.getProperty(
-      CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORDS_ACTION,
-      LoggerAction.FAIL.toString)
     parts.map{ f =>
       val changedSpec =
         updatePartitions(
           f.spec,
-          table,
-          timeFormat,
-          dateFormat)
+          table)
       f.copy(spec = changedSpec)
-    }.filterNot{ p =>
-      // Filter the special bad record ignore case string
-      p.spec.exists(_._2.equals(hiveignorepartition))
     }.groupBy(p => p.spec).map(f => f._2.head).toSeq // Avoid duplicates by doing groupBy
   }
 
@@ -508,4 +525,64 @@ object CarbonScalaUtil {
     }
   }
 
+  /**
+   * Retrieve error message from exception
+   */
+  def retrieveAndLogErrorMsg(ex: Throwable, logger: LogService): (String, String) = {
+    var errorMessage = "DataLoad failure"
+    var executorMessage = ""
+    if (ex != null) {
+      ex match {
+        case sparkException: SparkException =>
+          if (sparkException.getCause.isInstanceOf[DataLoadingException] ||
+              sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) {
+            executorMessage = sparkException.getCause.getMessage
+            errorMessage = errorMessage + ": " + executorMessage
+          } else if (sparkException.getCause.isInstanceOf[TextParsingException]) {
+            executorMessage = CarbonDataProcessorUtil
+              .trimErrorMessage(sparkException.getCause.getMessage)
+            errorMessage = errorMessage + " : " + executorMessage
+          } else if (sparkException.getCause.isInstanceOf[SparkException]) {
+            val (executorMsgLocal, errorMsgLocal) =
+              retrieveAndLogErrorMsg(sparkException.getCause, logger)
+            executorMessage = executorMsgLocal
+            errorMessage = errorMsgLocal
+          }
+        case aex: AnalysisException =>
+          logger.error(aex.getMessage())
+          throw aex
+        case _ =>
+          if (ex.getCause != null) {
+            executorMessage = ex.getCause.getMessage
+            errorMessage = errorMessage + ": " + executorMessage
+          }
+      }
+    }
+    (executorMessage, errorMessage)
+  }
+
+  /**
+   * Update error inside update model
+   */
+  def updateErrorInUpdateModel(updateModel: UpdateTableModel, executorMessage: String): Unit = {
+    if (updateModel.executorErrors.failureCauses == FailureCauses.NONE) {
+      updateModel.executorErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE
+      if (null != executorMessage && !executorMessage.isEmpty) {
+        updateModel.executorErrors.errorMsg = executorMessage
+      } else {
+        updateModel.executorErrors.errorMsg = "Update failed as the data load has failed."
+      }
+    }
+  }
+
+  /**
+   * Generate a unique number to be used as the partition number in the file name
+   */
+  def generateUniqueNumber(taskId: Int,
+      segmentId: String,
+      partitionNumber: lang.Long): String = {
+    String.valueOf(Math.pow(10, 2).toInt + segmentId.toInt) +
+    String.valueOf(Math.pow(10, 5).toInt + taskId) +
+    String.valueOf(partitionNumber + Math.pow(10, 5).toInt)
+  }
 }

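As a small worked sketch of generateUniqueNumber above (UniqueNumberSketch is an illustrative name; the arithmetic mirrors the method): the task number used for compacted partition files is built by concatenating three decimal fields, each offset by a power of ten so every field has a predictable minimum width. For a merge number of "0.1" and task index 3, the dot is stripped first (as done in CarbonMergerRDD), giving 100 + 1 = 101, 100000 + 3 = 100003 and 0 + 100000 = 100000, i.e. "101100003100000".

object UniqueNumberSketch {
  // Mirrors CarbonScalaUtil.generateUniqueNumber (illustration only).
  def generate(taskId: Int, segmentId: String, partitionNumber: Long): String =
    String.valueOf(math.pow(10, 2).toInt + segmentId.toInt) +
    String.valueOf(math.pow(10, 5).toInt + taskId) +
    String.valueOf(partitionNumber + math.pow(10, 5).toInt)

  def main(args: Array[String]): Unit = {
    // mergeNumber "0.1" with the dot removed, as in the compaction flow above
    println(generate(3, "0.1".replace(".", ""), 0L)) // prints 101100003100000
  }
}
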
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 64e4bb1..94668bd 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -683,7 +683,8 @@ object CommonUtil {
 
   def getCsvHeaderColumns(
       carbonLoadModel: CarbonLoadModel,
-      hadoopConf: Configuration): Array[String] = {
+      hadoopConf: Configuration,
+      staticPartitionCols: util.List[String] = new util.ArrayList[String]()): Array[String] = {
     val delimiter = if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
       CarbonCommonConstants.COMMA
     } else {
@@ -691,7 +692,7 @@ object CommonUtil {
     }
     var csvFile: String = null
     var csvHeader: String = carbonLoadModel.getCsvHeader
-    val csvColumns = if (StringUtils.isBlank(csvHeader)) {
+    var csvColumns = if (StringUtils.isBlank(csvHeader)) {
       // read header from csv file
       csvFile = carbonLoadModel.getFactFilePath.split(",")(0)
       csvHeader = CarbonUtil.readHeader(csvFile, hadoopConf)
@@ -704,7 +705,7 @@ object CommonUtil {
     }
 
     if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName, csvColumns,
-        carbonLoadModel.getCarbonDataLoadSchema)) {
+        carbonLoadModel.getCarbonDataLoadSchema, staticPartitionCols)) {
       if (csvFile == null) {
         LOGGER.error("CSV header in DDL is not proper."
                      + " Column names in schema and CSV header are not the same.")
@@ -720,7 +721,23 @@ object CommonUtil {
           + "the same. Input file : " + CarbonUtil.removeAKSK(csvFile))
       }
     }
-    csvColumns
+    // In case of static partition columns, just rename the header column if it already exists,
+    // as we should not take the column from the csv file and add it as a new column at the end.
+    if (staticPartitionCols.size() > 0) {
+      val scalaIgnoreColumns = staticPartitionCols.asScala
+      var updatedCols = csvColumns.map{col =>
+        if (scalaIgnoreColumns.exists(_.equalsIgnoreCase(col))) {
+          col + "1"
+        } else {
+          col
+        }
+      }.toList.asJava
+      updatedCols = new util.ArrayList[String](updatedCols)
+      updatedCols.addAll(staticPartitionCols)
+      updatedCols.asScala.toArray
+    } else {
+      csvColumns
+    }
   }
 
   def validateMaxColumns(csvHeaders: Array[String], maxColumns: String): Int = {
@@ -866,7 +883,7 @@ object CommonUtil {
                   val carbonTable = CarbonMetadata.getInstance
                     .getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName)
                   DataLoadingUtil.deleteLoadsAndUpdateMetadata(
-                    isForceDeletion = true, carbonTable)
+                    isForceDeletion = true, carbonTable, null)
                 } catch {
                   case _: Exception =>
                     LOGGER.warn(s"Error while cleaning table " +

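As a rough standalone sketch of the header adjustment added to getCsvHeaderColumns above (StaticPartitionHeaderSketch and adjust are illustrative names): any CSV header column that clashes with a static partition column is renamed by appending "1", and the static partition columns are appended at the end, so a header of id,name,country loaded with static partition country='usa' becomes id,name,country1,country.

import java.util
import scala.collection.JavaConverters._

object StaticPartitionHeaderSketch {
  // Sketch of the header adjustment performed in CommonUtil.getCsvHeaderColumns.
  def adjust(csvColumns: Array[String], staticPartitionCols: util.List[String]): Array[String] = {
    if (staticPartitionCols.size() > 0) {
      val ignore = staticPartitionCols.asScala
      val renamed = csvColumns.map { col =>
        if (ignore.exists(_.equalsIgnoreCase(col))) col + "1" else col
      }
      renamed ++ ignore
    } else {
      csvColumns
    }
  }

  def main(args: Array[String]): Unit = {
    println(adjust(Array("id", "name", "country"), util.Arrays.asList("country")).mkString(","))
    // prints: id,name,country1,country
  }
}
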
http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d3c7740/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index a38eaba..6767ef7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -18,7 +18,8 @@
 package org.apache.carbondata.spark.util
 
 import java.text.SimpleDateFormat
-import java.util.{Date, Locale}
+import java.util
+import java.util.{Date, List, Locale}
 
 import scala.collection.{immutable, mutable}
 import scala.collection.JavaConverters._
@@ -43,9 +44,11 @@ import org.apache.spark.sql.util.SparkSQLUtil.sessionState
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
+import org.apache.carbondata.core.indexstore.PartitionSpec
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat
@@ -221,7 +224,9 @@ object DataLoadingUtil {
       options: immutable.Map[String, String],
       optionsFinal: mutable.Map[String, String],
       carbonLoadModel: CarbonLoadModel,
-      hadoopConf: Configuration): Unit = {
+      hadoopConf: Configuration,
+      partition: Map[String, Option[String]] = Map.empty,
+      isDataFrame: Boolean = false): Unit = {
     carbonLoadModel.setTableName(table.getTableName)
     carbonLoadModel.setDatabaseName(table.getDatabaseName)
     carbonLoadModel.setTablePath(table.getTablePath)
@@ -331,8 +336,13 @@ object DataLoadingUtil {
     carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter))
     carbonLoadModel.setCsvHeader(fileHeader)
     carbonLoadModel.setColDictFilePath(column_dict)
+
+    val ignoreColumns = new util.ArrayList[String]()
+    if (!isDataFrame) {
+      ignoreColumns.addAll(partition.filter(_._2.isDefined).keys.toList.asJava)
+    }
     carbonLoadModel.setCsvHeaderColumns(
-      CommonUtil.getCsvHeaderColumns(carbonLoadModel, hadoopConf))
+      CommonUtil.getCsvHeaderColumns(carbonLoadModel, hadoopConf, ignoreColumns))
 
     val validatedMaxColumns = CommonUtil.validateMaxColumns(
       carbonLoadModel.getCsvHeaderColumns,
@@ -360,33 +370,40 @@ object DataLoadingUtil {
 
   def deleteLoadsAndUpdateMetadata(
       isForceDeletion: Boolean,
-      carbonTable: CarbonTable): Unit = {
+      carbonTable: CarbonTable,
+      specs: util.List[PartitionSpec]): Unit = {
     if (isLoadDeletionRequired(carbonTable.getMetaDataFilepath)) {
-      val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath)
       val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-      val carbonTableStatusLock =
-        CarbonLockFactory.getCarbonLockObj(
-          absoluteTableIdentifier,
-          LockUsage.TABLE_STATUS_LOCK
-        )
-
-      // Delete marked loads
-      val isUpdationRequired =
-        DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
-          absoluteTableIdentifier,
-          isForceDeletion,
-          details,
-          carbonTable.getMetaDataFilepath
-        )
 
-      var updationCompletionStaus = false
-
-      if (isUpdationRequired) {
+      val (details, updationRequired) =
+        isUpdationRequired(
+          isForceDeletion,
+          carbonTable,
+          absoluteTableIdentifier)
+
+
+      if (updationRequired) {
+        val carbonTableStatusLock =
+          CarbonLockFactory.getCarbonLockObj(
+            absoluteTableIdentifier,
+            LockUsage.TABLE_STATUS_LOCK
+          )
+        var locked = false
+        var updationCompletionStaus = false
         try {
           // Update load metadata file after cleaning deleted nodes
-          if (carbonTableStatusLock.lockWithRetries()) {
+          locked = carbonTableStatusLock.lockWithRetries()
+          if (locked) {
             LOGGER.info("Table status lock has been successfully acquired.")
-
+            // Read the status again and check whether updation is still required.
+            val (details, updationRequired) =
+              isUpdationRequired(
+                isForceDeletion,
+                carbonTable,
+                absoluteTableIdentifier)
+            if (!updationRequired) {
+              return
+            }
             // read latest table status again.
             val latestMetadata = SegmentStatusManager
               .readLoadMetadata(carbonTable.getMetaDataFilepath)
@@ -409,17 +426,34 @@ object DataLoadingUtil {
           }
           updationCompletionStaus = true
         } finally {
-          CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
+          if (locked) {
+            CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK)
+          }
         }
         if (updationCompletionStaus) {
           DeleteLoadFolders
             .physicalFactAndMeasureMetadataDeletion(absoluteTableIdentifier,
-              carbonTable.getMetaDataFilepath, isForceDeletion)
+              carbonTable.getMetaDataFilepath, isForceDeletion, specs)
         }
       }
     }
   }
 
+  private def isUpdationRequired(isForceDeletion: Boolean,
+      carbonTable: CarbonTable,
+      absoluteTableIdentifier: AbsoluteTableIdentifier) = {
+    val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath)
+    // Delete marked loads
+    val isUpdationRequired =
+      DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
+        absoluteTableIdentifier,
+        isForceDeletion,
+        details,
+        carbonTable.getMetaDataFilepath
+      )
+    (details, isUpdationRequired)
+  }
+
   /**
    * creates a RDD that does reading of multiple CSV files
    */