You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/09 10:52:15 UTC

carbondata git commit: [CARBONDATA-1763] Dropped table if exception thrown while creation

Repository: carbondata
Updated Branches:
  refs/heads/master 957a51fef -> 1be27b085


[CARBONDATA-1763] Dropped table if exception thrown while creation

The preaggregate table is not dropped when its creation fails, for two reasons:

1. Exceptions thrown from undo metadata are not handled.
2. If the preaggregate table is not registered with the main table (i.e. updating the main table fails), it is not dropped from the metastore.

This closes #1951


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/1be27b08
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/1be27b08
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/1be27b08

Branch: refs/heads/master
Commit: 1be27b085696937505acfbf62079c5ca31ad347c
Parents: 957a51f
Author: kunal642 <ku...@gmail.com>
Authored: Thu Feb 8 11:50:23 2018 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Feb 9 16:22:03 2018 +0530

----------------------------------------------------------------------
 .../spark/rdd/CarbonDataRDDFactory.scala        |  2 +-
 .../datamap/CarbonCreateDataMapCommand.scala    |  2 +-
 .../datamap/CarbonDropDataMapCommand.scala      | 29 ++++++++++++++++----
 .../CreatePreAggregateTableCommand.scala        |  3 +-
 4 files changed, 28 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/1be27b08/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 5c43d58..8ed7623 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -546,7 +546,7 @@ object CarbonDataRDDFactory {
           LOGGER.error(ex, "Problem while committing data maps")
           false
       }
-      if (!done && !commitComplete) {
+      if (!done || !commitComplete) {
         CarbonLoaderUtil.updateTableStatusForFailure(carbonLoadModel, uniqueTableStatusId)
         LOGGER.info("********starting clean up**********")
         CarbonLoaderUtil.deleteSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1be27b08/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index f2f001e..0fd5437 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -123,7 +123,7 @@ case class CarbonCreateDataMapCommand(
   override def undoMetadata(sparkSession: SparkSession, exception: Exception): Seq[Row] = {
     if (dmClassName.equalsIgnoreCase(PREAGGREGATE.toString) ||
       dmClassName.equalsIgnoreCase(TIMESERIES.toString)) {
-      if (!tableIsExists) {
+      if (!tableIsExists && createPreAggregateTableCommands != null) {
         createPreAggregateTableCommands.undoMetadata(sparkSession, exception)
       } else {
         Seq.empty

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1be27b08/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index bc55988..8ef394c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -47,7 +47,8 @@ case class CarbonDropDataMapCommand(
     dataMapName: String,
     ifExistsSet: Boolean,
     databaseNameOp: Option[String],
-    tableName: String)
+    tableName: String,
+    forceDrop: Boolean = false)
   extends AtomicRunnableCommand {
 
   var commandToRun: CarbonDropTableCommand = _
@@ -74,6 +75,10 @@ case class CarbonDropDataMapCommand(
         case ex: NoSuchTableException =>
           throw ex
       }
+      // If datamap to be dropped in parent table then drop the datamap from metastore and remove
+      // entry from parent table.
+      // If force drop is true then remove the datamap from hivemetastore. No need to remove from
+      // parent as the first condition would have taken care of it.
       if (carbonTable.isDefined && carbonTable.get.getTableInfo.getDataMapSchemaList.size() > 0) {
         val dataMapSchema = carbonTable.get.getTableInfo.getDataMapSchemaList.asScala.zipWithIndex.
           find(_._1.getDataMapName.equalsIgnoreCase(dataMapName))
@@ -85,7 +90,6 @@ case class CarbonDropDataMapCommand(
               ifExistsSet,
               sparkSession)
           OperationListenerBus.getInstance.fireEvent(dropDataMapPreEvent, operationContext)
-
           carbonTable.get.getTableInfo.getDataMapSchemaList.remove(dataMapSchema.get._2)
           val schemaConverter = new ThriftWrapperSchemaConverterImpl
           PreAggregateUtil.updateSchemaInfo(
@@ -111,13 +115,28 @@ case class CarbonDropDataMapCommand(
         } else if (!ifExistsSet) {
           throw new NoSuchDataMapException(dataMapName, tableName)
         }
-      } else if ((carbonTable.isDefined &&
-        carbonTable.get.getTableInfo.getDataMapSchemaList.size() == 0)) {
+      } else if (forceDrop) {
+        val childCarbonTable: Option[CarbonTable] = try {
+          val childTableName = tableName + "_" + dataMapName
+          Some(CarbonEnv.getCarbonTable(databaseNameOp, childTableName)(sparkSession))
+        } catch {
+          case _: Exception =>
+            None
+        }
+        if (childCarbonTable.isDefined) {
+          commandToRun = CarbonDropTableCommand(
+            ifExistsSet = true,
+            Some(childCarbonTable.get.getDatabaseName),
+            childCarbonTable.get.getTableName,
+            dropChildTable = true)
+          commandToRun.processMetadata(sparkSession)
+        }
+      } else if (carbonTable.isDefined &&
+        carbonTable.get.getTableInfo.getDataMapSchemaList.size() == 0) {
         if (!ifExistsSet) {
           throw new NoSuchDataMapException(dataMapName, tableName)
         }
       }
-
     } catch {
       case e: NoSuchDataMapException =>
         throw e

http://git-wip-us.apache.org/repos/asf/carbondata/blob/1be27b08/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 54f0390..46d885d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -162,7 +162,8 @@ case class CreatePreAggregateTableCommand(
       dataMapName,
       ifExistsSet = true,
       parentTableIdentifier.database,
-      parentTableIdentifier.table).run(sparkSession)
+      parentTableIdentifier.table,
+      forceDrop = true).run(sparkSession)
     Seq.empty
   }