You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/12/21 16:21:02 UTC

carbondata git commit: [CARBONDATA-1863][PARTITION] Supported clean files for partition table.

Repository: carbondata
Updated Branches:
  refs/heads/master 577a8b0d5 -> 9659edccb


[CARBONDATA-1863][PARTITION] Supported clean files for partition table.

This closes #1706


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/9659edcc
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/9659edcc
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/9659edcc

Branch: refs/heads/master
Commit: 9659edccbd05e9f5499911e6d049d9b9e1cf8c3a
Parents: 577a8b0
Author: ravipesala <ra...@gmail.com>
Authored: Thu Dec 21 10:23:27 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Thu Dec 21 21:50:42 2017 +0530

----------------------------------------------------------------------
 .../blockletindex/SegmentIndexFileStore.java    |   2 +-
 .../core/metadata/PartitionMapFileStore.java    | 147 ++++++++++++++--
 .../core/writer/CarbonIndexFileMergeWriter.java |  41 +++--
 .../StandardPartitionTableCleanTestCase.scala   | 167 +++++++++++++++++++
 .../StandardPartitionTableDropTestCase.scala    |   2 +-
 .../org/apache/carbondata/api/CarbonStore.scala |  10 +-
 .../carbondata/spark/util/CarbonScalaUtil.scala |  29 ++--
 .../management/CarbonCleanFilesCommand.scala    |  15 +-
 .../management/CarbonLoadDataCommand.scala      |  37 ++--
 .../datasources/CarbonFileFormat.scala          |   9 +-
 .../loading/model/CarbonLoadModel.java          |   6 +-
 .../processing/util/CarbonLoaderUtil.java       |   3 +
 12 files changed, 414 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
index 244e8bb..01cb1d7 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java
@@ -42,7 +42,7 @@ public class SegmentIndexFileStore {
    */
   private Map<String, byte[]> carbonIndexMap;
 
-  public SegmentIndexFileStore() throws IOException {
+  public SegmentIndexFileStore() {
     carbonIndexMap = new HashMap<>();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
index b44f99b..d29dfbb 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
@@ -38,8 +39,18 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.indexstore.blockletindex.SegmentIndexFileStore;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataFileFooterConverter;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
 
 import com.google.gson.Gson;
 
@@ -134,6 +145,31 @@ public class PartitionMapFileStore {
     }
   }
 
+  private String getPartitionFilePath(String segmentPath) {
+    CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
+    if (carbonFile.exists()) {
+      CarbonFile[] partitionFiles = carbonFile.listFiles(new CarbonFileFilter() {
+        @Override public boolean accept(CarbonFile file) {
+          return file.getName().endsWith(CarbonTablePath.PARTITION_MAP_EXT);
+        }
+      });
+      if (partitionFiles != null && partitionFiles.length > 0) {
+        partionedSegment = true;
+        int i = 0;
+        // Get the latest partition map file based on the timestamp of that file.
+        long[] partitionTimestamps = new long[partitionFiles.length];
+        for (CarbonFile file : partitionFiles) {
+          partitionTimestamps[i++] = Long.parseLong(file.getName()
+              .substring(0, file.getName().length() - CarbonTablePath.PARTITION_MAP_EXT.length()));
+        }
+        Arrays.sort(partitionTimestamps);
+        return segmentPath + "/" + partitionTimestamps[partitionTimestamps.length - 1]
+            + CarbonTablePath.PARTITION_MAP_EXT;
+      }
+    }
+    return null;
+  }
+
   private CarbonFile[] getPartitionFiles(String segmentPath) {
     CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath);
     if (carbonFile.exists()) {
@@ -179,22 +215,15 @@ public class PartitionMapFileStore {
     return partitionMapper;
   }
 
+  /**
+   * Reads all partitions which existed inside the passed segment path
+   * @param segmentPath
+   */
   public void readAllPartitionsOfSegment(String segmentPath) {
-    CarbonFile[] partitionFiles = getPartitionFiles(segmentPath);
-    if (partitionFiles != null && partitionFiles.length > 0) {
+    String partitionFilePath = getPartitionFilePath(segmentPath);
+    if (partitionFilePath != null) {
       partionedSegment = true;
-      int i = 0;
-      // Get the latest partition map file based on the timestamp of that file.
-      long [] partitionTimestamps = new long[partitionFiles.length];
-      for (CarbonFile file : partitionFiles) {
-        partitionTimestamps[i++] =
-            Long.parseLong(file.getName().substring(
-                0, file.getName().length() - CarbonTablePath.PARTITION_MAP_EXT.length()));
-      }
-      Arrays.sort(partitionTimestamps);
-      PartitionMapper partitionMapper = readPartitionMap(
-          segmentPath + "/" + partitionTimestamps[partitionTimestamps.length - 1]
-              + CarbonTablePath.PARTITION_MAP_EXT);
+      PartitionMapper partitionMapper = readPartitionMap(partitionFilePath);
       partitionMap.putAll(partitionMapper.getPartitionMap());
     }
   }
@@ -253,6 +282,96 @@ public class PartitionMapFileStore {
     }
   }
 
+  /**
+   * Clean up invalid data after drop partition in all segments of table
+   * @param table
+   * @param currentPartitions Current partitions of table
+   * @param forceDelete Whether it should be deleted force or check the time for an hour creation
+   *                    to delete data.
+   * @throws IOException
+   */
+  public void cleanSegments(
+      CarbonTable table,
+      List<String> currentPartitions,
+      boolean forceDelete) throws IOException {
+    SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier());
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(table.getAbsoluteTableIdentifier().getTablePath(),
+            table.getAbsoluteTableIdentifier().getCarbonTableIdentifier());
+
+    LoadMetadataDetails[] details = ssm.readLoadMetadata(table.getMetaDataFilepath());
+    // scan through each segment.
+
+    for (LoadMetadataDetails segment : details) {
+
+      // if this segment is valid then only we will go for deletion of related
+      // dropped partition files. if the segment is mark for delete or compacted then any way
+      // it will get deleted.
+
+      if (segment.getSegmentStatus() == SegmentStatus.SUCCESS
+          || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
+        List<String> toBeDeletedIndexFiles = new ArrayList<>();
+        List<String> toBeDeletedDataFiles = new ArrayList<>();
+        // take the list of files from this segment.
+        String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName());
+        String partitionFilePath = getPartitionFilePath(segmentPath);
+        if (partitionFilePath != null) {
+          PartitionMapper partitionMapper = readPartitionMap(partitionFilePath);
+          DataFileFooterConverter fileFooterConverter = new DataFileFooterConverter();
+          SegmentIndexFileStore indexFileStore = new SegmentIndexFileStore();
+          indexFileStore.readAllIIndexOfSegment(segmentPath);
+          Set<String> indexFilesFromSegment = indexFileStore.getCarbonIndexMap().keySet();
+          for (String indexFile : indexFilesFromSegment) {
+            // Check the partition information in the partiton mapper
+            List<String> indexPartitions = partitionMapper.partitionMap.get(indexFile);
+            if (indexPartitions == null || !currentPartitions.containsAll(indexPartitions)) {
+              Long fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(indexFile
+                  .substring(indexFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1,
+                      indexFile.length() - CarbonTablePath.INDEX_FILE_EXT.length()));
+              if (CarbonUpdateUtil.isMaxQueryTimeoutExceeded(fileTimestamp) || forceDelete) {
+                toBeDeletedIndexFiles.add(indexFile);
+                // Add the corresponding carbondata files to the delete list.
+                byte[] fileData = indexFileStore.getFileData(indexFile);
+                List<DataFileFooter> indexInfo =
+                    fileFooterConverter.getIndexInfo(segmentPath + "/" + indexFile, fileData);
+                for (DataFileFooter footer : indexInfo) {
+                  toBeDeletedDataFiles.add(footer.getBlockInfo().getTableBlockInfo().getFilePath());
+                }
+              }
+            }
+          }
+
+          if (toBeDeletedIndexFiles.size() > 0) {
+            indexFilesFromSegment.removeAll(toBeDeletedIndexFiles);
+            new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath,
+                new ArrayList<String>(indexFilesFromSegment));
+            for (String dataFile : toBeDeletedDataFiles) {
+              FileFactory.deleteFile(dataFile, FileFactory.getFileType(dataFile));
+            }
+          }
+          CarbonFile[] partitionFiles = getPartitionFiles(segmentPath);
+          CarbonFile currentPartitionFile = FileFactory.getCarbonFile(partitionFilePath);
+          if (partitionFiles != null) {
+            // Delete all old partition files
+            for (CarbonFile partitionFile : partitionFiles) {
+              if (!currentPartitionFile.getName().equalsIgnoreCase(partitionFile.getName())) {
+                partitionFile.delete();
+              }
+            }
+          }
+          partitionMapper = readPartitionMap(partitionFilePath);
+          if (partitionMapper != null) {
+            // delete partition map if there is no partition files exist
+            if (partitionMapper.partitionMap.size() == 0) {
+              currentPartitionFile.delete();
+            }
+          }
+        }
+      }
+    }
+  }
+
   public List<String> getPartitions(String indexFileName) {
     return partitionMap.get(indexFileName);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
index 85f08cc..067d024 100644
--- a/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/writer/CarbonIndexFileMergeWriter.java
@@ -39,36 +39,53 @@ public class CarbonIndexFileMergeWriter {
   /**
    * Merge all the carbonindex files of segment to a  merged file
    * @param segmentPath
+   * @param indexFileNamesTobeAdded while merging it comsiders only these files.
+   *                                If null then consider all
    * @throws IOException
    */
-  public void mergeCarbonIndexFilesOfSegment(String segmentPath) throws IOException {
+  public void mergeCarbonIndexFilesOfSegment(
+      String segmentPath,
+      List<String> indexFileNamesTobeAdded) throws IOException {
     CarbonFile[] indexFiles = SegmentIndexFileStore.getCarbonIndexFiles(segmentPath);
-    if (isCarbonIndexFilePresent(indexFiles)) {
+    if (isCarbonIndexFilePresent(indexFiles) || indexFileNamesTobeAdded != null) {
       SegmentIndexFileStore fileStore = new SegmentIndexFileStore();
       fileStore.readAllIIndexOfSegment(segmentPath);
-      openThriftWriter(
-          segmentPath + "/" +
-              System.currentTimeMillis() + CarbonTablePath.MERGE_INDEX_FILE_EXT);
       Map<String, byte[]> indexMap = fileStore.getCarbonIndexMap();
       MergedBlockIndexHeader indexHeader = new MergedBlockIndexHeader();
       MergedBlockIndex mergedBlockIndex = new MergedBlockIndex();
       List<String> fileNames = new ArrayList<>(indexMap.size());
       List<ByteBuffer> data = new ArrayList<>(indexMap.size());
       for (Map.Entry<String, byte[]> entry : indexMap.entrySet()) {
-        fileNames.add(entry.getKey());
-        data.add(ByteBuffer.wrap(entry.getValue()));
+        if (indexFileNamesTobeAdded == null ||
+            indexFileNamesTobeAdded.contains(entry.getKey())) {
+          fileNames.add(entry.getKey());
+          data.add(ByteBuffer.wrap(entry.getValue()));
+        }
+      }
+      if (fileNames.size() > 0) {
+        openThriftWriter(
+            segmentPath + "/" + System.currentTimeMillis() + CarbonTablePath.MERGE_INDEX_FILE_EXT);
+        indexHeader.setFile_names(fileNames);
+        mergedBlockIndex.setFileData(data);
+        writeMergedBlockIndexHeader(indexHeader);
+        writeMergedBlockIndex(mergedBlockIndex);
+        close();
       }
-      indexHeader.setFile_names(fileNames);
-      mergedBlockIndex.setFileData(data);
-      writeMergedBlockIndexHeader(indexHeader);
-      writeMergedBlockIndex(mergedBlockIndex);
-      close();
       for (CarbonFile indexFile : indexFiles) {
         indexFile.delete();
       }
     }
   }
 
+  /**
+   * Merge all the carbonindex files of segment to a  merged file
+   * @param segmentPath
+   * @throws IOException
+   */
+  public void mergeCarbonIndexFilesOfSegment(String segmentPath) throws IOException {
+    mergeCarbonIndexFilesOfSegment(segmentPath, null);
+  }
+
   private boolean isCarbonIndexFilePresent(CarbonFile[] indexFiles) {
     for (CarbonFile file : indexFiles) {
       if (file.getName().endsWith(CarbonTablePath.INDEX_FILE_EXT)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
new file mode 100644
index 0000000..2b0dd09
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.standardpartition
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    dropTable
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "dd-MM-yyyy")
+    sql(
+      """
+        | CREATE TABLE originTable (empno int, empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE originTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+  }
+
+  def validateDataFiles(tableUniqueName: String, segmentId: String, partitions: Int, partitionMapFiles: Int): Unit = {
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+    val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
+      carbonTable.getTablePath)
+    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+    val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
+    val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
+      override def accept(file: CarbonFile): Boolean = {
+        return file.getName.endsWith(".carbondata")
+      }
+    })
+    assert(dataFiles.length == partitions)
+    val partitionFile = carbonFile.listFiles(new CarbonFileFilter() {
+      override def accept(file: CarbonFile): Boolean = {
+        return file.getName.endsWith(".partitionmap")
+      }
+    })
+    assert(partitionFile.length == partitionMapFiles)
+  }
+
+  test("clean up partition table for int partition column") {
+    sql(
+      """
+        | CREATE TABLE partitionone (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionone OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    checkAnswer(
+      sql(s"""select count (*) from partitionone"""),
+      sql(s"""select count (*) from originTable"""))
+
+    checkAnswer(
+      sql(s"""select count (*) from partitionone where empno=11"""),
+      sql(s"""select count (*) from originTable where empno=11"""))
+
+    sql(s"""ALTER TABLE partitionone DROP PARTITION(empno='11')""")
+    validateDataFiles("default_partitionone", "0", 10, 2)
+    sql(s"CLEAN FILES FOR TABLE partitionone").show()
+
+    checkExistence(sql(s"""SHOW PARTITIONS partitionone"""), false, "empno=11")
+    validateDataFiles("default_partitionone", "0", 9, 1)
+    checkAnswer(
+      sql(s"""select count (*) from partitionone where empno=11"""),
+      Seq(Row(0)))
+
+  }
+
+    test("clean up partition on table for more partition columns") {
+      sql(
+        """
+          | CREATE TABLE partitionmany (empno int, empname String, designation String,
+          |  workgroupcategory int, workgroupcategoryname String, deptno int,
+          |  projectjoindate Timestamp, projectenddate Date,attendance int,
+          |  utilization int,salary int)
+          | PARTITIONED BY (deptname String,doj Timestamp,projectcode int)
+          | STORED BY 'org.apache.carbondata.format'
+        """.stripMargin)
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmany OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+      sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionmany OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+      sql(s"""ALTER TABLE partitionmany DROP PARTITION(deptname='Learning')""")
+      validateDataFiles("default_partitionmany", "0", 10, 2)
+      validateDataFiles("default_partitionmany", "1", 10, 2)
+      sql(s"CLEAN FILES FOR TABLE partitionmany").show()
+      validateDataFiles("default_partitionmany", "0", 8, 1)
+      validateDataFiles("default_partitionmany", "1", 8, 1)
+      checkExistence(sql(s"""SHOW PARTITIONS partitionmany"""), false, "deptname=Learning", "projectcode=928479")
+      checkAnswer(
+        sql(s"""select count (*) from partitionmany where deptname='Learning'"""),
+        Seq(Row(0)))
+    }
+
+  test("clean up after dropping all partition on table") {
+    sql(
+      """
+        | CREATE TABLE partitionall (empno int, empname String, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int,
+        |  projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (deptname String,doj Timestamp,projectcode int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionall OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE partitionall OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='Learning')""")
+    sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='configManagement')""")
+    sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='network')""")
+    sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='protocol')""")
+    sql(s"""ALTER TABLE partitionall DROP PARTITION(deptname='security')""")
+    assert(sql(s"""SHOW PARTITIONS partitionall""").collect().length == 0)
+    validateDataFiles("default_partitionall", "0", 10, 6)
+    sql(s"CLEAN FILES FOR TABLE partitionall").show()
+    validateDataFiles("default_partitionall", "0", 0, 0)
+    checkAnswer(
+      sql(s"""select count (*) from partitionall"""),
+      Seq(Row(0)))
+  }
+
+  override def afterAll = {
+    dropTable
+  }
+
+  def dropTable = {
+    sql("drop table if exists originTable")
+    sql("drop table if exists originMultiLoads")
+    sql("drop table if exists partitionone")
+    sql("drop table if exists partitionall")
+    sql("drop table if exists partitionmany")
+    sql("drop table if exists partitionshow")
+    sql("drop table if exists staticpartition")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
index 2a25255..9a9940b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableDropTestCase.scala
@@ -156,7 +156,7 @@ class StandardPartitionTableDropTestCase extends QueryTest with BeforeAndAfterAl
   }
 
   override def afterAll = {
-//    dropTable
+    dropTable
   }
 
   def dropTable = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 2b127e4..d514f77 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -30,7 +30,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, PartitionMapFileStore}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
@@ -103,7 +103,8 @@ object CarbonStore {
       tableName: String,
       storePath: String,
       carbonTable: CarbonTable,
-      forceTableClean: Boolean): Unit = {
+      forceTableClean: Boolean,
+      currentTablePartitions: Option[Seq[String]] = None): Unit = {
     LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
     var carbonCleanFilesLock: ICarbonLock = null
     var absoluteTableIdentifier: AbsoluteTableIdentifier = null
@@ -128,6 +129,11 @@ object CarbonStore {
         DataLoadingUtil.deleteLoadsAndUpdateMetadata(
           isForceDeletion = true, carbonTable)
         CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            new PartitionMapFileStore().cleanSegments(carbonTable, partitions.asJava, true)
+          case _ =>
+        }
       }
     } finally {
       if (carbonCleanFilesLock != null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
index f78412b..4c405a4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala
@@ -157,16 +157,25 @@ object CarbonScalaUtil {
   def convertToUTF8String(value: String,
       dataType: DataType,
       timeStampFormat: SimpleDateFormat,
-      dateFormat: SimpleDateFormat): UTF8String = {
-    dataType match {
-      case TimestampType =>
-        UTF8String.fromString(
-          DateTimeUtils.timestampToString(timeStampFormat.parse(value).getTime * 1000))
-      case DateType =>
-        UTF8String.fromString(
-          DateTimeUtils.dateToString(
-            (dateFormat.parse(value).getTime / DateTimeUtils.MILLIS_PER_DAY).toInt))
-      case _ => UTF8String.fromString(value)
+      dateFormat: SimpleDateFormat,
+      serializationNullFormat: String): UTF8String = {
+    if (value == null || serializationNullFormat.equals(value)) {
+      return UTF8String.fromString(value)
+    }
+    try {
+      dataType match {
+        case TimestampType =>
+          UTF8String.fromString(
+            DateTimeUtils.timestampToString(timeStampFormat.parse(value).getTime * 1000))
+        case DateType =>
+          UTF8String.fromString(
+            DateTimeUtils.dateToString(
+              (dateFormat.parse(value).getTime / DateTimeUtils.MILLIS_PER_DAY).toInt))
+        case _ => UTF8String.fromString(value)
+      }
+    } catch {
+      case e: Exception =>
+        UTF8String.fromString(value)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index e0530f6..342acd4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -18,7 +18,10 @@
 package org.apache.spark.sql.execution.command.management
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.Expression
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
+import org.apache.spark.sql.optimizer.CarbonFilters
 
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.core.util.CarbonProperties
@@ -76,13 +79,21 @@ case class CarbonCleanFilesCommand(
   private def cleanGarbageData(sparkSession: SparkSession,
       databaseNameOp: Option[String], tableName: String): Unit = {
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-
+    val partitions: Option[Seq[String]] = if (carbonTable.isHivePartitionTable) {
+      Some(CarbonFilters.getPartitions(
+        Seq.empty[Expression],
+        sparkSession,
+        TableIdentifier(tableName, databaseNameOp)))
+    } else {
+      None
+    }
     CarbonStore.cleanFiles(
       CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
       tableName,
       CarbonProperties.getStorePath,
       carbonTable,
-      forceTableClean)
+      forceTableClean,
+      partitions)
   }
 
   private def cleanGarbageDataInAllTables(sparkSession: SparkSession): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index f96c0a7..7285d9d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -34,14 +34,15 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
-import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.catalog.CatalogTable
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, Project}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, UpdateTableModel}
 import org.apache.spark.sql.execution.datasources.{CarbonFileFormat, CatalogFileIndex, HadoopFsRelation, LogicalRelation}
 import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.types.{StringType, StructField, StructType, TimestampType}
 import org.apache.spark.unsafe.types.UTF8String
 import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils}
@@ -52,6 +53,7 @@ import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.dictionary.server.{DictionaryServer, NonSecureDictionaryServer}
 import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceProvider
+import org.apache.carbondata.core.metadata.PartitionMapFileStore
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
@@ -453,16 +455,28 @@ case class CarbonLoadDataCommand(
       hadoopConf: Configuration,
       dataFrame: Option[DataFrame]) = {
     val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName))
     val logicalPlan =
       sparkSession.sessionState.catalog.lookupRelation(
-        TableIdentifier(table.getTableName, Some(table.getDatabaseName)))
+        identifier)
     val relation = logicalPlan.collect {
       case l: LogicalRelation => l
-      case c: CatalogRelation => c
+      case c // To make compatabile with spark 2.1 and 2.2 we need to compare classes
+        if (c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.CatalogRelation") ||
+            c.getClass.getName.equals("org.apache.spark.sql.catalyst.catalog.HiveTableRelation") ||
+            c.getClass.getName.equals(
+              "org.apache.spark.sql.catalyst.catalog.UnresolvedCatalogRelation")) => c
     }.head
-
+    // Clean up the old invalid segment data.
+    DataLoadingUtil.deleteLoadsAndUpdateMetadata(isForceDeletion = false, table)
+    val currentPartitions =
+      CarbonFilters.getPartitions(Seq.empty[Expression], sparkSession, identifier)
+    // Clean up the alreday dropped partitioned data
+    new PartitionMapFileStore().cleanSegments(table, currentPartitions.asJava, false)
     // Converts the data to carbon understandable format. The timestamp/date format data needs to
     // converted to hive standard fomat to let spark understand the data to partition.
+    val serializationNullFormat =
+      carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
     val query: LogicalPlan = if (dataFrame.isDefined) {
       var timeStampformatString = CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT
       val timeStampFormat = new SimpleDateFormat(timeStampformatString)
@@ -545,13 +559,15 @@ case class CarbonLoadDataCommand(
           jobConf).map{ case (key, value) =>
             val data = new Array[Any](len)
             var i = 0
-            while (i < len) {
+            val input = value.get()
+            while (i < input.length) {
               // TODO find a way to avoid double conversion of date and time.
               data(i) = CarbonScalaUtil.convertToUTF8String(
-                value.get()(i),
+                input(i),
                 rowDataTypes(i),
                 timeStampFormat,
-                dateFormat)
+                dateFormat,
+                serializationNullFormat)
               i = i + 1
             }
             InternalRow.fromSeq(data)
@@ -576,7 +592,7 @@ case class CarbonLoadDataCommand(
           isOverwriteTable,
           carbonLoadModel,
           sparkSession)
-      case c: CatalogRelation =>
+      case others =>
         val catalogTable = CarbonReflectionUtils.getFieldOfCatalogTable(
           "tableMeta",
           relation).asInstanceOf[CatalogTable]
@@ -587,7 +603,7 @@ case class CarbonLoadDataCommand(
         val catalog = new CatalogFileIndex(sparkSession, catalogTable, sizeInBytes)
         convertToLogicalRelation(
           catalogTable,
-          c.output,
+          others.output,
           sizeInBytes,
           isOverwriteTable,
           carbonLoadModel,
@@ -642,6 +658,7 @@ case class CarbonLoadDataCommand(
     options += (("onepass", loadModel.getUseOnePass.toString))
     options += (("dicthost", loadModel.getDictionaryServerHost))
     options += (("dictport", loadModel.getDictionaryServerPort.toString))
+    options ++= this.options
     if (updateModel.isDefined) {
       options += (("updatetimestamp", updateModel.get.updatedTimeStamp.toString))
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index a95693c..b9df7a1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -189,7 +189,14 @@ private class CarbonOutputWriter(path: String,
   extends OutputWriter with AbstractCarbonOutputWriter {
   val partitions = getPartitionsFromPath(path, context).map(ExternalCatalogUtils.unescapePathName)
   val partitionData = if (partitions.nonEmpty) {
-    partitions.map(_.split("=")(1))
+    partitions.map{ p =>
+      val splitData = p.split("=")
+      if (splitData.length > 1) {
+        splitData(1)
+      } else {
+        ""
+      }
+    }
   } else {
     Array.empty
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index 3031b8e..8a295d9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -564,7 +564,11 @@ public class CarbonLoadModel implements Serializable {
    * @return
    */
   public LoadMetadataDetails getCurrentLoadMetadataDetail() {
-    return loadMetadataDetails.get(loadMetadataDetails.size() - 1);
+    if (loadMetadataDetails != null && loadMetadataDetails.size() > 0) {
+      return loadMetadataDetails.get(loadMetadataDetails.size() - 1);
+    } else {
+      return null;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/9659edcc/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index fef6930..49ca254 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -393,6 +393,9 @@ public final class CarbonLoaderUtil {
     SegmentStatus loadStatus = SegmentStatus.MARKED_FOR_DELETE;
     // always the last entry in the load metadata details will be the current load entry
     LoadMetadataDetails loadMetaEntry = model.getCurrentLoadMetadataDetail();
+    if (loadMetaEntry == null) {
+      return;
+    }
     CarbonLoaderUtil
         .populateNewLoadMetaEntry(loadMetaEntry, loadStatus, model.getFactTimeStamp(), true);
     boolean entryAdded =