You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2019/05/08 07:11:51 UTC

[carbondata] branch master updated: [CARBONDATA-3369] Fix issues during concurrent execution of Create table If not exists

This is an automated email from the ASF dual-hosted git repository.

ravipesala pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 157de1d  [CARBONDATA-3369] Fix issues during concurrent execution of Create table If not exists
157de1d is described below

commit 157de1d962b2a56375ba4d2c1fb8751a30365a3b
Author: KanakaKumar <ka...@huawei.com>
AuthorDate: Fri May 3 22:05:16 2019 +0530

    [CARBONDATA-3369] Fix issues during concurrent execution of Create table If not exists
    
    "Create table if not exists" has the following problems when run concurrently from different drivers:
    1. Sometimes it fails with the error "Table <db.table> already exists".
    2. The driver whose create table failed still holds the table with the wrong path or schema, so subsequent operations refer to the wrong path.
    3. The stale path created during create table is never deleted. [Since version 1.5.0, the table is created in a new folder named with a UUID if a folder with the table name already exists.]
    This PR fixes the above 3 issues.
    
    This closes #3198
---
 .../createTable/TestCreateTableIfNotExists.scala   | 36 ++++++++++++++++++++++
 .../command/table/CarbonCreateTableCommand.scala   | 33 +++++++++++++++++++-
 2 files changed, 68 insertions(+), 1 deletion(-)

diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableIfNotExists.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableIfNotExists.scala
index 8f7afe4..dc54127 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableIfNotExists.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableIfNotExists.scala
@@ -17,6 +17,8 @@
 
 package org.apache.carbondata.spark.testsuite.createTable
 
+import java.util.concurrent.{Callable, ExecutorService, Executors, Future, TimeUnit}
+
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -51,11 +53,45 @@ class TestCreateTableIfNotExists extends QueryTest with BeforeAndAfterAll {
     assert(exception.getMessage.contains("Operation not allowed, when source table is carbon table"))
   }
 
+  test("test create table if not exist concurrently") {
+    // Issue the same CREATE TABLE IF NOT EXISTS from several threads at once; every call must
+    // succeed even when another thread has already created the table.
+    val executorService: ExecutorService = Executors.newFixedThreadPool(10)
+    // Runs the statement on the pool and yields "PASS", or the failure description otherwise.
+    def runAsync(): Future[String] = {
+      executorService.submit(new Callable[String] {
+        override def call(): String = {
+          try {
+            sql("create table IF NOT EXISTS TestIfExists(name string) stored by 'carbondata'")
+            "PASS"
+          } catch {
+            // toString (not getMessage) so that a null message cannot cause an NPE in the assert
+            case exception: Exception => exception.toString
+          }
+        }
+      })
+    }
+
+    try {
+      val futures = List.fill(3)(runAsync())
+      executorService.shutdown()
+      // Fail fast instead of blocking forever on future.get when the tasks do not finish
+      assert(executorService.awaitTermination(30L, TimeUnit.SECONDS), "create table timed out")
+      futures.foreach { future =>
+        assertResult("PASS")(future.get)
+      }
+    } finally {
+      executorService.shutdownNow()
+    }
+  }
+
+
   override def afterAll {
     sql("use default")
     sql("drop table if exists test")
     sql("drop table if exists sourceTable")
     sql("drop table if exists targetTable")
+    sql("drop table if exists TestIfExists")
   }
 
 }
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 1e17ffe..debb283 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.MetadataCommand
 
@@ -166,7 +167,37 @@ case class CarbonCreateTableCommand(
              """.stripMargin)
           }
         } catch {
-          case e: AnalysisException => throw e
+          case e: AnalysisException =>
+            // AnalysisException thrown with table already exists msg in case of concurrent drivers
+            if (e.getMessage().contains("already exists")) {
+
+              // Clear the cache first
+              CarbonEnv.getInstance(sparkSession).carbonMetaStore
+                .removeTableFromMetadata(dbName, tableName)
+
+              // Delete the folders created by this call if the actual path is different
+              val actualPath = CarbonEnv
+                .getCarbonTable(TableIdentifier(tableName, Option(dbName)))(sparkSession)
+                .getTablePath
+
+              if (!actualPath.equalsIgnoreCase(tablePath)) {
+                LOGGER
+                  .error(
+                    "TableAlreadyExists with path : " + actualPath + " So, deleting " + tablePath)
+                FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(tablePath))
+              }
+
+              // No need to throw for create if not exists
+              if (ifNotExistsSet) {
+                LOGGER.error(e, e)
+              } else {
+                throw e
+              }
+            }
+            else {
+              throw e
+            }
+
           case e: Exception =>
             // call the drop table to delete the created table.
             try {