Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/04/02 02:41:51 UTC

[carbondata] 31/41: [CARBONDATA-3320] Fix number of partitions in describe formatted and empty directory cleanup on drop partition

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit b0810149d99c5d051efebaa6ba362775f97ed387
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Wed Mar 20 14:59:10 2019 +0530

    [CARBONDATA-3320] Fix number of partitions in describe formatted and empty directory cleanup on drop partition
    
    Problem:
    For a Hive-native partitioned table, the number of partitions shown by
    describe formatted is always zero, and when a partition is dropped the
    emptied partition directories are not deleted.

    Solution:
    In describe formatted, fetch the table's partitions from the session catalog and report the size of that list.
    Get the parent of the dropped partition and, if it is empty, delete the directory during clean files.
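
    As a minimal sketch of the first fix (assuming an active SparkSession
    named `spark`; the table and database names are taken from the test
    below and are only illustrative), the partition count for a Hive-native
    table can be read from Spark's session catalog:

        import org.apache.spark.sql.catalyst.TableIdentifier

        // Number of partitions as tracked by the session catalog, which is
        // what describe formatted now reports for NATIVE_HIVE partitioning.
        val partitionCount = spark.sessionState.catalog
          .listPartitions(TableIdentifier("droppartition", Some("partition_preaggregate")))
          .size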
    
    This closes #3156
---
 .../carbondata/core/metadata/SegmentFileStore.java | 21 +++++++++++++++++----
 .../partition/TestDDLForPartitionTable.scala       | 13 ++++++++++++-
 ...StandardPartitionWithPreaggregateTestCase.scala | 22 ++++++++++++++++++++++
 .../table/CarbonDescribeFormattedCommand.scala     | 19 ++++++++++++++++++-
 4 files changed, 69 insertions(+), 6 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 1e1e303..224b230 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -845,10 +845,7 @@ public class SegmentFileStore {
           }
         }
         CarbonFile path = FileFactory.getCarbonFile(location.getParent().toString());
-        if (path.listFiles().length == 0) {
-          FileFactory.deleteAllCarbonFilesOfDir(
-              FileFactory.getCarbonFile(location.getParent().toString()));
-        }
+        deleteEmptyPartitionFolders(path);
       } else {
         Path location = new Path(entry.getKey()).getParent();
         // delete the segment folder
@@ -861,6 +858,22 @@ public class SegmentFileStore {
     }
   }
 
+  /**
+   * Deletes directories recursively upward as long as each directory is left
+   * empty after the drop.
+   * Example: for a partition path year=2015/month=2/day=5, dropping the partition
+   * day=5 deletes folders all the way up to year=2015, provided no other files or
+   * folders remain under any folder on that path.
+   */
+  private static void deleteEmptyPartitionFolders(CarbonFile path) {
+    if (path != null && path.listFiles().length == 0) {
+      FileFactory.deleteAllCarbonFilesOfDir(path);
+      Path parentsLocation = new Path(path.getAbsolutePath()).getParent();
+      deleteEmptyPartitionFolders(
+          FileFactory.getCarbonFile(parentsLocation.toString()));
+    }
+  }
+
   private static boolean pathExistsInPartitionSpec(List<PartitionSpec> partitionSpecs,
       Path partitionPath) {
     for (PartitionSpec spec : partitionSpecs) {
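
The new deleteEmptyPartitionFolders above stops climbing as soon as it meets a
non-empty directory (for a real table the walk stops at the table root, which
still contains table metadata files). A minimal standalone sketch of the same
idea, assuming plain java.io.File in place of CarbonData's CarbonFile and
FileFactory abstractions:

    import java.io.File

    // Walk upward from the dropped partition's parent, deleting each
    // directory that has been left empty, and stop at the first directory
    // that still has content.
    @scala.annotation.tailrec
    def deleteEmptyParents(dir: File): Unit = {
      if (dir != null && Option(dir.listFiles()).exists(_.isEmpty)) {
        dir.delete()
        deleteEmptyParents(dir.getParentFile)
      }
    }

    // e.g. deleteEmptyParents(new File("/warehouse/t/year=2015/month=2"))
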
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
index d5673bf..7322b95 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDDLForPartitionTable.scala
@@ -380,16 +380,27 @@ class TestDDLForPartitionTable  extends QueryTest with BeforeAndAfterAll {
           |  'RANGE_INFO'='2017-06-11 00:00:02')
         """.stripMargin)
     }
-
     assert(exceptionMessage.getMessage
       .contains("Range info must define a valid range.Please check again!"))
   }
 
+  test("test number of partitions for default partition") {
+    sql("drop table if exists desc")
+    sql("create table desc(name string) partitioned by (num int) stored by 'carbondata'")
+    sql("insert into desc select 'abc',3")
+    sql("insert into desc select 'abc',5")
+    val descFormatted1 = sql("describe formatted desc").collect
+    descFormatted1.find(_.get(0).toString.contains("Number of Partitions")) match {
+      case Some(row) => assert(row.get(1).toString.contains("2"))
+    }
+  }
+
   override def afterAll = {
     dropTable
   }
 
   def dropTable = {
+    sql("drop table if exists desc")
     sql("drop table if exists hashTable")
     sql("drop table if exists rangeTable")
     sql("drop table if exists listTable")
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
index 84c07c4..c3d3456 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionWithPreaggregateTestCase.scala
@@ -26,6 +26,8 @@ import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, Row}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.core.datastore.impl.FileFactory
+
 class StandardPartitionWithPreaggregateTestCase extends QueryTest with BeforeAndAfterAll {
 
   val testData = s"$resourcesPath/sample.csv"
@@ -225,6 +227,26 @@ class StandardPartitionWithPreaggregateTestCase extends QueryTest with BeforeAnd
     checkAnswer(sql("select * from partitionone_p1"), Seq(Row("k",2014,2014,1,2), Row("k",2015,2015,2,3)))
   }
 
+  test("test drop partition directory") {
+    sql("drop table if exists droppartition")
+    sql(
+      """
+        | CREATE TABLE if not exists droppartition (empname String)
+        | PARTITIONED BY (year int, month int,day int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql("insert into droppartition values('k',2014,1,1)")
+    sql("insert into droppartition values('k',2015,2,3)")
+    sql("alter table droppartition drop partition(year=2015,month=2,day=3)")
+    sql("clean files for table droppartition")
+    val table = CarbonEnv.getCarbonTable(Option("partition_preaggregate"), "droppartition")(sqlContext.sparkSession)
+    val tablePath = table.getTablePath
+    val carbonFiles = FileFactory.getCarbonFile(tablePath).listFiles().filter{
+      file => file.getName.equalsIgnoreCase("year=2015")
+    }
+    assert(carbonFiles.length == 0)
+  }
+
   test("test data with filter query") {
     sql("drop table if exists partitionone")
     sql(
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 7468ece..e2a2451 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.carbondata.common.Strings
 import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
 import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -185,7 +186,7 @@ private[sql] case class CarbonDescribeFormattedCommand(
         ("Partition Columns",
           partitionInfo.getColumnSchemaList.asScala.map {
             col => s"${col.getColumnName}:${col.getDataType.getName}"}.mkString(", "), ""),
-        ("Number of Partitions", partitionInfo.getNumPartitions.toString, ""),
+        ("Number of Partitions", getNumberOfPartitions(carbonTable, sparkSession), ""),
         ("Partitions Ids", partitionInfo.getPartitionIds.asScala.mkString(","), "")
       )
       if (partitionInfo.getPartitionType == PartitionType.RANGE) {
@@ -239,6 +240,22 @@ private[sql] case class CarbonDescribeFormattedCommand(
     results.map{case (c1, c2, c3) => Row(c1, c2, c3)}
   }
 
+  /**
+   * This method returns the number of partitions based on the partition type
+   */
+  private def getNumberOfPartitions(carbonTable: CarbonTable,
+      sparkSession: SparkSession): String = {
+    val partitionType = carbonTable.getPartitionInfo.getPartitionType
+    partitionType match {
+      case PartitionType.NATIVE_HIVE =>
+        sparkSession.sessionState.catalog
+          .listPartitions(new TableIdentifier(carbonTable.getTableName,
+            Some(carbonTable.getDatabaseName))).size.toString
+      case _ =>
+        carbonTable.getPartitionInfo.getNumPartitions.toString
+    }
+  }
+
   private def getLocalDictDesc(
       carbonTable: CarbonTable,
       tblProps: Map[String, String]): Seq[(String, String, String)] = {