Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/03 19:43:44 UTC

[37/50] [abbrv] carbondata git commit: [CARBONDATA-2104] Add testcase for concurrent execution of insert overwrite and other command
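
In short, this commit routes command failures through two dedicated exceptions, ConcurrentOperationException and ProcessMetaDataException, instead of generic AnalysisException/RuntimeException throws, and moves the in-progress-load check in drop table ahead of any destructive work. A self-contained sketch of the guard pattern, using hypothetical stand-in types rather than the actual CarbonData classes:

    // Stand-in for org.apache.carbondata.spark.exception.ConcurrentOperationException;
    // the real constructor takes a CarbonTable, this sketch uses a plain table name.
    class ConcurrentOperationException(table: String, runningOp: String, blockedOp: String)
      extends RuntimeException(
        s"$runningOp is in progress for table $table, $blockedOp operation is not allowed")

    // Check for a concurrent load *before* touching any metadata, so the
    // command fails fast instead of failing halfway through the drop.
    def dropTable(table: String, loadInProgress: String => Boolean): Unit = {
      if (loadInProgress(table)) {
        throw new ConcurrentOperationException(table, "loading", "drop table")
      }
      // ... actual drop work would go here ...
    }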

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index f38304e..13d6274 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -22,8 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
-import org.apache.spark.sql.execution.command.{Field, MetadataCommand, TableModel, TableNewProcessor}
-import org.apache.spark.sql.util.CarbonException
+import org.apache.spark.sql.execution.command.MetadataCommand
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -79,7 +78,7 @@ case class CarbonCreateTableCommand(
       }
 
       if (tableInfo.getFactTable.getListOfColumns.size <= 0) {
-        CarbonException.analysisException("Table should have at least one column.")
+        throwMetadataException(dbName, tableName, "Table should have at least one column.")
       }
 
       val operationContext = new OperationContext
@@ -125,7 +124,7 @@ case class CarbonCreateTableCommand(
             val msg = s"Create table'$tableName' in database '$dbName' failed"
             LOGGER.audit(msg.concat(", ").concat(e.getMessage))
             LOGGER.error(e, msg)
-            CarbonException.analysisException(msg.concat(", ").concat(e.getMessage))
+            throwMetadataException(dbName, tableName, msg)
         }
       }
       val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
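
The throwMetadataException helper this hunk switches to is defined outside this diff; assuming it simply wraps the message in the ProcessMetaDataException that the updated tests below intercept, it would look roughly like this hypothetical sketch:

    // Hypothetical sketch of the helper; the real message format may differ.
    class ProcessMetaDataException(dbName: String, tableName: String, msg: String)
      extends RuntimeException(s"operation on table $dbName.$tableName failed: $msg")

    def throwMetadataException(dbName: String, tableName: String, msg: String): Nothing = {
      throw new ProcessMetaDataException(dbName, tableName, msg)
    }

Declaring the return type as Nothing lets callers use the helper in any expression position, as in the hunk above.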

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 9c0eb57..7c895ab 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -20,11 +20,10 @@ package org.apache.spark.sql.execution.command.table
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
-import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.command.AtomicRunnableCommand
-import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
@@ -34,6 +33,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events._
+import org.apache.carbondata.spark.exception.{ConcurrentOperationException, ProcessMetaDataException}
 
 case class CarbonDropTableCommand(
     ifExistsSet: Boolean,
@@ -55,8 +55,11 @@ case class CarbonDropTableCommand(
       locksToBeAcquired foreach {
         lock => carbonLocks += CarbonLockUtil.getLockObject(identifier, lock)
       }
-      LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
       carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+      if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
+        throw new ConcurrentOperationException(carbonTable, "loading", "drop table")
+      }
+      LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
       if (carbonTable.isStreamingTable) {
         // streaming table should acquire streaming.lock
         carbonLocks += CarbonLockUtil.getLockObject(identifier, LockUsage.STREAMING_LOCK)
@@ -65,8 +68,9 @@ case class CarbonDropTableCommand(
       if (relationIdentifiers != null && !relationIdentifiers.isEmpty) {
         if (!dropChildTable) {
           if (!ifExistsSet) {
-            throw new Exception("Child table which is associated with datamap cannot " +
-                                "be dropped, use DROP DATAMAP command to drop")
+            throwMetadataException(dbName, tableName,
+              "Child table which is associated with datamap cannot be dropped, " +
+              "use DROP DATAMAP command to drop")
           } else {
             return Seq.empty
           }
@@ -79,10 +83,7 @@ case class CarbonDropTableCommand(
           ifExistsSet,
           sparkSession)
       OperationListenerBus.getInstance.fireEvent(dropTablePreEvent, operationContext)
-      if (SegmentStatusManager.checkIfAnyLoadInProgressForTable(carbonTable)) {
-        throw new AnalysisException(s"Data loading is in progress for table $tableName, drop " +
-                                    s"table operation is not allowed")
-      }
+
       CarbonEnv.getInstance(sparkSession).carbonMetastore.dropTable(identifier)(sparkSession)
 
       if (carbonTable.hasDataMapSchema) {
@@ -122,10 +123,12 @@ case class CarbonDropTableCommand(
         if (!ifExistsSet) {
           throw ex
         }
+      case ex: ConcurrentOperationException =>
+        throw ex
       case ex: Exception =>
-        LOGGER.error(ex, s"Dropping table $dbName.$tableName failed")
-        CarbonException.analysisException(
-          s"Dropping table $dbName.$tableName failed: ${ ex.getMessage }")
+        val msg = s"Dropping table $dbName.$tableName failed: ${ex.getMessage}"
+        LOGGER.error(ex, msg)
+        throwMetadataException(dbName, tableName, msg)
     } finally {
       if (carbonLocks.nonEmpty) {
         val unlocked = carbonLocks.forall(_.unlock())
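
Per the commit title, the new check is exercised by running a load and a drop concurrently. A stripped-down sketch of such a test in the QueryTest/intercept style used throughout this commit; the table names, the sleep-based synchronization, and the exact exception message are assumptions:

    // Sketch only: a drop issued while a load is running should be rejected
    // with ConcurrentOperationException rather than corrupting the table.
    val loader = new Thread(new Runnable {
      override def run(): Unit = sql("insert overwrite table t1 select * from t2")
    })
    loader.start()
    Thread.sleep(1000) // crude wait for the load to be marked in progress
    val ex = intercept[ConcurrentOperationException] {
      sql("drop table t1")
    }
    assert(ex.getMessage.contains("drop table operation is not allowed"))
    loader.join()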

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
index 44204d4..e1e41dc 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala
@@ -34,7 +34,8 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus}
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, ProcessMetaDataException}
+import org.apache.carbondata.streaming.CarbonStreamException
 
 class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
 
@@ -201,13 +202,10 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       val future = pool.submit(thread2)
       Thread.sleep(1000)
       thread1.interrupt()
-      try {
+      val msg = intercept[Exception] {
         future.get()
-        assert(false)
-      } catch {
-        case ex =>
-          assert(ex.getMessage.contains("is not a streaming table"))
       }
+      assert(msg.getMessage.contains("is not a streaming table"))
     } finally {
       if (server != null) {
         server.close()
@@ -655,10 +653,10 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll {
       thread1.start()
       thread2.start()
       Thread.sleep(1000)
-      val msg = intercept[Exception] {
+      val msg = intercept[ProcessMetaDataException] {
         sql(s"drop table streaming.stream_table_drop")
       }
-      assertResult("Dropping table streaming.stream_table_drop failed: Acquire table lock failed after retry, please try after some time;")(msg.getMessage)
+      assert(msg.getMessage.contains("Dropping table streaming.stream_table_drop failed: Acquire table lock failed after retry, please try after some time"))
       thread1.interrupt()
       thread2.interrupt()
     } finally {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala
index 389f2cd..fe7df23 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/register/TestRegisterCarbonTable.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 /**
  *
@@ -150,7 +151,7 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll {
     sql("drop table carbontable")
     if (!CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.isReadFromHiveMetaStore) {
       restoreData(dblocation, "carbontable")
-      intercept[AnalysisException] {
+      intercept[ProcessMetaDataException] {
         sql("refresh table carbontable")
       }
       restoreData(dblocation, "carbontable_preagg1")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
index 9a6efbe..4d5f88c 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala
@@ -24,7 +24,9 @@ import org.apache.spark.sql.common.util.Spark2QueryTest
 import org.apache.spark.sql.test.TestQueryExecutor
 import org.apache.spark.util.AlterTableUtil
 import org.scalatest.BeforeAndAfterAll
+
 import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll {
 
@@ -38,7 +40,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll {
   }
 
   test("test to revert new added columns on failure") {
-    intercept[RuntimeException] {
+    intercept[ProcessMetaDataException] {
       hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
       sql(
         "Alter table reverttest add columns(newField string) TBLPROPERTIES" +
@@ -51,7 +53,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll {
   }
 
   test("test to revert table name on failure") {
-    val exception = intercept[RuntimeException] {
+    val exception = intercept[ProcessMetaDataException] {
       new File(TestQueryExecutor.warehouse + "/reverttest_fail").mkdir()
       sql("alter table reverttest rename to reverttest_fail")
       new File(TestQueryExecutor.warehouse + "/reverttest_fail").delete()
@@ -62,7 +64,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll {
   }
 
   test("test to revert drop columns on failure") {
-    intercept[Exception] {
+    intercept[ProcessMetaDataException] {
       hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
       sql("Alter table reverttest drop columns(decimalField)")
       hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
@@ -71,7 +73,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll {
   }
 
   test("test to revert changed datatype on failure") {
-    intercept[Exception] {
+    intercept[ProcessMetaDataException] {
       hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
       sql("Alter table reverttest change intField intfield bigint")
       hiveClient.runSqlHive("set hive.security.authorization.enabled=false")
@@ -81,7 +83,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll {
   }
 
   test("test to check if dictionary files are deleted for new column if query fails") {
-    intercept[RuntimeException] {
+    intercept[ProcessMetaDataException] {
       hiveClient.runSqlHive("set hive.security.authorization.enabled=true")
       sql(
         "Alter table reverttest add columns(newField string) TBLPROPERTIES" +
@@ -100,11 +102,12 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     val locks = AlterTableUtil
       .validateTableAndAcquireLock("default", "reverttest", List("meta.lock"))(sqlContext
         .sparkSession)
-    val exception = intercept[RuntimeException] {
+    val exception = intercept[ProcessMetaDataException] {
       sql("alter table reverttest rename to revert")
     }
     AlterTableUtil.releaseLocks(locks)
-    assert(exception.getMessage == "Alter table rename table operation failed: Acquire table lock failed after retry, please try after some time")
+    assert(exception.getMessage.contains(
+      "Alter table rename table operation failed: Acquire table lock failed after retry, please try after some time"))
   }
 
   override def afterAll() {
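
This and the remaining test files make the same mechanical tightening: intercepting the specific ProcessMetaDataException instead of a broad RuntimeException or Exception, so an unrelated failure can no longer satisfy the test. A generic, self-contained illustration with a hypothetical exception type, using the ScalaTest FunSuite style of that era:

    import org.scalatest.FunSuite

    class InterceptSpec extends FunSuite {
      class MetaDataError(msg: String) extends RuntimeException(msg)

      test("specific intercept rejects unrelated failures") {
        // intercept[T] passes only if the block throws T (or a subtype);
        // with intercept[RuntimeException] a NullPointerException here
        // would also have passed, masking a real bug.
        val ex = intercept[MetaDataError] {
          throw new MetaDataError("rename failed")
        }
        assert(ex.getMessage.contains("rename failed"))
      }
    }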

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
index e89efdb..c88302d 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
@@ -27,6 +27,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 class AlterTableValidationTestCase extends Spark2QueryTest with BeforeAndAfterAll {
 
@@ -337,13 +338,13 @@ class AlterTableValidationTestCase extends Spark2QueryTest with BeforeAndAfterAl
     checkExistence(sql("desc restructure"), true, "intfield", "bigint")
     sql("alter table default.restructure change decimalfield deciMalfield Decimal(11,3)")
     sql("alter table default.restructure change decimalfield deciMalfield Decimal(12,3)")
-    intercept[RuntimeException] {
+    intercept[ProcessMetaDataException] {
       sql("alter table default.restructure change decimalfield deciMalfield Decimal(12,3)")
     }
-    intercept[RuntimeException] {
+    intercept[ProcessMetaDataException] {
       sql("alter table default.restructure change decimalfield deciMalfield Decimal(13,1)")
     }
-    intercept[RuntimeException] {
+    intercept[ProcessMetaDataException] {
       sql("alter table default.restructure change decimalfield deciMalfield Decimal(13,5)")
     }
     sql("alter table default.restructure change decimalfield deciMalfield Decimal(13,4)")
@@ -516,10 +517,10 @@ class AlterTableValidationTestCase extends Spark2QueryTest with BeforeAndAfterAl
     sql(
       "create datamap preagg1 on table PreAggMain using 'preaggregate' as select" +
       " a,sum(b) from PreAggMain group by a")
-    assert(intercept[RuntimeException] {
+    assert(intercept[ProcessMetaDataException] {
       sql("alter table preAggmain_preagg1 rename to preagg2")
     }.getMessage.contains("Rename operation for pre-aggregate table is not supported."))
-    assert(intercept[RuntimeException] {
+    assert(intercept[ProcessMetaDataException] {
       sql("alter table preaggmain rename to preaggmain_new")
     }.getMessage.contains("Rename operation is not supported for table with pre-aggregate tables"))
     sql("drop table if exists preaggMain")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
index d36dd26..ac10b9a 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala
@@ -28,7 +28,7 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, ProcessMetaDataException}
 
 class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll {
 
@@ -649,7 +649,7 @@ class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll {
     sql(
       "create datamap preagg1 on table PreAggMain using 'preaggregate' as select" +
       " a,sum(b) from PreAggMain group by a")
-    assert(intercept[RuntimeException] {
+    assert(intercept[ProcessMetaDataException] {
       sql("alter table preaggmain_preagg1 add columns(d string)")
     }.getMessage.contains("Cannot add columns"))
     sql("drop table if exists preaggMain")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala
index 0124716..f92d613 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/ChangeDataTypeTestCases.scala
@@ -23,6 +23,8 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.common.util.Spark2QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
+
 class ChangeDataTypeTestCases extends Spark2QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
@@ -154,10 +156,10 @@ class ChangeDataTypeTestCases extends Spark2QueryTest with BeforeAndAfterAll {
     sql(
       "create datamap preagg1 on table PreAggMain using 'preaggregate' as select" +
       " a,sum(b) from PreAggMain group by a")
-    assert(intercept[RuntimeException] {
+    assert(intercept[ProcessMetaDataException] {
       sql("alter table preaggmain change a a long").show
     }.getMessage.contains("exists in a pre-aggregate table"))
-    assert(intercept[RuntimeException] {
+    assert(intercept[ProcessMetaDataException] {
       sql("alter table preaggmain_preagg1 change a a long").show
     }.getMessage.contains("Cannot change data type for columns in pre-aggregate table"))
     sql("drop table if exists preaggMain")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/DropColumnTestCases.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/DropColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/DropColumnTestCases.scala
index 662d9d8..58c4821 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/DropColumnTestCases.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/DropColumnTestCases.scala
@@ -21,6 +21,8 @@ import org.apache.spark.sql.Row
 import org.apache.spark.sql.common.util.Spark2QueryTest
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
+
 class DropColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll {
 
   override def beforeAll {
@@ -103,7 +105,7 @@ class DropColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll {
       " a,sum(b) from PreAggMain group by a")
     sql("alter table preaggmain drop columns(c)")
 //    checkExistence(sql("desc table preaggmain"), false, "c")
-    assert(intercept[RuntimeException] {
+    assert(intercept[ProcessMetaDataException] {
       sql("alter table preaggmain_preagg1 drop columns(preaggmain_b_sum)").show
     }.getMessage.contains("Cannot drop columns in pre-aggreagate table"))
     sql("drop table if exists preaggMain")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 3a83427..00f13a5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -221,12 +221,12 @@ public final class CarbonLoaderUtil {
           // is triggered
           for (LoadMetadataDetails entry : listOfLoadFolderDetails) {
             if (entry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
-                && segmentStatusManager.checkIfValidLoadInProgress(
+                && SegmentStatusManager.isLoadInProgress(
                     absoluteTableIdentifier, entry.getLoadName())) {
               throw new RuntimeException("Already insert overwrite is in progress");
             } else if (newMetaEntry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
                 && entry.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS
-                && segmentStatusManager.checkIfValidLoadInProgress(
+                && SegmentStatusManager.isLoadInProgress(
                     absoluteTableIdentifier, entry.getLoadName())) {
               throw new RuntimeException("Already insert into or load is in progress");
             }
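
The CarbonLoaderUtil hunk is the caller side of the rename: the instance method checkIfValidLoadInProgress becomes the static SegmentStatusManager.isLoadInProgress, so no manager instance is needed just to run the check. Condensed into a sketch with stand-in types (the real code iterates LoadMetadataDetails against an AbsoluteTableIdentifier):

    // Reject a conflicting load: any other in-progress overwrite blocks us,
    // and if we are the overwrite, any in-progress plain insert/load blocks us too.
    def validateConcurrentLoads(
        entries: Seq[(String, String)],          // (loadName, segmentStatus)
        newStatus: String,
        isLoadInProgress: String => Boolean): Unit = {
      for ((loadName, status) <- entries) {
        if (status == "INSERT_OVERWRITE_IN_PROGRESS" && isLoadInProgress(loadName)) {
          throw new RuntimeException("Already insert overwrite is in progress")
        } else if (newStatus == "INSERT_OVERWRITE_IN_PROGRESS" &&
                   status == "INSERT_IN_PROGRESS" && isLoadInProgress(loadName)) {
          throw new RuntimeException("Already insert into or load is in progress")
        }
      }
    }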