You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by gv...@apache.org on 2017/07/10 14:03:06 UTC
carbondata git commit: [CARBONDATA-1229] acquired meta.lock during
table drop
Repository: carbondata
Updated Branches:
refs/heads/master 619f1f954 -> 403c3d9b4
[CARBONDATA-1229] acquired meta.lock during table drop
This closes #1153
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/403c3d9b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/403c3d9b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/403c3d9b
Branch: refs/heads/master
Commit: 403c3d9b41e166311ac45ec33b375cbecc8c4741
Parents: 619f1f9
Author: kunalkapoor <ku...@gmail.com>
Authored: Mon Jul 10 12:12:10 2017 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Mon Jul 10 19:32:43 2017 +0530
----------------------------------------------------------------------
.../carbondata/core/locks/CarbonLockUtil.java | 24 +++++++++
.../execution/command/carbonTableSchema.scala | 52 +++++++++-----------
.../org/apache/spark/util/AlterTableUtil.scala | 25 +---------
3 files changed, 50 insertions(+), 51 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/403c3d9b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
index fba03a1..eaaaf94 100644
--- a/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/locks/CarbonLockUtil.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.locks;
import org.apache.carbondata.common.logging.LogService;
import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
/**
* This class contains all carbon lock utilities
@@ -60,4 +61,27 @@ public class CarbonLockUtil {
}
}
}
+
+ /**
+ * Given a lock type this method will return a new lock object if not acquired by any other
+ * operation
+ *
+ * @param carbonTable
+ * @param lockType
+ * @return
+ */
+ public static ICarbonLock getLockObject(CarbonTable carbonTable,
+ String lockType) {
+ ICarbonLock carbonLock = CarbonLockFactory
+ .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier().getCarbonTableIdentifier(),
+ lockType);
+ LOGGER.info("Trying to acquire lock: " + carbonLock);
+ if (carbonLock.lockWithRetries()) {
+ LOGGER.info("Successfully acquired the lock " + carbonLock);
+ } else {
+ throw new RuntimeException("Table is locked for updation. Please try after some time");
+ }
+ return carbonLock;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/403c3d9b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 8e7db45..2e5812c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -17,9 +17,8 @@
package org.apache.spark.sql.execution.command
-import java.io.File
-
import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
import scala.language.implicitConversions
import org.apache.commons.lang3.StringUtils
@@ -30,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
-import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation, HiveExternalCatalog}
+import org.apache.spark.sql.hive.{CarbonMetastore, CarbonRelation}
import org.apache.spark.util.FileUtils
import org.codehaus.jackson.map.ObjectMapper
@@ -41,10 +40,10 @@ import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
import org.apache.carbondata.core.datastore.impl.FileFactory
import org.apache.carbondata.core.dictionary.server.DictionaryServer
-import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.encoder.Encoding
-import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo}
+import org.apache.carbondata.core.metadata.schema.table.TableInfo
import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension
import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum}
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
@@ -834,24 +833,17 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
val dbName = getDB.getDatabaseName(databaseNameOp, sparkSession)
val identifier = TableIdentifier(tableName, Option(dbName))
val carbonTableIdentifier = new CarbonTableIdentifier(dbName, tableName, "")
- val carbonLock = CarbonLockFactory.getCarbonLockObj(carbonTableIdentifier.getDatabaseName,
- carbonTableIdentifier.getTableName + CarbonCommonConstants.UNDERSCORE +
- LockUsage.DROP_TABLE_LOCK)
+ val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
val storePath = catalog.storePath
- var isLocked = false
catalog.checkSchemasModifiedTimeAndReloadTables()
+ val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
try {
- isLocked = carbonLock.lockWithRetries()
- if (isLocked) {
- logInfo("Successfully able to get the lock for drop.")
- }
- else {
- LOGGER.audit(s"Dropping table $dbName.$tableName failed as the Table is locked")
- sys.error("Table is locked for deletion. Please try after some time")
+ val carbonTable = catalog.getTableFromMetadata(dbName, tableName).map(_.carbonTable).orNull
+ locksToBeAcquired foreach {
+ lock => carbonLocks += CarbonLockUtil.getLockObject(carbonTable, lock)
}
LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
- val carbonTable = catalog.getTableFromMetadata(dbName, tableName).map(_.carbonTable).orNull
if (null != carbonTable) {
// clear driver B-tree and dictionary cache
ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
@@ -859,18 +851,22 @@ case class CarbonDropTableCommand(ifExistsSet: Boolean,
CarbonEnv.getInstance(sparkSession).carbonMetastore
.dropTable(storePath, identifier)(sparkSession)
LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
+ } catch {
+ case ex: Exception =>
+ LOGGER.error(ex, s"Dropping table $dbName.$tableName failed")
} finally {
- if (carbonLock != null && isLocked) {
- if (carbonLock.unlock()) {
- logInfo("Table MetaData Unlocked Successfully after dropping the table")
- // deleting any remaining files.
- val metadataFilePath = CarbonStorePath
- .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath
- val fileType = FileFactory.getFileType(metadataFilePath)
- if (FileFactory.isFileExist(metadataFilePath, fileType)) {
- val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
- CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
- }
+ if (carbonLocks.nonEmpty) {
+ val unlocked = carbonLocks.forall(_.unlock())
+ if (unlocked) {
+ logInfo("Table MetaData Unlocked Successfully")
+ }
+ // deleting any remaining files.
+ val metadataFilePath = CarbonStorePath
+ .getCarbonTablePath(storePath, carbonTableIdentifier).getMetadataDirectoryPath
+ val fileType = FileFactory.getFileType(metadataFilePath)
+ if (FileFactory.isFileExist(metadataFilePath, fileType)) {
+ val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
+ CarbonUtil.deleteFoldersAndFiles(file.getParentFile)
}
}
}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/403c3d9b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 9e402cd..87717fb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.hive.HiveExternalCatalog._
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock}
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock}
import org.apache.carbondata.core.metadata.CarbonTableIdentifier
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.path.CarbonStorePath
@@ -65,7 +65,7 @@ object AlterTableUtil {
val acquiredLocks = ListBuffer[ICarbonLock]()
try {
locksToBeAcquired.foreach { lock =>
- acquiredLocks += getLockObject(table, lock)
+ acquiredLocks += CarbonLockUtil.getLockObject(table, lock)
}
acquiredLocks.toList
} catch {
@@ -76,27 +76,6 @@ object AlterTableUtil {
}
/**
- * Given a lock type this method will return a new lock object if not acquired by any other
- * operation
- *
- * @param carbonTable
- * @param lockType
- * @return
- */
- private def getLockObject(carbonTable: CarbonTable,
- lockType: String): ICarbonLock = {
- val carbonLock = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
- lockType)
- if (carbonLock.lockWithRetries()) {
- LOGGER.info(s"Successfully acquired the lock $lockType")
- } else {
- sys.error("Table is locked for updation. Please try after some time")
- }
- carbonLock
- }
-
- /**
* This method will release the locks acquired for an operation
*
* @param locks