You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/04/02 02:41:35 UTC
[carbondata] 15/41: [CARBONDATA-3297] Fix that the
IndexoutOfBoundsException when creating table and dropping table are at the
same time
This is an automated email from the ASF dual-hosted git repository.
ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git
commit 9ed81844eb46a965566855a6bd71ac21bf48a12f
Author: qiuchenjian <80...@qq.com>
AuthorDate: Wed Feb 20 17:16:34 2019 +0800
[CARBONDATA-3297] Fix that the IndexoutOfBoundsException when creating table and dropping table are at the same time
[Problem]
Throw the IndexoutOfBoundsException when creating table and dropping table are at the same time
[Solution]
The type of carbonTables in MetaData.class is ArrayBuffer, and the ArrayBuffer is not thread-safe,
so it throw this exception when creating table and dropping table are at the same time
Use read write lock to guarantee the thread-safe
This closes #3130
---
.../spark/sql/hive/CarbonFileMetastore.scala | 37 +++++++++++++++++++---
1 file changed, 33 insertions(+), 4 deletions(-)
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index c1be154..ea3bba8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive
import java.io.IOException
import java.net.URI
+import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}
import scala.collection.mutable.ArrayBuffer
@@ -43,7 +44,8 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation
import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.schema
-import org.apache.carbondata.core.metadata.schema.{table, SchemaReader}
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.metadata.schema.table
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -53,9 +55,16 @@ import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
import org.apache.carbondata.spark.util.CarbonSparkUtil
case class MetaData(var carbonTables: ArrayBuffer[CarbonTable]) {
+ // use to lock the carbonTables
+ val lock : ReentrantReadWriteLock = new ReentrantReadWriteLock
+ val readLock: Lock = lock.readLock()
+ val writeLock: Lock = lock.writeLock()
+
// clear the metadata
def clear(): Unit = {
+ writeLock.lock()
carbonTables.clear()
+ writeLock.unlock()
}
}
@@ -192,9 +201,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
* @return
*/
def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable] = {
- metadata.carbonTables
+ metadata.readLock.lock()
+ val ret = metadata.carbonTables
.find(table => table.getDatabaseName.equalsIgnoreCase(database) &&
table.getTableName.equalsIgnoreCase(tableName))
+ metadata.readLock.unlock()
+ ret
}
def tableExists(
@@ -270,11 +282,14 @@ class CarbonFileMetastore extends CarbonMetaStore {
}
}
+
wrapperTableInfo.map { tableInfo =>
CarbonMetadata.getInstance().removeTable(tableUniqueName)
CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+ metadata.writeLock.lock()
metadata.carbonTables += carbonTable
+ metadata.writeLock.unlock()
carbonTable
}
}
@@ -413,8 +428,11 @@ class CarbonFileMetastore extends CarbonMetaStore {
CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+ metadata.writeLock.lock()
metadata.carbonTables +=
CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName)
+ metadata.writeLock.unlock()
+ metadata.carbonTables
}
/**
@@ -427,7 +445,9 @@ class CarbonFileMetastore extends CarbonMetaStore {
val carbonTableToBeRemoved: Option[CarbonTable] = getTableFromMetadataCache(dbName, tableName)
carbonTableToBeRemoved match {
case Some(carbonTable) =>
+ metadata.writeLock.lock()
metadata.carbonTables -= carbonTable
+ metadata.writeLock.unlock()
case None =>
if (LOGGER.isDebugEnabled) {
LOGGER.debug(s"No entry for table $tableName in database $dbName")
@@ -443,10 +463,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
wrapperTableInfo.getTableUniqueName)
for (i <- metadata.carbonTables.indices) {
+ metadata.writeLock.lock()
if (wrapperTableInfo.getTableUniqueName.equals(
metadata.carbonTables(i).getTableUniqueName)) {
metadata.carbonTables(i) = carbonTable
}
+ metadata.writeLock.unlock()
}
}
@@ -579,12 +601,14 @@ class CarbonFileMetastore extends CarbonMetaStore {
FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
if (!(lastModifiedTime ==
tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
+ metadata.writeLock.lock()
metadata.carbonTables = metadata.carbonTables.filterNot(
table => table.getTableName.equalsIgnoreCase(tableIdentifier.table) &&
table.getDatabaseName
.equalsIgnoreCase(tableIdentifier.database
.getOrElse(SparkSession.getActiveSession.get.sessionState.catalog
.getCurrentDatabase)))
+ metadata.writeLock.unlock()
updateSchemasUpdatedTime(lastModifiedTime)
isRefreshed = true
}
@@ -594,8 +618,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
override def isReadFromHiveMetaStore: Boolean = false
- override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] =
- metadata.carbonTables
+ override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] = {
+ metadata.readLock.lock
+ val ret = metadata.carbonTables.clone()
+ metadata.readLock.unlock
+ ret
+ }
+
override def getThriftTableInfo(carbonTable: CarbonTable): TableInfo = {
val tableMetadataFile = CarbonTablePath.getSchemaFilePath(carbonTable.getTablePath)