You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/04/21 08:31:44 UTC

[1/2] incubator-carbondata git commit: fixed table not found exception in rename table after lock acquire failure

Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 674b71e46 -> 3de1e6313


fixed table not found exception in rename table after lock acquire failure


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/4f011e21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/4f011e21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/4f011e21

Branch: refs/heads/master
Commit: 4f011e21be8529a665145cfb9d43d8811811766f
Parents: 674b71e
Author: kunal642 <ku...@knoldus.in>
Authored: Tue Apr 18 16:41:30 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Apr 21 13:57:06 2017 +0530

----------------------------------------------------------------------
 .../carbondata/core/locks/HdfsFileLock.java     |  1 +
 .../carbondata/core/locks/LocalFileLock.java    |  1 +
 .../spark/util/GlobalDictionaryUtil.scala       |  4 +-
 .../execution/command/AlterTableCommands.scala  | 40 ++++++-----
 .../org/apache/spark/util/AlterTableUtil.scala  | 74 +++++++++-----------
 .../restructure/AlterTableRevertTestCase.scala  | 12 ++++
 6 files changed, 73 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4f011e21/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
index 985ced1..94e7307 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
@@ -94,6 +94,7 @@ public class HdfsFileLock extends AbstractCarbonLock {
       return true;
 
     } catch (IOException e) {
+      LOGGER.error(e, e.getMessage());
       return false;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4f011e21/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
index 2802127..4cbfc7a 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
@@ -124,6 +124,7 @@ public class LocalFileLock extends AbstractCarbonLock {
         return false;
       }
     } catch (IOException e) {
+      LOGGER.error(e, e.getMessage());
       return false;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4f011e21/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index aeb387a..491926c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -830,9 +830,9 @@ object GlobalDictionaryUtil {
     val dictLock = CarbonLockFactory
       .getCarbonLockObj(carbonTablePath.getRelativeDictionaryDirectory,
         columnSchema.getColumnUniqueId + LockUsage.LOCK)
-
-    val isDictionaryLocked = dictLock.lockWithRetries()
+    var isDictionaryLocked = false
     try {
+      isDictionaryLocked = dictLock.lockWithRetries()
       if (isDictionaryLocked) {
         LOGGER.info(s"Successfully able to get the dictionary lock for ${
           columnSchema.getColumnName

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4f011e21/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index 6097231..280d459 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -57,10 +57,10 @@ private[sql] case class AlterTableAddColumns(
       .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
       .carbonTable
     try {
+      lastUpdatedTime = carbonTable.getTableLastUpdatedTime
       locks = AlterTableUtil
-        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
+        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
       // get the latest carbon table and check for column existence
-      lastUpdatedTime = carbonTable.getTableLastUpdatedTime
       // read the latest schema file
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
         carbonTable.getCarbonTableIdentifier)
@@ -105,12 +105,12 @@ private[sql] case class AlterTableAddColumns(
             newCols,
             carbonTable.getCarbonTableIdentifier,
             carbonTable.getStorePath).collect()
+          AlterTableUtil.revertAddColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
         }
-        AlterTableUtil.revertAddColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
         sys.error(s"Alter table add operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion
-      AlterTableUtil.releaseLocks(locks, LOGGER)
+      AlterTableUtil.releaseLocks(locks)
     }
     Seq.empty
   }
@@ -158,10 +158,10 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
     var lastUpdatedTime = 0L
     val carbonTable = relation.tableMeta.carbonTable
     try {
-      locks = AlterTableUtil
-        .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired, LOGGER)(
-          sparkSession)
       lastUpdatedTime = carbonTable.getTableLastUpdatedTime
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
+            sparkSession)
       // get the latest carbon table and check for column existence
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
         carbonTable.getCarbonTableIdentifier)
@@ -205,21 +205,25 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
     } catch {
       case e: Exception => LOGGER
         .error("Rename table failed: " + e.getMessage)
-        AlterTableUtil.revertRenameTableChanges(oldTableIdentifier, newTableName, lastUpdatedTime)(
-          sparkSession)
+        AlterTableUtil
+          .revertRenameTableChanges(oldTableIdentifier,
+            newTableName,
+            carbonTable.getStorePath,
+            carbonTable.getCarbonTableIdentifier.getTableId,
+            lastUpdatedTime)(
+            sparkSession)
         renameBadRecords(newTableName, oldTableName, oldDatabaseName)
         sys.error(s"Alter table rename table operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion
-      AlterTableUtil.releaseLocks(locks, LOGGER)
+      AlterTableUtil.releaseLocks(locks)
       // case specific to rename table as after table rename old table path will not be found
       AlterTableUtil
         .releaseLocksManually(locks,
           locksToBeAcquired,
           oldDatabaseName,
           newTableName,
-          carbonTable.getStorePath,
-          LOGGER)
+          carbonTable.getStorePath)
     }
     Seq.empty
   }
@@ -261,9 +265,9 @@ private[sql] case class AlterTableDropColumns(
       .lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
       .carbonTable
     try {
-      locks = AlterTableUtil
-        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
       lastUpdatedTime = carbonTable.getTableLastUpdatedTime
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
       // check each column existence in the table
       val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
       var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
@@ -336,7 +340,7 @@ private[sql] case class AlterTableDropColumns(
         sys.error(s"Alter table drop column operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion
-      AlterTableUtil.releaseLocks(locks, LOGGER)
+      AlterTableUtil.releaseLocks(locks)
     }
     Seq.empty
   }
@@ -360,9 +364,9 @@ private[sql] case class AlterTableDataTypeChange(
       .carbonTable
     var lastUpdatedTime = 0L
     try {
-      locks = AlterTableUtil
-        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
       lastUpdatedTime = carbonTable.getTableLastUpdatedTime
+      locks = AlterTableUtil
+        .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
       val columnName = alterTableDataTypeChangeModel.columnName
       val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
       if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
@@ -418,7 +422,7 @@ private[sql] case class AlterTableDataTypeChange(
         sys.error(s"Alter table data type change operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion
-      AlterTableUtil.releaseLocks(locks, LOGGER)
+      AlterTableUtil.releaseLocks(locks)
     }
     Seq.empty
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4f011e21/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 052edce..349c1d9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -26,32 +26,30 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog}
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock}
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
 
 object AlterTableUtil {
 
-  val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   /**
    * Validates that the table exists and acquires meta lock on it.
    *
    * @param dbName
    * @param tableName
-   * @param LOGGER
    * @param sparkSession
    * @return
    */
   def validateTableAndAcquireLock(dbName: String,
       tableName: String,
-      locksToBeAcquired: List[String],
-      LOGGER: LogService)
+      locksToBeAcquired: List[String])
     (sparkSession: SparkSession): List[ICarbonLock] = {
     val relation =
       CarbonEnv.getInstance(sparkSession).carbonMetastore
@@ -67,12 +65,12 @@ object AlterTableUtil {
     val acquiredLocks = ListBuffer[ICarbonLock]()
     try {
       locksToBeAcquired.foreach { lock =>
-        acquiredLocks += getLockObject(table, lock, LOGGER)
+        acquiredLocks += getLockObject(table, lock)
       }
       acquiredLocks.toList
     } catch {
       case e: Exception =>
-        releaseLocks(acquiredLocks.toList, LOGGER)
+        releaseLocks(acquiredLocks.toList)
         throw e
     }
   }
@@ -83,12 +81,10 @@ object AlterTableUtil {
    *
    * @param carbonTable
    * @param lockType
-   * @param LOGGER
    * @return
    */
   private def getLockObject(carbonTable: CarbonTable,
-      lockType: String,
-      LOGGER: LogService): ICarbonLock = {
+      lockType: String): ICarbonLock = {
     val carbonLock = CarbonLockFactory
       .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
         lockType)
@@ -104,9 +100,8 @@ object AlterTableUtil {
    * This method will release the locks acquired for an operation
    *
    * @param locks
-   * @param LOGGER
    */
-  def releaseLocks(locks: List[ICarbonLock], LOGGER: LogService): Unit = {
+  def releaseLocks(locks: List[ICarbonLock]): Unit = {
     locks.foreach { carbonLock =>
       if (carbonLock.unlock()) {
         LOGGER.info("Alter table lock released successfully")
@@ -125,14 +120,12 @@ object AlterTableUtil {
    * @param dbName
    * @param tableName
    * @param storeLocation
-   * @param LOGGER
    */
   def releaseLocksManually(locks: List[ICarbonLock],
       locksAcquired: List[String],
       dbName: String,
       tableName: String,
-      storeLocation: String,
-      LOGGER: LogService): Unit = {
+      storeLocation: String): Unit = {
     val lockLocation = storeLocation + CarbonCommonConstants.FILE_SEPARATOR +
                        dbName + CarbonCommonConstants.FILE_SEPARATOR + tableName
     locks.zip(locksAcquired).foreach { case (carbonLock, lockType) =>
@@ -206,33 +199,36 @@ object AlterTableUtil {
    */
   def revertRenameTableChanges(oldTableIdentifier: TableIdentifier,
       newTableName: String,
+      storePath: String,
+      tableId: String,
       lastUpdatedTime: Long)
     (sparkSession: SparkSession): Unit = {
     val database = oldTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
-    val carbonTable: CarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(Some(database), newTableName)(sparkSession).asInstanceOf[CarbonRelation]
-      .tableMeta.carbonTable
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
-      carbonTable.getCarbonTableIdentifier)
+    val newCarbonTableIdentifier = new CarbonTableIdentifier(database, newTableName, tableId)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath,
+      newCarbonTableIdentifier)
     val tableMetadataFile = carbonTablePath.getSchemaFilePath
     val fileType = FileFactory.getFileType(tableMetadataFile)
-    val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.getInstance(sparkSession)
-      .carbonMetastore.readSchemaFile(tableMetadataFile)
-    val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history
-    val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
-    if (updatedTime > lastUpdatedTime) {
-      LOGGER.error(s"Reverting changes for $database.${ oldTableIdentifier.table }")
-      FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
-        .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
-                     oldTableIdentifier.table)
-      val tableIdentifier = new CarbonTableIdentifier(database,
-        oldTableIdentifier.table,
-        carbonTable.getCarbonTableIdentifier.getTableId)
-      CarbonEnv.getInstance(sparkSession).carbonMetastore.revertTableSchema(tableIdentifier,
-        tableInfo,
-        carbonTable.getStorePath)(sparkSession)
-      CarbonEnv.getInstance(sparkSession).carbonMetastore
-        .removeTableFromMetadata(database, newTableName)
+    if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+      val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.getInstance(sparkSession)
+        .carbonMetastore
+        .readSchemaFile(tableMetadataFile)
+      val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history
+      val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
+      if (updatedTime > lastUpdatedTime) {
+        LOGGER.error(s"Reverting changes for $database.${ oldTableIdentifier.table }")
+        FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
+          .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
+                       oldTableIdentifier.table)
+        val tableIdentifier = new CarbonTableIdentifier(database,
+          oldTableIdentifier.table,
+          tableId)
+        CarbonEnv.getInstance(sparkSession).carbonMetastore.revertTableSchema(tableIdentifier,
+          tableInfo,
+          storePath)(sparkSession)
+        CarbonEnv.getInstance(sparkSession).carbonMetastore
+          .removeTableFromMetadata(database, newTableName)
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4f011e21/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
index c9244bc..b5a8071 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -22,6 +22,7 @@ import java.io.File
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.common.util.QueryTest
 import org.apache.spark.sql.test.TestQueryExecutor
+import org.apache.spark.util.AlterTableUtil
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.metadata.CarbonMetadata
@@ -95,6 +96,17 @@ class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
     }
   }
 
+  test("test to check if exception during rename table does not throws table not found exception") {
+    val locks = AlterTableUtil
+      .validateTableAndAcquireLock("default", "reverttest", List("meta.lock"))(sqlContext
+        .sparkSession)
+    val exception = intercept[RuntimeException] {
+      sql("alter table reverttest rename to revert")
+    }
+    AlterTableUtil.releaseLocks(locks)
+    assert(exception.getMessage == "Alter table rename table operation failed: Table is locked for updation. Please try after some time")
+  }
+
   override def afterAll() {
     hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
     sql("drop table if exists reverttest")


[2/2] incubator-carbondata git commit: [CARBONDATA-957] Fixed table not found exception in rename table after lock acquire failure.This closes #814

Posted by gv...@apache.org.
[CARBONDATA-957] Fixed table not found exception in rename table after lock acquire failure.This closes #814


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/3de1e631
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/3de1e631
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/3de1e631

Branch: refs/heads/master
Commit: 3de1e631367f97b9b2bbe0524efee1d64d58b6b2
Parents: 674b71e 4f011e2
Author: Venkata Ramana G <ra...@huawei.com>
Authored: Fri Apr 21 14:01:28 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Apr 21 14:01:28 2017 +0530

----------------------------------------------------------------------
 .../carbondata/core/locks/HdfsFileLock.java     |  1 +
 .../carbondata/core/locks/LocalFileLock.java    |  1 +
 .../spark/util/GlobalDictionaryUtil.scala       |  4 +-
 .../execution/command/AlterTableCommands.scala  | 40 ++++++-----
 .../org/apache/spark/util/AlterTableUtil.scala  | 74 +++++++++-----------
 .../restructure/AlterTableRevertTestCase.scala  | 12 ++++
 6 files changed, 73 insertions(+), 59 deletions(-)
----------------------------------------------------------------------