You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/05 15:02:34 UTC

[16/50] [abbrv] carbondata git commit: [CARBONDATA-2097] Restriction added to partition table on alter command (add, rename on partition table and drop partition on preaggregate table)

[CARBONDATA-2097] Restriction added to partition table on alter command (add,rename on partition table and drop partition on preaggregate table)

Restriction added to partition table on alter command (add and rename on partition table and drop partition on preaggregate table)

This closes #1885


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/033870da
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/033870da
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/033870da

Branch: refs/heads/fgdatamap
Commit: 033870dab137ba99f5273c934741e96884b3247d
Parents: 099a047
Author: kushalsaha <ku...@gmail.com>
Authored: Tue Jan 30 16:38:47 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Wed Jan 31 14:46:54 2018 +0530

----------------------------------------------------------------------
 .../StandardPartitionTableQueryTestCase.scala   | 42 ++++++++++++++++++++
 ...rbonAlterTableDropHivePartitionCommand.scala |  7 +++-
 .../sql/execution/strategy/DDLStrategy.scala    | 21 ++++++++++
 3 files changed, 69 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/033870da/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index d1ef94c..b1fc0a7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -248,6 +248,46 @@ test("Creation of partition table should fail if the colname in table schema and
   }
 }
 
+  test("Renaming a partition table should fail"){ // rename partition must be rejected for this table
+    sql("drop table if exists partitionTable")
+    sql(
+      """create table partitionTable (id int,name String) partitioned by(email string) stored by 'carbondata'
+      """.stripMargin)
+    sql("insert into partitionTable select 1,'huawei','abc'")
+    checkAnswer(sql("show partitions partitionTable"), Seq(Row("email=abc")))
+    intercept[Exception]{ // quotes are balanced so the failure is the rename restriction, not a parse error
+      sql("alter table partitionTable PARTITION (email='abc') rename to PARTITION (email='def')")
+    }
+  }
+
+  test("add partition based on location on partition table should fail"){ // ADD PARTITION ... LOCATION must throw
+    sql("drop table if exists partitionTable")
+    sql(
+      """create table partitionTable (id int,name String) partitioned by(email string) stored by 'carbondata'
+      """.stripMargin)
+    sql("insert into partitionTable select 1,'huawei','abc'")
+    checkAnswer(sql("show partitions partitionTable"), Seq(Row("email=abc")))
+    intercept[Exception]{ // the statement below supplies an explicit location and is expected to be rejected
+      sql("alter table partitionTable add partition (email='def') location 'abc/part1'")
+    }
+  }
+
+  test("drop partition on preAggregate table should fail"){ // DROP PARTITION must throw once a datamap exists
+    sql("drop table if exists partitionTable")
+    sql("drop datamap if exists preaggTable on table partitionTable")
+    sql("create table partitionTable (id int,city string,age int) partitioned by(name string) stored by 'carbondata'".stripMargin)
+    sql( // attach a 'preaggregate' datamap to the partitioned table before loading any rows
+      s"""create datamap preaggTable on table partitionTable using 'preaggregate' as select id,sum(age) from partitionTable group by id"""
+        .stripMargin)
+    sql("insert into partitionTable select 1,'Bangalore',30,'John'")
+    sql("insert into partitionTable select 2,'Chennai',20,'Huawei'")
+    checkAnswer(sql("show partitions partitionTable"), Seq(Row("name=John"),Row("name=Huawei")))
+    intercept[Exception]{ // dropping a partition of the parent table is expected to be rejected
+      sql("alter table partitionTable drop PARTITION(name='John')")
+    }
+  }
+
+
   private def verifyPartitionInfo(frame: DataFrame, partitionNames: Seq[String]) = {
     val plan = frame.queryExecution.sparkPlan
     val scanRDD = plan collect {
@@ -277,6 +317,8 @@ test("Creation of partition table should fail if the colname in table schema and
     sql("drop table if exists badrecordsignore")
     sql("drop table if exists badrecordsPartitionintnull")
     sql("drop table if exists badrecordsPartitionintnullalt")
+    sql("drop table if exists partitionTable")
+    sql("drop datamap if exists preaggTable on table partitionTable")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/033870da/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index c3509a3..0158a32 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -21,7 +21,7 @@ import java.util
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, AlterTableDropPartitionCommand, AtomicRunnableCommand}
@@ -34,6 +34,7 @@ import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.PartitionMapFileStore
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.spark.rdd.{CarbonDropPartitionCommitRDD, CarbonDropPartitionRDD}
 
 /**
@@ -61,6 +62,10 @@ case class CarbonAlterTableDropHivePartitionCommand(
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val table = CarbonEnv.getCarbonTable(tableName)(sparkSession)
+    if (CarbonUtil.hasAggregationDataMap(table)) {
+      throw new AnalysisException(
+        "Partition can not be dropped as it is mapped to Pre Aggregate table")
+    }
     if (table.isHivePartitionTable) {
       try {
         specs.flatMap(f => sparkSession.sessionState.catalog.listPartitions(tableName, Some(f)))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/033870da/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index db8c6a2..b174b94 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -254,6 +254,27 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         ExecutedCommandExec(
           CarbonAlterTableUnsetCommand(tableName, propKeys, ifExists, isView)) :: Nil
       }
+      case rename@AlterTableRenamePartitionCommand(tableName, oldPartition, newPartition) =>
+        val dbOption = tableName.database.map(_.toLowerCase)
+        val tableIdentifier = TableIdentifier(tableName.table.toLowerCase(), dbOption)
+        val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .tableExists(tableIdentifier)(sparkSession)
+        if (isCarbonTable) {
+          throw new UnsupportedOperationException("Renaming partition on table is not supported")
+        } else {
+          ExecutedCommandExec(rename) :: Nil
+        }
+      case addPartition@AlterTableAddPartitionCommand(tableName, partitionSpecsAndLocs, _) =>
+        val dbOption = tableName.database.map(_.toLowerCase)
+        val tableIdentifier = TableIdentifier(tableName.table.toLowerCase(), dbOption)
+        val isCarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .tableExists(tableIdentifier)(sparkSession)
+        if (isCarbonTable && partitionSpecsAndLocs.exists(_._2.isDefined)) {
+          throw new UnsupportedOperationException(
+            "add partition with location is not supported")
+        } else {
+          ExecutedCommandExec(addPartition) :: Nil
+        }
       case RefreshTable(tableIdentifier) =>
         RefreshCarbonTableCommand(tableIdentifier.database,
           tableIdentifier.table).run(sparkSession)