You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/04/02 02:41:35 UTC

[carbondata] 15/41: [CARBONDATA-3297] Fix that the IndexoutOfBoundsException when creating table and dropping table are at the same time

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch branch-1.5
in repository https://gitbox.apache.org/repos/asf/carbondata.git

commit 9ed81844eb46a965566855a6bd71ac21bf48a12f
Author: qiuchenjian <80...@qq.com>
AuthorDate: Wed Feb 20 17:16:34 2019 +0800

    [CARBONDATA-3297] Fix that the IndexoutOfBoundsException when creating table and dropping table are at the same time
    
    [Problem]
    An IndexOutOfBoundsException is thrown when tables are created and dropped concurrently.
    
    [Solution]
    The carbonTables field of the MetaData class is an ArrayBuffer, which is not thread-safe,
    so this exception is thrown when tables are created and dropped concurrently.
    
    Use a read-write lock to guarantee thread safety.
    
    This closes #3130
---
 .../spark/sql/hive/CarbonFileMetastore.scala       | 37 +++++++++++++++++++---
 1 file changed, 33 insertions(+), 4 deletions(-)

diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index c1be154..ea3bba8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.hive
 
 import java.io.IOException
 import java.net.URI
+import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -43,7 +44,8 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.schema
-import org.apache.carbondata.core.metadata.schema.{table, SchemaReader}
+import org.apache.carbondata.core.metadata.schema.SchemaReader
+import org.apache.carbondata.core.metadata.schema.table
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
@@ -53,9 +55,16 @@ import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
 import org.apache.carbondata.spark.util.CarbonSparkUtil
 
 case class MetaData(var carbonTables: ArrayBuffer[CarbonTable]) {
+  // use to lock the carbonTables
+  val lock : ReentrantReadWriteLock = new ReentrantReadWriteLock
+  val readLock: Lock = lock.readLock()
+  val writeLock: Lock = lock.writeLock()
+
   // clear the metadata
   def clear(): Unit = {
+    writeLock.lock()
     carbonTables.clear()
+    writeLock.unlock()
   }
 }
 
@@ -192,9 +201,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
    * @return
    */
   def getTableFromMetadataCache(database: String, tableName: String): Option[CarbonTable] = {
-    metadata.carbonTables
+    metadata.readLock.lock()
+    val ret = metadata.carbonTables
       .find(table => table.getDatabaseName.equalsIgnoreCase(database) &&
         table.getTableName.equalsIgnoreCase(tableName))
+    metadata.readLock.unlock()
+    ret
   }
 
   def tableExists(
@@ -270,11 +282,14 @@ class CarbonFileMetastore extends CarbonMetaStore {
       }
     }
 
+
     wrapperTableInfo.map { tableInfo =>
       CarbonMetadata.getInstance().removeTable(tableUniqueName)
       CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
       val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+      metadata.writeLock.lock()
       metadata.carbonTables += carbonTable
+      metadata.writeLock.unlock()
       carbonTable
     }
   }
@@ -413,8 +428,11 @@ class CarbonFileMetastore extends CarbonMetaStore {
     CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
     removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo)
+    metadata.writeLock.lock()
     metadata.carbonTables +=
       CarbonMetadata.getInstance().getCarbonTable(identifier.getTableUniqueName)
+    metadata.writeLock.unlock()
+    metadata.carbonTables
   }
 
   /**
@@ -427,7 +445,9 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val carbonTableToBeRemoved: Option[CarbonTable] = getTableFromMetadataCache(dbName, tableName)
     carbonTableToBeRemoved match {
       case Some(carbonTable) =>
+        metadata.writeLock.lock()
         metadata.carbonTables -= carbonTable
+        metadata.writeLock.unlock()
       case None =>
         if (LOGGER.isDebugEnabled) {
           LOGGER.debug(s"No entry for table $tableName in database $dbName")
@@ -443,10 +463,12 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
       wrapperTableInfo.getTableUniqueName)
     for (i <- metadata.carbonTables.indices) {
+      metadata.writeLock.lock()
       if (wrapperTableInfo.getTableUniqueName.equals(
         metadata.carbonTables(i).getTableUniqueName)) {
         metadata.carbonTables(i) = carbonTable
       }
+      metadata.writeLock.unlock()
     }
   }
 
@@ -579,12 +601,14 @@ class CarbonFileMetastore extends CarbonMetaStore {
         FileFactory.getCarbonFile(timestampFile, timestampFileType).getLastModifiedTime
       if (!(lastModifiedTime ==
             tableModifiedTimeStore.get(CarbonCommonConstants.DATABASE_DEFAULT_NAME))) {
+        metadata.writeLock.lock()
         metadata.carbonTables = metadata.carbonTables.filterNot(
           table => table.getTableName.equalsIgnoreCase(tableIdentifier.table) &&
                    table.getDatabaseName
                      .equalsIgnoreCase(tableIdentifier.database
                        .getOrElse(SparkSession.getActiveSession.get.sessionState.catalog
                          .getCurrentDatabase)))
+        metadata.writeLock.unlock()
         updateSchemasUpdatedTime(lastModifiedTime)
         isRefreshed = true
       }
@@ -594,8 +618,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
 
   override def isReadFromHiveMetaStore: Boolean = false
 
-  override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] =
-    metadata.carbonTables
+  override def listAllTables(sparkSession: SparkSession): Seq[CarbonTable] = {
+    metadata.readLock.lock
+    val ret = metadata.carbonTables.clone()
+    metadata.readLock.unlock
+    ret
+  }
+
 
   override def getThriftTableInfo(carbonTable: CarbonTable): TableInfo = {
     val tableMetadataFile = CarbonTablePath.getSchemaFilePath(carbonTable.getTablePath)