You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/04/21 08:31:44 UTC
[1/2] incubator-carbondata git commit: fixed table not found
exception in rename table after lock acquire failure
Repository: incubator-carbondata
Updated Branches:
refs/heads/master 674b71e46 -> 3de1e6313
fixed table not found exception in rename table after lock acquire failure
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/4f011e21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/4f011e21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/4f011e21
Branch: refs/heads/master
Commit: 4f011e21be8529a665145cfb9d43d8811811766f
Parents: 674b71e
Author: kunal642 <ku...@knoldus.in>
Authored: Tue Apr 18 16:41:30 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Apr 21 13:57:06 2017 +0530
----------------------------------------------------------------------
.../carbondata/core/locks/HdfsFileLock.java | 1 +
.../carbondata/core/locks/LocalFileLock.java | 1 +
.../spark/util/GlobalDictionaryUtil.scala | 4 +-
.../execution/command/AlterTableCommands.scala | 40 ++++++-----
.../org/apache/spark/util/AlterTableUtil.scala | 74 +++++++++-----------
.../restructure/AlterTableRevertTestCase.scala | 12 ++++
6 files changed, 73 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4f011e21/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
index 985ced1..94e7307 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/HdfsFileLock.java
@@ -94,6 +94,7 @@ public class HdfsFileLock extends AbstractCarbonLock {
return true;
} catch (IOException e) {
+ LOGGER.error(e, e.getMessage());
return false;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4f011e21/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
index 2802127..4cbfc7a 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/LocalFileLock.java
@@ -124,6 +124,7 @@ public class LocalFileLock extends AbstractCarbonLock {
return false;
}
} catch (IOException e) {
+ LOGGER.error(e, e.getMessage());
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4f011e21/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index aeb387a..491926c 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -830,9 +830,9 @@ object GlobalDictionaryUtil {
val dictLock = CarbonLockFactory
.getCarbonLockObj(carbonTablePath.getRelativeDictionaryDirectory,
columnSchema.getColumnUniqueId + LockUsage.LOCK)
-
- val isDictionaryLocked = dictLock.lockWithRetries()
+ var isDictionaryLocked = false
try {
+ isDictionaryLocked = dictLock.lockWithRetries()
if (isDictionaryLocked) {
LOGGER.info(s"Successfully able to get the dictionary lock for ${
columnSchema.getColumnName
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4f011e21/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index 6097231..280d459 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -57,10 +57,10 @@ private[sql] case class AlterTableAddColumns(
.lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
.carbonTable
try {
+ lastUpdatedTime = carbonTable.getTableLastUpdatedTime
locks = AlterTableUtil
- .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
+ .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
// get the latest carbon table and check for column existence
- lastUpdatedTime = carbonTable.getTableLastUpdatedTime
// read the latest schema file
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
@@ -105,12 +105,12 @@ private[sql] case class AlterTableAddColumns(
newCols,
carbonTable.getCarbonTableIdentifier,
carbonTable.getStorePath).collect()
+ AlterTableUtil.revertAddColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
}
- AlterTableUtil.revertAddColumnChanges(dbName, tableName, lastUpdatedTime)(sparkSession)
sys.error(s"Alter table add operation failed: ${e.getMessage}")
} finally {
// release lock after command execution completion
- AlterTableUtil.releaseLocks(locks, LOGGER)
+ AlterTableUtil.releaseLocks(locks)
}
Seq.empty
}
@@ -158,10 +158,10 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
var lastUpdatedTime = 0L
val carbonTable = relation.tableMeta.carbonTable
try {
- locks = AlterTableUtil
- .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired, LOGGER)(
- sparkSession)
lastUpdatedTime = carbonTable.getTableLastUpdatedTime
+ locks = AlterTableUtil
+ .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired)(
+ sparkSession)
// get the latest carbon table and check for column existence
val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
carbonTable.getCarbonTableIdentifier)
@@ -205,21 +205,25 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
} catch {
case e: Exception => LOGGER
.error("Rename table failed: " + e.getMessage)
- AlterTableUtil.revertRenameTableChanges(oldTableIdentifier, newTableName, lastUpdatedTime)(
- sparkSession)
+ AlterTableUtil
+ .revertRenameTableChanges(oldTableIdentifier,
+ newTableName,
+ carbonTable.getStorePath,
+ carbonTable.getCarbonTableIdentifier.getTableId,
+ lastUpdatedTime)(
+ sparkSession)
renameBadRecords(newTableName, oldTableName, oldDatabaseName)
sys.error(s"Alter table rename table operation failed: ${e.getMessage}")
} finally {
// release lock after command execution completion
- AlterTableUtil.releaseLocks(locks, LOGGER)
+ AlterTableUtil.releaseLocks(locks)
// case specific to rename table as after table rename old table path will not be found
AlterTableUtil
.releaseLocksManually(locks,
locksToBeAcquired,
oldDatabaseName,
newTableName,
- carbonTable.getStorePath,
- LOGGER)
+ carbonTable.getStorePath)
}
Seq.empty
}
@@ -261,9 +265,9 @@ private[sql] case class AlterTableDropColumns(
.lookupRelation(Some(dbName), tableName)(sparkSession).asInstanceOf[CarbonRelation].tableMeta
.carbonTable
try {
- locks = AlterTableUtil
- .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
lastUpdatedTime = carbonTable.getTableLastUpdatedTime
+ locks = AlterTableUtil
+ .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
// check each column existence in the table
val tableColumns = carbonTable.getCreateOrderColumn(tableName).asScala
var dictionaryColumns = Seq[org.apache.carbondata.core.metadata.schema.table.column
@@ -336,7 +340,7 @@ private[sql] case class AlterTableDropColumns(
sys.error(s"Alter table drop column operation failed: ${e.getMessage}")
} finally {
// release lock after command execution completion
- AlterTableUtil.releaseLocks(locks, LOGGER)
+ AlterTableUtil.releaseLocks(locks)
}
Seq.empty
}
@@ -360,9 +364,9 @@ private[sql] case class AlterTableDataTypeChange(
.carbonTable
var lastUpdatedTime = 0L
try {
- locks = AlterTableUtil
- .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
lastUpdatedTime = carbonTable.getTableLastUpdatedTime
+ locks = AlterTableUtil
+ .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired)(sparkSession)
val columnName = alterTableDataTypeChangeModel.columnName
val carbonColumns = carbonTable.getCreateOrderColumn(tableName).asScala.filter(!_.isInvisible)
if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
@@ -418,7 +422,7 @@ private[sql] case class AlterTableDataTypeChange(
sys.error(s"Alter table data type change operation failed: ${e.getMessage}")
} finally {
// release lock after command execution completion
- AlterTableUtil.releaseLocks(locks, LOGGER)
+ AlterTableUtil.releaseLocks(locks)
}
Seq.empty
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4f011e21/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 052edce..349c1d9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -26,32 +26,30 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.{CarbonRelation, HiveExternalCatalog}
import org.apache.spark.sql.hive.HiveExternalCatalog._
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
+import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifier}
+import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock}
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.path.CarbonStorePath
import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
object AlterTableUtil {
- val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
/**
* Validates that the table exists and acquires meta lock on it.
*
* @param dbName
* @param tableName
- * @param LOGGER
* @param sparkSession
* @return
*/
def validateTableAndAcquireLock(dbName: String,
tableName: String,
- locksToBeAcquired: List[String],
- LOGGER: LogService)
+ locksToBeAcquired: List[String])
(sparkSession: SparkSession): List[ICarbonLock] = {
val relation =
CarbonEnv.getInstance(sparkSession).carbonMetastore
@@ -67,12 +65,12 @@ object AlterTableUtil {
val acquiredLocks = ListBuffer[ICarbonLock]()
try {
locksToBeAcquired.foreach { lock =>
- acquiredLocks += getLockObject(table, lock, LOGGER)
+ acquiredLocks += getLockObject(table, lock)
}
acquiredLocks.toList
} catch {
case e: Exception =>
- releaseLocks(acquiredLocks.toList, LOGGER)
+ releaseLocks(acquiredLocks.toList)
throw e
}
}
@@ -83,12 +81,10 @@ object AlterTableUtil {
*
* @param carbonTable
* @param lockType
- * @param LOGGER
* @return
*/
private def getLockObject(carbonTable: CarbonTable,
- lockType: String,
- LOGGER: LogService): ICarbonLock = {
+ lockType: String): ICarbonLock = {
val carbonLock = CarbonLockFactory
.getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
lockType)
@@ -104,9 +100,8 @@ object AlterTableUtil {
* This method will release the locks acquired for an operation
*
* @param locks
- * @param LOGGER
*/
- def releaseLocks(locks: List[ICarbonLock], LOGGER: LogService): Unit = {
+ def releaseLocks(locks: List[ICarbonLock]): Unit = {
locks.foreach { carbonLock =>
if (carbonLock.unlock()) {
LOGGER.info("Alter table lock released successfully")
@@ -125,14 +120,12 @@ object AlterTableUtil {
* @param dbName
* @param tableName
* @param storeLocation
- * @param LOGGER
*/
def releaseLocksManually(locks: List[ICarbonLock],
locksAcquired: List[String],
dbName: String,
tableName: String,
- storeLocation: String,
- LOGGER: LogService): Unit = {
+ storeLocation: String): Unit = {
val lockLocation = storeLocation + CarbonCommonConstants.FILE_SEPARATOR +
dbName + CarbonCommonConstants.FILE_SEPARATOR + tableName
locks.zip(locksAcquired).foreach { case (carbonLock, lockType) =>
@@ -206,33 +199,36 @@ object AlterTableUtil {
*/
def revertRenameTableChanges(oldTableIdentifier: TableIdentifier,
newTableName: String,
+ storePath: String,
+ tableId: String,
lastUpdatedTime: Long)
(sparkSession: SparkSession): Unit = {
val database = oldTableIdentifier.database.getOrElse(sparkSession.catalog.currentDatabase)
- val carbonTable: CarbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
- .lookupRelation(Some(database), newTableName)(sparkSession).asInstanceOf[CarbonRelation]
- .tableMeta.carbonTable
- val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
- carbonTable.getCarbonTableIdentifier)
+ val newCarbonTableIdentifier = new CarbonTableIdentifier(database, newTableName, tableId)
+ val carbonTablePath = CarbonStorePath.getCarbonTablePath(storePath,
+ newCarbonTableIdentifier)
val tableMetadataFile = carbonTablePath.getSchemaFilePath
val fileType = FileFactory.getFileType(tableMetadataFile)
- val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.getInstance(sparkSession)
- .carbonMetastore.readSchemaFile(tableMetadataFile)
- val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history
- val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
- if (updatedTime > lastUpdatedTime) {
- LOGGER.error(s"Reverting changes for $database.${ oldTableIdentifier.table }")
- FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
- .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
- oldTableIdentifier.table)
- val tableIdentifier = new CarbonTableIdentifier(database,
- oldTableIdentifier.table,
- carbonTable.getCarbonTableIdentifier.getTableId)
- CarbonEnv.getInstance(sparkSession).carbonMetastore.revertTableSchema(tableIdentifier,
- tableInfo,
- carbonTable.getStorePath)(sparkSession)
- CarbonEnv.getInstance(sparkSession).carbonMetastore
- .removeTableFromMetadata(database, newTableName)
+ if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+ val tableInfo: org.apache.carbondata.format.TableInfo = CarbonEnv.getInstance(sparkSession)
+ .carbonMetastore
+ .readSchemaFile(tableMetadataFile)
+ val evolutionEntryList = tableInfo.fact_table.schema_evolution.schema_evolution_history
+ val updatedTime = evolutionEntryList.get(evolutionEntryList.size() - 1).time_stamp
+ if (updatedTime > lastUpdatedTime) {
+ LOGGER.error(s"Reverting changes for $database.${ oldTableIdentifier.table }")
+ FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
+ .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
+ oldTableIdentifier.table)
+ val tableIdentifier = new CarbonTableIdentifier(database,
+ oldTableIdentifier.table,
+ tableId)
+ CarbonEnv.getInstance(sparkSession).carbonMetastore.revertTableSchema(tableIdentifier,
+ tableInfo,
+ storePath)(sparkSession)
+ CarbonEnv.getInstance(sparkSession).carbonMetastore
+ .removeTableFromMetadata(database, newTableName)
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/4f011e21/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
index c9244bc..b5a8071 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -22,6 +22,7 @@ import java.io.File
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.common.util.QueryTest
import org.apache.spark.sql.test.TestQueryExecutor
+import org.apache.spark.util.AlterTableUtil
import org.scalatest.BeforeAndAfterAll
import org.apache.carbondata.core.metadata.CarbonMetadata
@@ -95,6 +96,17 @@ class AlterTableRevertTestCase extends QueryTest with BeforeAndAfterAll {
}
}
+ test("test to check if exception during rename table does not throws table not found exception") {
+ val locks = AlterTableUtil
+ .validateTableAndAcquireLock("default", "reverttest", List("meta.lock"))(sqlContext
+ .sparkSession)
+ val exception = intercept[RuntimeException] {
+ sql("alter table reverttest rename to revert")
+ }
+ AlterTableUtil.releaseLocks(locks)
+ assert(exception.getMessage == "Alter table rename table operation failed: Table is locked for updation. Please try after some time")
+ }
+
override def afterAll() {
hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
sql("drop table if exists reverttest")
[2/2] incubator-carbondata git commit: [CARBONDATA-957] Fixed table
not found exception in rename table after lock acquire failure. This closes
#814
Posted by gv...@apache.org.
[CARBONDATA-957] Fixed table not found exception in rename table after lock acquire failure. This closes #814
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/3de1e631
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/3de1e631
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/3de1e631
Branch: refs/heads/master
Commit: 3de1e631367f97b9b2bbe0524efee1d64d58b6b2
Parents: 674b71e 4f011e2
Author: Venkata Ramana G <ra...@huawei.com>
Authored: Fri Apr 21 14:01:28 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Fri Apr 21 14:01:28 2017 +0530
----------------------------------------------------------------------
.../carbondata/core/locks/HdfsFileLock.java | 1 +
.../carbondata/core/locks/LocalFileLock.java | 1 +
.../spark/util/GlobalDictionaryUtil.scala | 4 +-
.../execution/command/AlterTableCommands.scala | 40 ++++++-----
.../org/apache/spark/util/AlterTableUtil.scala | 74 +++++++++-----------
.../restructure/AlterTableRevertTestCase.scala | 12 ++++
6 files changed, 73 insertions(+), 59 deletions(-)
----------------------------------------------------------------------