You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/02/24 05:27:35 UTC

carbondata git commit: [CARBONDATA-2142] [CARBONDATA-1763] Fixed issues while creation concurrent datamaps

Repository: carbondata
Updated Branches:
  refs/heads/master 310b06de1 -> e9430312d


[CARBONDATA-2142] [CARBONDATA-1763] Fixed issues while creation concurrent datamaps

Analysis:
1. generateTableSchemaString in CarbonMetastore did not have a specific implementation for the Hive metastore, due to which carbon tables were being
cached in MetaData. Since there is no way to refresh a table in the Hive metastore, this is wrong: all queries should get the latest carbon table
from the metastore and not from the cache.
2. If updating the main table status fails, the revertMainTableChanges method is called to revert the changes. The revert logic was wrong, which led
to the wrong entry being deleted from the schema.
3. Moved the force-remove logic before taking locks, because deletion from the metastore should happen even if the lock is not present, as the table is in
a stale state (the entry is not in the parent but is still available in the metastore).

This closes #1975


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e9430312
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e9430312
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e9430312

Branch: refs/heads/master
Commit: e9430312d61c0770f402e3475178addba3c37e5e
Parents: 310b06d
Author: kunal642 <ku...@gmail.com>
Authored: Tue Feb 13 00:53:31 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Sat Feb 24 10:59:14 2018 +0530

----------------------------------------------------------------------
 .../carbondata/core/locks/CarbonLockUtil.java   |  6 ++-
 .../core/metadata/AbsoluteTableIdentifier.java  |  5 +++
 .../datamap/CarbonDropDataMapCommand.scala      | 46 ++++++++++++--------
 .../CreatePreAggregateTableCommand.scala        | 16 ++++---
 .../preaaggregate/PreAggregateUtil.scala        | 27 +-----------
 .../spark/sql/hive/CarbonHiveMetaStore.scala    | 14 +++++-
 6 files changed, 64 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9430312/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
index 1fcccfb..c399ef4 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
@@ -74,9 +74,11 @@ public class CarbonLockUtil {
   public static ICarbonLock getLockObject(AbsoluteTableIdentifier absoluteTableIdentifier,
       String lockType, String errorMsg) {
     ICarbonLock carbonLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier, lockType);
-    LOGGER.info("Trying to acquire lock: " + carbonLock);
+    LOGGER.info("Trying to acquire lock: " + lockType + "for table: " +
+        absoluteTableIdentifier.toString());
     if (carbonLock.lockWithRetries()) {
-      LOGGER.info("Successfully acquired the lock " + carbonLock);
+      LOGGER.info("Successfully acquired the lock " + lockType + "for table: " +
+          absoluteTableIdentifier.toString());
     } else {
       LOGGER.error(errorMsg);
       throw new RuntimeException(errorMsg);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9430312/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
index 6ef2671..d3250aa 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
@@ -143,4 +143,9 @@ public class AbsoluteTableIdentifier implements Serializable {
   public String getTableName() {
     return carbonTableIdentifier.getTableName();
   }
+
+  public String toString() {
+    return carbonTableIdentifier.toString();
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9430312/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 8ef394c..2675036 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -51,10 +51,10 @@ case class CarbonDropDataMapCommand(
     forceDrop: Boolean = false)
   extends AtomicRunnableCommand {
 
+  val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
   var commandToRun: CarbonDropTableCommand = _
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK)
     val carbonEnv = CarbonEnv.getInstance(sparkSession)
@@ -65,6 +65,7 @@ case class CarbonDropDataMapCommand(
     catalog.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
     val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
+      forceDropTableFromMetaStore(sparkSession)
       locksToBeAcquired foreach {
         lock => carbonLocks += CarbonLockUtil.getLockObject(tableIdentifier, lock)
       }
@@ -115,22 +116,6 @@ case class CarbonDropDataMapCommand(
         } else if (!ifExistsSet) {
           throw new NoSuchDataMapException(dataMapName, tableName)
         }
-      } else if (forceDrop) {
-        val childCarbonTable: Option[CarbonTable] = try {
-          val childTableName = tableName + "_" + dataMapName
-          Some(CarbonEnv.getCarbonTable(databaseNameOp, childTableName)(sparkSession))
-        } catch {
-          case _: Exception =>
-            None
-        }
-        if (childCarbonTable.isDefined) {
-          commandToRun = CarbonDropTableCommand(
-            ifExistsSet = true,
-            Some(childCarbonTable.get.getDatabaseName),
-            childCarbonTable.get.getTableName,
-            dropChildTable = true)
-          commandToRun.processMetadata(sparkSession)
-        }
       } else if (carbonTable.isDefined &&
         carbonTable.get.getTableInfo.getDataMapSchemaList.size() == 0) {
         if (!ifExistsSet) {
@@ -156,6 +141,33 @@ case class CarbonDropDataMapCommand(
     Seq.empty
   }
 
+  /**
+   * Used to drop child datamap from hive metastore if it exists.
+   * forceDrop will be true only when an exception occurs in main table status updation for
+   * parent table or in processData from CreatePreAggregateTableCommand.
+   */
+  private def forceDropTableFromMetaStore(sparkSession: SparkSession): Unit = {
+    if (forceDrop) {
+      val childTableName = tableName + "_" + dataMapName
+      LOGGER.info(s"Trying to force drop $childTableName from metastore")
+      val childCarbonTable: Option[CarbonTable] = try {
+        Some(CarbonEnv.getCarbonTable(databaseNameOp, childTableName)(sparkSession))
+      } catch {
+        case _: Exception =>
+          LOGGER.warn(s"Child table $childTableName not found in metastore")
+          None
+      }
+      if (childCarbonTable.isDefined) {
+        commandToRun = CarbonDropTableCommand(
+          ifExistsSet = true,
+          Some(childCarbonTable.get.getDatabaseName),
+          childCarbonTable.get.getTableName,
+          dropChildTable = true)
+        commandToRun.processMetadata(sparkSession)
+      }
+    }
+  }
+
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // delete the table folder
     if (commandToRun != null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9430312/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index c861b00..d2acb00 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -131,11 +131,17 @@ case class CreatePreAggregateTableCommand(
     dmProperties.foreach(f => childSchema.getProperties.put(f._1, f._2))
 
     // updating the parent table about child table
-    PreAggregateUtil.updateMainTable(
-      CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession),
-      parentTableIdentifier.table,
-      childSchema,
-      sparkSession)
+    try {
+      PreAggregateUtil.updateMainTable(
+        CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession),
+        parentTableIdentifier.table,
+        childSchema,
+        sparkSession)
+    } catch {
+      case ex: Exception =>
+        undoMetadata(sparkSession, ex)
+        throw ex
+    }
     val updatedLoadQuery = if (timeSeriesFunction.isDefined) {
       PreAggregateUtil.createTimeSeriesSelectQueryFromMain(childSchema.getChildSchema,
         parentTable.getTableName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9430312/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
index ae9bc9b..0bee383 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateUtil.scala
@@ -419,7 +419,6 @@ object PreAggregateUtil {
       LockUsage.DROP_TABLE_LOCK)
     var locks = List.empty[ICarbonLock]
     var carbonTable: CarbonTable = null
-    var numberOfCurrentChild: Int = 0
     try {
       val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
       carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
@@ -435,7 +434,6 @@ object PreAggregateUtil {
         dbName,
         tableName,
         carbonTable.getTablePath)
-      numberOfCurrentChild = wrapperTableInfo.getDataMapSchemaList.size
       if (wrapperTableInfo.getDataMapSchemaList.asScala.
         exists(f => f.getDataMapName.equalsIgnoreCase(childSchema.getDataMapName))) {
         throw new Exception("Duplicate datamap")
@@ -445,11 +443,11 @@ object PreAggregateUtil {
         .fromWrapperToExternalTableInfo(wrapperTableInfo, dbName, tableName)
       updateSchemaInfo(carbonTable,
         thriftTable)(sparkSession)
-      LOGGER.info(s"Parent table updated is successful for table $dbName.$tableName")
+      LOGGER.info(s"Parent table updated is successful for table" +
+                  s" $dbName.${childSchema.getRelationIdentifier.toString}")
     } catch {
       case e: Exception =>
         LOGGER.error(e, "Pre Aggregate Parent table update failed reverting changes")
-        revertMainTableChanges(dbName, tableName, numberOfCurrentChild)(sparkSession)
         throw e
     } finally {
       // release lock after command execution completion
@@ -518,27 +516,6 @@ object PreAggregateUtil {
     }
   }
 
-  /**
-   * This method reverts the changes to the schema if add column command fails.
-   *
-   * @param dbName
-   * @param tableName
-   * @param numberOfChildSchema
-   * @param sparkSession
-   */
-  def revertMainTableChanges(dbName: String, tableName: String, numberOfChildSchema: Int)
-    (sparkSession: SparkSession): Unit = {
-    val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-    carbonTable.getTableLastUpdatedTime
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
-    val thriftTable: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
-    if (thriftTable.dataMapSchemas.size > numberOfChildSchema) {
-      metastore.revertTableSchemaForPreAggCreationFailure(
-        carbonTable.getAbsoluteTableIdentifier, thriftTable)(sparkSession)
-    }
-  }
-
   def getChildCarbonTable(databaseName: String, tableName: String)
     (sparkSession: SparkSession): Option[CarbonTable] = {
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore

http://git-wip-us.apache.org/repos/asf/carbondata/blob/e9430312/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 2aa3c34..16ef38d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
 import org.apache.carbondata.core.datamap.DataMapStoreManager
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.metadata.{schema, AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.CarbonUtil
@@ -183,6 +183,18 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
   }
 
   /**
+   * Generates schema string from TableInfo
+   */
+  override def generateTableSchemaString(
+      tableInfo: schema.table.TableInfo,
+      absoluteTableIdentifier: AbsoluteTableIdentifier): String = {
+    val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
+    schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
+    tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+    CarbonUtil.convertToMultiGsonStrings(tableInfo, " ", "", ",")
+  }
+
+  /**
    * This method will is used to remove the evolution entry in case of failure.
    *
    * @param carbonTableIdentifier