Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/02/03 19:43:45 UTC

[38/50] [abbrv] carbondata git commit: [CARBONDATA-2104] Add test case for concurrent execution of insert overwrite and other commands

[CARBONDATA-2104] Add test case for concurrent execution of insert overwrite and other commands

More test cases are added for concurrent execution of insert overwrite and other commands.
Fix a bug in delete segment and clean files when an insert overwrite is in progress.
Change processMetadata in all commands to throw ProcessMetaDataException instead of calling sys.error.

This closes #1891
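
For reference, every affected command now applies the same guard before mutating the table. A minimal Scala sketch of that pattern, using the isOverwriteInProgressInTable and ConcurrentOperationException introduced in this diff (the surrounding method is illustrative, not actual command code):

  import org.apache.carbondata.core.metadata.schema.table.CarbonTable
  import org.apache.carbondata.core.statusmanager.SegmentStatusManager
  import org.apache.carbondata.spark.exception.ConcurrentOperationException

  // Reject the operation while an insert overwrite holds the table.
  def guardAgainstOverwrite(carbonTable: CarbonTable, operation: String): Unit = {
    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
      throw new ConcurrentOperationException(carbonTable, "insert overwrite", operation)
    }
  }

CarbonCleanFilesCommand and the delete-segment commands below inline exactly this shape.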


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/55bffbe2
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/55bffbe2
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/55bffbe2

Branch: refs/heads/branch-1.3
Commit: 55bffbe2dbe880bb2f7a1e51cf02d49e828098d9
Parents: 91911af
Author: Jacky Li <ja...@qq.com>
Authored: Wed Jan 31 13:10:11 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Sat Feb 3 17:24:47 2018 +0530

----------------------------------------------------------------------
 .../statusmanager/SegmentStatusManager.java     |  59 ++--
 .../sdv/register/TestRegisterCarbonTable.scala  |   7 +-
 .../preaggregate/TestPreAggCreateCommand.scala  |   7 +-
 .../TestLoadTableConcurrentScenario.scala       |  78 -----
 .../iud/InsertOverwriteConcurrentTest.scala     | 204 -------------
 .../TestInsertAndOtherCommandConcurrent.scala   | 304 +++++++++++++++++++
 .../partition/TestShowPartitions.scala          |   9 +-
 .../StandardPartitionTableQueryTestCase.scala   |   4 +-
 .../command/CarbonTableSchemaCommonSuite.scala  |  11 +-
 .../exception/ConcurrentOperationException.java |  28 +-
 .../exception/ProcessMetaDataException.java     |  26 ++
 .../datamap/CarbonDropDataMapCommand.scala      |   2 +-
 .../CarbonAlterTableCompactionCommand.scala     |  15 +-
 .../CarbonAlterTableFinishStreaming.scala       |   4 +-
 .../management/CarbonCleanFilesCommand.scala    |   7 +
 .../CarbonDeleteLoadByIdCommand.scala           |   9 +-
 .../CarbonDeleteLoadByLoadDateCommand.scala     |   8 +
 .../management/RefreshCarbonTableCommand.scala  |   3 +-
 .../CarbonProjectForDeleteCommand.scala         |   8 +-
 .../CarbonProjectForUpdateCommand.scala         |   8 +-
 .../spark/sql/execution/command/package.scala   |   6 +
 ...rbonAlterTableDropHivePartitionCommand.scala |   2 +-
 .../CarbonAlterTableDropPartitionCommand.scala  |  16 +-
 .../CarbonAlterTableSplitPartitionCommand.scala |  11 +-
 .../CarbonShowCarbonPartitionsCommand.scala     |   7 +-
 .../CarbonAlterTableAddColumnCommand.scala      |   3 +-
 .../CarbonAlterTableDataTypeChangeCommand.scala |  15 +-
 .../CarbonAlterTableDropColumnCommand.scala     |  13 +-
 .../schema/CarbonAlterTableRenameCommand.scala  |  16 +-
 .../schema/CarbonAlterTableUnsetCommand.scala   |   4 +-
 .../table/CarbonCreateTableCommand.scala        |   7 +-
 .../command/table/CarbonDropTableCommand.scala  |  27 +-
 .../TestStreamingTableOperation.scala           |  14 +-
 .../register/TestRegisterCarbonTable.scala      |   3 +-
 .../restructure/AlterTableRevertTestCase.scala  |  17 +-
 .../AlterTableValidationTestCase.scala          |  11 +-
 .../vectorreader/AddColumnTestCases.scala       |   4 +-
 .../vectorreader/ChangeDataTypeTestCases.scala  |   6 +-
 .../vectorreader/DropColumnTestCases.scala      |   4 +-
 .../processing/util/CarbonLoaderUtil.java       |   4 +-
 40 files changed, 532 insertions(+), 459 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 01f810e..9d14c62 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -509,13 +509,13 @@ public class SegmentStatusManager {
             invalidLoadIds.add(loadId);
             return invalidLoadIds;
           } else if (SegmentStatus.INSERT_IN_PROGRESS == segmentStatus
-              && checkIfValidLoadInProgress(absoluteTableIdentifier, loadId)) {
+              && isLoadInProgress(absoluteTableIdentifier, loadId)) {
             // if the segment status is in progress then no need to delete that.
             LOG.error("Cannot delete the segment " + loadId + " which is load in progress");
             invalidLoadIds.add(loadId);
             return invalidLoadIds;
           } else if (SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segmentStatus
-              && checkIfValidLoadInProgress(absoluteTableIdentifier, loadId)) {
+              && isLoadInProgress(absoluteTableIdentifier, loadId)) {
             // if the segment status is overwrite in progress, then no need to delete that.
             LOG.error("Cannot delete the segment " + loadId + " which is load overwrite " +
                     "in progress");
@@ -572,12 +572,12 @@ public class SegmentStatusManager {
         } else if (SegmentStatus.STREAMING == segmentStatus) {
           LOG.info("Ignoring the segment : " + loadMetadata.getLoadName()
               + "as the segment is streaming in progress.");
-        } else if (SegmentStatus.INSERT_IN_PROGRESS == segmentStatus && checkIfValidLoadInProgress(
+        } else if (SegmentStatus.INSERT_IN_PROGRESS == segmentStatus && isLoadInProgress(
             absoluteTableIdentifier, loadMetadata.getLoadName())) {
           LOG.info("Ignoring the segment : " + loadMetadata.getLoadName()
               + "as the segment is insert in progress.");
         } else if (SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS == segmentStatus
-            && checkIfValidLoadInProgress(absoluteTableIdentifier, loadMetadata.getLoadName())) {
+            && isLoadInProgress(absoluteTableIdentifier, loadMetadata.getLoadName())) {
           LOG.info("Ignoring the segment : " + loadMetadata.getLoadName()
               + "as the segment is insert overwrite in progress.");
         } else if (SegmentStatus.MARKED_FOR_DELETE != segmentStatus) {
@@ -714,21 +714,23 @@ public class SegmentStatusManager {
   }
 
   /**
-   * This function checks if any load or insert overwrite is in progress before dropping that table
-   * @return
+   * Return true if any load or insert overwrite is in progress for specified table
    */
-  public static Boolean checkIfAnyLoadInProgressForTable(CarbonTable carbonTable) {
-    Boolean loadInProgress = false;
+  public static Boolean isLoadInProgressInTable(CarbonTable carbonTable) {
+    if (carbonTable == null) {
+      return false;
+    }
+    boolean loadInProgress = false;
     String metaPath = carbonTable.getMetaDataFilepath();
     LoadMetadataDetails[] listOfLoadFolderDetailsArray =
               SegmentStatusManager.readLoadMetadata(metaPath);
     if (listOfLoadFolderDetailsArray.length != 0) {
       for (LoadMetadataDetails loaddetail :listOfLoadFolderDetailsArray) {
         SegmentStatus segmentStatus = loaddetail.getSegmentStatus();
-        if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
-                segmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
+        if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS
+            || segmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
           loadInProgress =
-              checkIfValidLoadInProgress(carbonTable.getAbsoluteTableIdentifier(),
+              isLoadInProgress(carbonTable.getAbsoluteTableIdentifier(),
                   loaddetail.getLoadName());
         }
       }
@@ -737,15 +739,36 @@ public class SegmentStatusManager {
   }
 
   /**
-   * This method will check for valid IN_PROGRESS segments.
-   * Tries to acquire a lock on the segment and decide on the stale segments
-   * @param absoluteTableIdentifier
-   *
+   * Return true if insert overwrite is in progress for specified table
+   */
+  public static Boolean isOverwriteInProgressInTable(CarbonTable carbonTable) {
+    if (carbonTable == null) {
+      return false;
+    }
+    boolean loadInProgress = false;
+    String metaPath = carbonTable.getMetaDataFilepath();
+    LoadMetadataDetails[] listOfLoadFolderDetailsArray =
+        SegmentStatusManager.readLoadMetadata(metaPath);
+    if (listOfLoadFolderDetailsArray.length != 0) {
+      for (LoadMetadataDetails loaddetail :listOfLoadFolderDetailsArray) {
+        SegmentStatus segmentStatus = loaddetail.getSegmentStatus();
+        if (segmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) {
+          loadInProgress =
+              isLoadInProgress(carbonTable.getAbsoluteTableIdentifier(),
+                  loaddetail.getLoadName());
+        }
+      }
+    }
+    return loadInProgress;
+  }
+
+  /**
+   * Return true if the specified `loadName` is in progress, by checking the load lock.
    */
-  public static Boolean checkIfValidLoadInProgress(AbsoluteTableIdentifier absoluteTableIdentifier,
-      String loadId) {
+  public static Boolean isLoadInProgress(AbsoluteTableIdentifier absoluteTableIdentifier,
+      String loadName) {
     ICarbonLock segmentLock = CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
-        CarbonTablePath.addSegmentPrefix(loadId) + LockUsage.LOCK);
+        CarbonTablePath.addSegmentPrefix(loadName) + LockUsage.LOCK);
     try {
       return !segmentLock.lockWithRetries(1, 0);
     } finally {

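The renamed isLoadInProgress does not trust the status file alone: an active loader holds the per-segment lock, so a single non-blocking acquisition attempt distinguishes a live load (acquisition fails) from a stale IN_PROGRESS entry (acquisition succeeds). A Scala sketch of the probe, mirroring the Java above; the lock package names and the unlock in the truncated finally block are assumptions:

  import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
  import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
  import org.apache.carbondata.core.util.path.CarbonTablePath

  def isLoadInProgress(identifier: AbsoluteTableIdentifier, loadName: String): Boolean = {
    val segmentLock = CarbonLockFactory.getCarbonLockObj(identifier,
      CarbonTablePath.addSegmentPrefix(loadName) + LockUsage.LOCK)
    try {
      !segmentLock.lockWithRetries(1, 0) // one attempt, zero retry interval
    } finally {
      segmentLock.unlock() // assumed: release the probe lock either way
    }
  }
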
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/register/TestRegisterCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/register/TestRegisterCarbonTable.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/register/TestRegisterCarbonTable.scala
index e3620f7..bf07bd6 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/register/TestRegisterCarbonTable.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/register/TestRegisterCarbonTable.scala
@@ -26,6 +26,7 @@ import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 /**
  *
@@ -168,12 +169,8 @@ class TestRegisterCarbonTable extends QueryTest with BeforeAndAfterAll {
       backUpData(dbLocationCustom, "carbontable_preagg1")
       sql("drop table carbontable")
       restoreData(dbLocationCustom, "carbontable")
-      try {
+      intercept[ProcessMetaDataException] {
         sql("refresh table carbontable")
-        assert(false)
-      } catch {
-        case e: AnalysisException =>
-          assert(true)
       }
       restoreData(dbLocationCustom, "carbontable_preagg1")
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
index 0cb1045..38ab9ae 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala
@@ -29,7 +29,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.exception.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 
 class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
 
@@ -286,7 +286,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
   test("test pre agg create table 22: using invalid datamap provider") {
     sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
 
-    val e: Exception = intercept[Exception] {
+    val e: Exception = intercept[MalformedDataMapCommandException] {
       sql(
         """
           | CREATE DATAMAP agg0 ON TABLE mainTable
@@ -296,8 +296,7 @@ class TestPreAggCreateCommand extends QueryTest with BeforeAndAfterAll {
           | GROUP BY column3,column5,column2
         """.stripMargin)
     }
-    assert(e.getMessage.contains(
-      s"Unknown data map type abc"))
+    assert(e.getMessage.contains("Unknown data map type abc"))
     sql("DROP DATAMAP IF EXISTS agg0 ON TABLE maintable")
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/concurrent/TestLoadTableConcurrentScenario.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/concurrent/TestLoadTableConcurrentScenario.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/concurrent/TestLoadTableConcurrentScenario.scala
index 6af28c3..e69de29 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/concurrent/TestLoadTableConcurrentScenario.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/concurrent/TestLoadTableConcurrentScenario.scala
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.concurrent
-
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
-import org.apache.spark.sql.CarbonEnv
-import org.apache.spark.sql.test.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-class TestLoadTableConcurrentScenario extends QueryTest with BeforeAndAfterAll {
-
-  var carbonTable: CarbonTable = _
-  var metaPath: String = _
-
-  override def beforeAll {
-    sql("use default")
-    sql("drop table if exists drop_concur")
-    sql("drop table if exists rename_concur")
-  }
-
-  test("do not allow drop table when load is in progress") {
-    sql("create table drop_concur(id int, name string) stored by 'carbondata'")
-    sql("insert into drop_concur select 1,'abc'")
-    sql("insert into drop_concur select 1,'abc'")
-    sql("insert into drop_concur select 1,'abc'")
-
-    carbonTable = CarbonEnv.getCarbonTable(Option("default"), "drop_concur")(sqlContext.sparkSession)
-    metaPath = carbonTable.getMetaDataFilepath
-    val listOfLoadFolderDetailsArray = SegmentStatusManager.readLoadMetadata(metaPath)
-    listOfLoadFolderDetailsArray(1).setSegmentStatus(SegmentStatus.INSERT_IN_PROGRESS)
-
-    try {
-      sql("drop table drop_concur")
-    } catch {
-      case ex: Throwable => assert(ex.getMessage.contains("Cannot drop table, load or insert overwrite is in progress"))
-    }
-  }
-
-  test("do not allow rename table when load is in progress") {
-    sql("create table rename_concur(id int, name string) stored by 'carbondata'")
-    sql("insert into rename_concur select 1,'abc'")
-    sql("insert into rename_concur select 1,'abc'")
-
-    carbonTable = CarbonEnv.getCarbonTable(Option("default"), "rename_concur")(sqlContext.sparkSession)
-    metaPath = carbonTable.getMetaDataFilepath
-    val listOfLoadFolderDetailsArray = SegmentStatusManager.readLoadMetadata(metaPath)
-    listOfLoadFolderDetailsArray(1).setSegmentStatus(SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)
-
-    try {
-      sql("alter table rename_concur rename to rename_concur1")
-    } catch {
-      case ex: Throwable => assert(ex.getMessage.contains("alter rename failed, load, insert or insert overwrite " +
-        "is in progress for the table"))
-    }
-  }
-
-  override def afterAll: Unit = {
-    sql("use default")
-    sql("drop table if exists drop_concur")
-    sql("drop table if exists rename_concur")
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
deleted file mode 100644
index 25bdf7b..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/InsertOverwriteConcurrentTest.scala
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.iud
-
-import java.text.SimpleDateFormat
-import java.util
-import java.util.concurrent.{Callable, ExecutorService, Executors, Future}
-
-import scala.collection.JavaConverters._
-
-import org.apache.spark.sql.test.util.QueryTest
-import org.apache.spark.sql.{DataFrame, SaveMode}
-import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
-import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.schema.FilterType
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.events.Event
-import org.apache.carbondata.spark.testsuite.datamap.C2DataMapFactory
-
-class InsertOverwriteConcurrentTest extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
-  private val executorService: ExecutorService = Executors.newFixedThreadPool(10)
-  var df: DataFrame = _
-
-  override def beforeAll {
-    dropTable()
-    buildTestData()
-
-    // register hook to the table to sleep, thus the other command will be executed
-    DataMapStoreManager.getInstance().createAndRegisterDataMap(
-      AbsoluteTableIdentifier.from(storeLocation + "/orders", "default", "orders"),
-      classOf[WaitingDataMap].getName,
-      "test")
-  }
-
-  private def buildTestData(): Unit = {
-
-    CarbonProperties.getInstance()
-      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd")
-
-    // Simulate data and write to table orders
-    import sqlContext.implicits._
-
-    val sdf = new SimpleDateFormat("yyyy-MM-dd")
-    df = sqlContext.sparkSession.sparkContext.parallelize(1 to 150000)
-      .map(value => (value, new java.sql.Date(sdf.parse("2015-07-" + (value % 10 + 10)).getTime),
-        "china", "aaa" + value, "phone" + 555 * value, "ASD" + (60000 + value), 14999 + value,
-        "ordersTable" + value))
-      .toDF("o_id", "o_date", "o_country", "o_name",
-        "o_phonetype", "o_serialname", "o_salary", "o_comment")
-    createTable("orders")
-    createTable("orders_overwrite")
-  }
-
-  private def createTable(tableName: String): Unit = {
-    df.write
-      .format("carbondata")
-      .option("tableName", tableName)
-      .option("tempCSV", "false")
-      .mode(SaveMode.Overwrite)
-      .save()
-  }
-
-  override def afterAll {
-    executorService.shutdownNow()
-    dropTable()
-  }
-
-  override def beforeEach(): Unit = {
-    Global.overwriteRunning = false
-  }
-
-  private def dropTable() = {
-    sql("DROP TABLE IF EXISTS orders")
-    sql("DROP TABLE IF EXISTS orders_overwrite")
-  }
-
-  // run the input SQL and block until it is running
-  private def runSqlAsync(sql: String): Future[String] = {
-    assert(!Global.overwriteRunning)
-    var count = 0
-    val future = executorService.submit(
-      new QueryTask(sql)
-    )
-    while (!Global.overwriteRunning && count < 1000) {
-      Thread.sleep(10)
-      // to avoid dead loop in case WaitingDataMap is not invoked
-      count += 1
-    }
-    future
-  }
-
-  test("compaction should fail if insert overwrite is in progress") {
-    val future = runSqlAsync("insert overWrite table orders select * from orders_overwrite")
-    val ex = intercept[Exception]{
-      sql("alter table orders compact 'MINOR'")
-    }
-    assert(future.get.contains("PASS"))
-    assert(ex.getMessage.contains("Cannot run data loading and compaction on same table concurrently"))
-  }
-
-  test("update should fail if insert overwrite is in progress") {
-    val future = runSqlAsync("insert overWrite table orders select * from orders_overwrite")
-    val ex = intercept[Exception] {
-      sql("update orders set (o_country)=('newCountry') where o_country='china'").show
-    }
-    assert(future.get.contains("PASS"))
-    assert(ex.getMessage.contains("Cannot run data loading and update on same table concurrently"))
-  }
-
-  test("delete should fail if insert overwrite is in progress") {
-    val future = runSqlAsync("insert overWrite table orders select * from orders_overwrite")
-    val ex = intercept[Exception] {
-      sql("delete from orders where o_country='china'").show
-    }
-    assert(future.get.contains("PASS"))
-    assert(ex.getMessage.contains("Cannot run data loading and delete on same table concurrently"))
-  }
-
-  test("drop table should fail if insert overwrite is in progress") {
-    val future = runSqlAsync("insert overWrite table orders select * from orders_overwrite")
-    val ex = intercept[Exception] {
-      sql("drop table if exists orders")
-    }
-    assert(future.get.contains("PASS"))
-    assert(ex.getMessage.contains("Data loading is in progress for table orders, drop table operation is not allowed"))
-  }
-
-  class QueryTask(query: String) extends Callable[String] {
-    override def call(): String = {
-      var result = "PASS"
-      try {
-        sql(query).collect()
-      } catch {
-        case exception: Exception => LOGGER.error(exception.getMessage)
-          result = "FAIL"
-      }
-      result
-    }
-  }
-
-}
-
-object Global {
-  var overwriteRunning = false
-}
-
-class WaitingDataMap() extends DataMapFactory {
-
-  override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { }
-
-  override def fireEvent(event: Event): Unit = ???
-
-  override def clear(segmentId: String): Unit = {}
-
-  override def clear(): Unit = {}
-
-  override def getDataMaps(distributable: DataMapDistributable): java.util.List[DataMap] = ???
-
-  override def getDataMaps(segmentId: String): util.List[DataMap] = ???
-
-  override def createWriter(segmentId: String): DataMapWriter = {
-    new DataMapWriter {
-      override def onPageAdded(blockletId: Int, pageId: Int, pages: Array[ColumnPage]): Unit = { }
-
-      override def onBlockletEnd(blockletId: Int): Unit = { }
-
-      override def onBlockEnd(blockId: String): Unit = { }
-
-      override def onBlockletStart(blockletId: Int): Unit = { }
-
-      override def onBlockStart(blockId: String): Unit = {
-        // trigger the second SQL to execute
-        Global.overwriteRunning = true
-
-        // wait for 1 second to let second SQL to finish
-        Thread.sleep(1000)
-      }
-    }
-  }
-
-  override def getMeta: DataMapMeta = new DataMapMeta(List("o_country").asJava, FilterType.EQUALTO)
-
-  override def toDistributable(segmentId: String): util.List[DataMapDistributable] = ???
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
new file mode 100644
index 0000000..7067ef8
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.iud
+
+import java.text.SimpleDateFormat
+import java.util
+import java.util.concurrent.{Callable, ExecutorService, Executors, Future}
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{DataFrame, SaveMode}
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datastore.page.ColumnPage
+import org.apache.carbondata.core.indexstore.schema.FilterType
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.events.Event
+import org.apache.carbondata.spark.exception.ConcurrentOperationException
+import org.apache.carbondata.spark.testsuite.datamap.C2DataMapFactory
+
+// This testsuite test insert and insert overwrite with other commands concurrently
+class TestInsertAndOtherCommandConcurrent extends QueryTest with BeforeAndAfterAll with BeforeAndAfterEach {
+  private val executorService: ExecutorService = Executors.newFixedThreadPool(10)
+  var df: DataFrame = _
+
+  override def beforeAll {
+    dropTable()
+    buildTestData()
+
+    // register hook to the table to sleep, thus the other command will be executed
+    DataMapStoreManager.getInstance().createAndRegisterDataMap(
+      AbsoluteTableIdentifier.from(storeLocation + "/orders", "default", "orders"),
+      classOf[WaitingDataMap].getName,
+      "test")
+  }
+
+  private def buildTestData(): Unit = {
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, "yyyy-MM-dd")
+
+    // Simulate data and write to table orders
+    import sqlContext.implicits._
+
+    val sdf = new SimpleDateFormat("yyyy-MM-dd")
+    df = sqlContext.sparkSession.sparkContext.parallelize(1 to 150000)
+      .map(value => (value, new java.sql.Date(sdf.parse("2015-07-" + (value % 10 + 10)).getTime),
+        "china", "aaa" + value, "phone" + 555 * value, "ASD" + (60000 + value), 14999 + value,
+        "ordersTable" + value))
+      .toDF("o_id", "o_date", "o_country", "o_name",
+        "o_phonetype", "o_serialname", "o_salary", "o_comment")
+    createTable("orders")
+    createTable("orders_overwrite")
+  }
+
+  private def createTable(tableName: String): Unit = {
+    df.write
+      .format("carbondata")
+      .option("tableName", tableName)
+      .option("tempCSV", "false")
+      .mode(SaveMode.Overwrite)
+      .save()
+  }
+
+  override def afterAll {
+    executorService.shutdownNow()
+    dropTable()
+  }
+
+  override def beforeEach(): Unit = {
+    Global.overwriteRunning = false
+  }
+
+  private def dropTable() = {
+    sql("DROP TABLE IF EXISTS orders")
+    sql("DROP TABLE IF EXISTS orders_overwrite")
+  }
+
+  // run the input SQL and block until it is running
+  private def runSqlAsync(sql: String): Future[String] = {
+    assert(!Global.overwriteRunning)
+    var count = 0
+    val future = executorService.submit(
+      new QueryTask(sql)
+    )
+    while (!Global.overwriteRunning && count < 1000) {
+      Thread.sleep(10)
+      // to avoid dead loop in case WaitingDataMap is not invoked
+      count += 1
+    }
+    future
+  }
+
+  // ----------- INSERT OVERWRITE --------------
+
+  test("compaction should fail if insert overwrite is in progress") {
+    val future = runSqlAsync("insert overwrite table orders select * from orders_overwrite")
+    val ex = intercept[ConcurrentOperationException]{
+      sql("alter table orders compact 'MINOR'")
+    }
+    assert(future.get.contains("PASS"))
+    assert(ex.getMessage.contains(
+      "loading is in progress for table default.orders, compaction operation is not allowed"))
+  }
+
+  test("update should fail if insert overwrite is in progress") {
+    val future = runSqlAsync("insert overwrite table orders select * from orders_overwrite")
+    val ex = intercept[ConcurrentOperationException] {
+      sql("update orders set (o_country)=('newCountry') where o_country='china'").show
+    }
+    assert(future.get.contains("PASS"))
+    assert(ex.getMessage.contains(
+      "loading is in progress for table default.orders, data update operation is not allowed"))
+  }
+
+  test("delete should fail if insert overwrite is in progress") {
+    val future = runSqlAsync("insert overwrite table orders select * from orders_overwrite")
+    val ex = intercept[ConcurrentOperationException] {
+      sql("delete from orders where o_country='china'").show
+    }
+    assert(future.get.contains("PASS"))
+    assert(ex.getMessage.contains(
+      "loading is in progress for table default.orders, data delete operation is not allowed"))
+  }
+
+  test("drop table should fail if insert overwrite is in progress") {
+    val future = runSqlAsync("insert overwrite table orders select * from orders_overwrite")
+    val ex = intercept[ConcurrentOperationException] {
+      sql("drop table if exists orders")
+    }
+    assert(future.get.contains("PASS"))
+    assert(ex.getMessage.contains(
+      "loading is in progress for table default.orders, drop table operation is not allowed"))
+  }
+
+  test("alter rename table should fail if insert overwrite is in progress") {
+    val future = runSqlAsync("insert overwrite table orders select * from orders_overwrite")
+    val ex = intercept[ConcurrentOperationException] {
+      sql("alter table orders rename to other")
+    }
+    assert(future.get.contains("PASS"))
+    assert(ex.getMessage.contains(
+      "loading is in progress for table default.orders, alter table rename operation is not allowed"))
+  }
+
+  test("delete segment by id should fail if insert overwrite is in progress") {
+    val future = runSqlAsync("insert overwrite table orders select * from orders_overwrite")
+    val ex = intercept[ConcurrentOperationException] {
+      sql("DELETE FROM TABLE orders WHERE SEGMENT.ID IN (0)")
+    }
+    assert(future.get.contains("PASS"))
+    assert(ex.getMessage.contains(
+      "insert overwrite is in progress for table default.orders, delete segment operation is not allowed"))
+  }
+
+  test("delete segment by date should fail if insert overwrite is in progress") {
+    val future = runSqlAsync("insert overwrite table orders select * from orders_overwrite")
+    val ex = intercept[ConcurrentOperationException] {
+      sql("DELETE FROM TABLE orders WHERE SEGMENT.STARTTIME BEFORE '2099-06-01 12:05:06' ")
+    }
+    assert(future.get.contains("PASS"))
+    assert(ex.getMessage.contains(
+      "insert overwrite is in progress for table default.orders, delete segment operation is not allowed"))
+  }
+
+  test("clean file should fail if insert overwrite is in progress") {
+    val future = runSqlAsync("insert overwrite table orders select * from orders_overwrite")
+    val ex = intercept[ConcurrentOperationException] {
+      sql("clean files for table  orders")
+    }
+    assert(future.get.contains("PASS"))
+    assert(ex.getMessage.contains(
+      "insert overwrite is in progress for table default.orders, clean file operation is not allowed"))
+  }
+
+  // ----------- INSERT  --------------
+
+  test("compaction should fail if insert is in progress") {
+    val future = runSqlAsync("insert into table orders select * from orders_overwrite")
+    val ex = intercept[ConcurrentOperationException]{
+      sql("alter table orders compact 'MINOR'")
+    }
+    assert(future.get.contains("PASS"))
+    assert(ex.getMessage.contains(
+      "loading is in progress for table default.orders, compaction operation is not allowed"))
+  }
+
+  test("update should fail if insert is in progress") {
+    val future = runSqlAsync("insert into table orders select * from orders_overwrite")
+    val ex = intercept[ConcurrentOperationException] {
+      sql("update orders set (o_country)=('newCountry') where o_country='china'").show
+    }
+    assert(future.get.contains("PASS"))
+    assert(ex.getMessage.contains(
+      "loading is in progress for table default.orders, data update operation is not allowed"))
+  }
+
+  test("delete should fail if insert is in progress") {
+    val future = runSqlAsync("insert into table orders select * from orders_overwrite")
+    val ex = intercept[ConcurrentOperationException] {
+      sql("delete from orders where o_country='china'").show
+    }
+    assert(future.get.contains("PASS"))
+    assert(ex.getMessage.contains(
+      "loading is in progress for table default.orders, data delete operation is not allowed"))
+  }
+
+  test("drop table should fail if insert is in progress") {
+    val future = runSqlAsync("insert into table orders select * from orders_overwrite")
+    val ex = intercept[ConcurrentOperationException] {
+      sql("drop table if exists orders")
+    }
+    assert(future.get.contains("PASS"))
+    assert(ex.getMessage.contains(
+      "loading is in progress for table default.orders, drop table operation is not allowed"))
+  }
+
+  test("alter rename table should fail if insert is in progress") {
+    val future = runSqlAsync("insert into table orders select * from orders_overwrite")
+    val ex = intercept[ConcurrentOperationException] {
+      sql("alter table orders rename to other")
+    }
+    assert(future.get.contains("PASS"))
+    assert(ex.getMessage.contains(
+      "loading is in progress for table default.orders, alter table rename operation is not allowed"))
+  }
+
+  class QueryTask(query: String) extends Callable[String] {
+    override def call(): String = {
+      var result = "PASS"
+      try {
+        sql(query).collect()
+      } catch {
+        case exception: Exception => LOGGER.error(exception.getMessage)
+          result = "FAIL"
+      }
+      result
+    }
+  }
+
+}
+
+object Global {
+  var overwriteRunning = false
+}
+
+class WaitingDataMap() extends DataMapFactory {
+
+  override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { }
+
+  override def fireEvent(event: Event): Unit = ???
+
+  override def clear(segmentId: String): Unit = {}
+
+  override def clear(): Unit = {}
+
+  override def getDataMaps(distributable: DataMapDistributable): java.util.List[DataMap] = ???
+
+  override def getDataMaps(segmentId: String): util.List[DataMap] = ???
+
+  override def createWriter(segmentId: String): DataMapWriter = {
+    new DataMapWriter {
+      override def onPageAdded(blockletId: Int, pageId: Int, pages: Array[ColumnPage]): Unit = { }
+
+      override def onBlockletEnd(blockletId: Int): Unit = { }
+
+      override def onBlockEnd(blockId: String): Unit = { }
+
+      override def onBlockletStart(blockletId: Int): Unit = { }
+
+      override def onBlockStart(blockId: String): Unit = {
+        // trigger the second SQL to execute
+        Global.overwriteRunning = true
+
+        // wait for 1 second to let second SQL to finish
+        Thread.sleep(1000)
+      }
+    }
+  }
+
+  override def getMeta: DataMapMeta = new DataMapMeta(List("o_country").asJava, FilterType.EQUALTO)
+
+  override def toDistributable(segmentId: String): util.List[DataMapDistributable] = ???
+}
\ No newline at end of file
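
The handshake driving these tests deserves a note: WaitingDataMap.onBlockStart flips Global.overwriteRunning only once the insert has begun writing blocks, runSqlAsync polls that flag (bounded at roughly ten seconds) before handing back the Future, and the one-second sleep keeps the segment in the in-progress state while the concurrent command runs and fails. A self-contained Scala sketch of the same flag-and-poll pattern, stripped of the CarbonData specifics:

  import java.util.concurrent.{Callable, Executors, TimeUnit}

  object FlagAndPoll {
    @volatile var running = false

    def main(args: Array[String]): Unit = {
      val pool = Executors.newFixedThreadPool(2)
      val slowTask = pool.submit(new Callable[String] {
        override def call(): String = {
          running = true     // like onBlockStart: signal the write has started
          Thread.sleep(1000) // hold the in-progress window open
          "PASS"
        }
      })
      var polls = 0
      while (!running && polls < 1000) { Thread.sleep(10); polls += 1 } // like runSqlAsync
      // ... a concurrent command issued here would see the task mid-flight ...
      println(slowTask.get())
      pool.shutdown()
      pool.awaitTermination(5, TimeUnit.SECONDS)
    }
  }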

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestShowPartitions.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestShowPartitions.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestShowPartitions.scala
index 86305e7..4825968 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestShowPartitions.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestShowPartitions.scala
@@ -25,6 +25,8 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
 
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
+
 class TestShowPartition  extends QueryTest with BeforeAndAfterAll {
   override def beforeAll = {
 
@@ -136,10 +138,11 @@ class TestShowPartition  extends QueryTest with BeforeAndAfterAll {
   }
 
   test("show partition table: exception when show not partition table") {
-    val errorMessage =
-      intercept[AnalysisException] { sql("show partitions notPartitionTable").show() }
+    val errorMessage = intercept[ProcessMetaDataException] {
+      sql("show partitions notPartitionTable").show()
+    }
     assert(errorMessage.getMessage.contains(
-      "SHOW PARTITIONS is not allowed on a table that is not partitioned: notpartitiontable"))
+      "SHOW PARTITIONS is not allowed on a table that is not partitioned"))
   }
 
   test("show partition table: hash table") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index 0a86dee..95345de 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -276,8 +276,8 @@ test("Creation of partition table should fail if the colname in table schema and
     sql("drop table if exists partitionTable")
     sql("create table partitionTable (id int,city string,age int) partitioned by(name string) stored by 'carbondata'".stripMargin)
     sql(
-      s"""create datamap preaggTable on table partitionTable using 'preaggregate' as select id,sum(age) from partitionTable group by id"""
-        .stripMargin)
+    s"""create datamap preaggTable on table partitionTable using 'preaggregate' as select id,sum(age) from partitionTable group by id"""
+      .stripMargin)
     sql("insert into partitionTable select 1,'Bangalore',30,'John'")
     sql("insert into partitionTable select 2,'Chennai',20,'Huawei'")
     checkAnswer(sql("show partitions partitionTable"), Seq(Row("name=John"),Row("name=Huawei")))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark-common-test/src/test/scala/org/apache/spark/sql/execution/command/CarbonTableSchemaCommonSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/spark/sql/execution/command/CarbonTableSchemaCommonSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/spark/sql/execution/command/CarbonTableSchemaCommonSuite.scala
index 67dfa8f..bd02917 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/spark/sql/execution/command/CarbonTableSchemaCommonSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/spark/sql/execution/command/CarbonTableSchemaCommonSuite.scala
@@ -22,6 +22,8 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.junit.Assert
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
+
 class CarbonTableSchemaCommonSuite extends QueryTest with BeforeAndAfterAll {
 
   test("Creating table: Duplicate dimensions found with name, it should throw AnalysisException") {
@@ -53,20 +55,15 @@ class CarbonTableSchemaCommonSuite extends QueryTest with BeforeAndAfterAll {
          | STORED BY 'carbondata'
        """.stripMargin)
 
-    try {
+    val ex = intercept[ProcessMetaDataException] {
       sql(
         s"""
            | alter TABLE carbon_table add columns(
            | bb char(10)
             )
        """.stripMargin)
-      Assert.assertTrue(false)
-    } catch {
-      case _: RuntimeException => Assert.assertTrue(true)
-      case _: Exception => Assert.assertTrue(false)
-    } finally {
-      sql("DROP TABLE IF EXISTS carbon_table")
     }
+    sql("DROP TABLE IF EXISTS carbon_table")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java
index 1f3c07d..cc0047f 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ConcurrentOperationException.java
@@ -17,28 +17,22 @@
 
 package org.apache.carbondata.spark.exception;
 
-public class ConcurrentOperationException extends Exception {
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 
-  /**
-   * The Error message.
-   */
-  private String msg = "";
+public class ConcurrentOperationException extends MalformedCarbonCommandException {
 
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public ConcurrentOperationException(String msg) {
-    super(msg);
-    this.msg = msg;
+  public ConcurrentOperationException(String dbName, String tableName, String command1,
+      String command2) {
+    super(command1 + " is in progress for table " + dbName + "." + tableName + ", " + command2 +
+      " operation is not allowed");
+  }
+
+  public ConcurrentOperationException(CarbonTable table, String command1, String command2) {
+    this(table.getDatabaseName(), table.getTableName(), command1, command2);
   }
 
-  /**
-   * getMessage
-   */
   public String getMessage() {
-    return this.msg;
+    return super.getMessage();
   }
 
 }
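
Both new constructors yield the fixed message shape "<command1> is in progress for table <db>.<table>, <command2> operation is not allowed", which is exactly what the tests above assert on. A typical Scala call site (as in CarbonAlterTableCompactionCommand below):

  import org.apache.carbondata.core.metadata.schema.table.CarbonTable
  import org.apache.carbondata.spark.exception.ConcurrentOperationException

  def rejectCompactionWhileLoading(table: CarbonTable): Unit = {
    // message: "loading is in progress for table <db>.<table>,
    // compaction operation is not allowed"
    throw new ConcurrentOperationException(table, "loading", "compaction")
  }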

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
new file mode 100644
index 0000000..3e06bde
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.exception;
+
+// This exception will be thrown when processMetaData failed in
+// Carbon's RunnableCommand
+public class ProcessMetaDataException extends MalformedCarbonCommandException {
+  public ProcessMetaDataException(String dbName, String tableName, String msg) {
+    super("operation failed for " + dbName + "." + tableName + ": " + msg);
+  }
+}
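
The command classes below raise this via a throwMetadataException(dbName, tableName, msg) helper rather than instantiating it directly. The helper's body is not shown in this excerpt (the diffstat credits six added lines to execution/command/package.scala), so the following Scala sketch is a hypothetical reconstruction consistent with those call sites:

  import org.apache.carbondata.spark.exception.ProcessMetaDataException

  // Hypothetical: the real definition lives in the six-line change to
  // org.apache.spark.sql.execution.command.package, not shown in this mail.
  def throwMetadataException(dbName: String, tableName: String, msg: String): Nothing =
    throw new ProcessMetaDataException(dbName, tableName, msg)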

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 1fa2494..bc55988 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -123,7 +123,7 @@ case class CarbonDropDataMapCommand(
         throw e
       case ex: Exception =>
         LOGGER.error(ex, s"Dropping datamap $dataMapName failed")
-        throw new MalformedCarbonCommandException(
+        throwMetadataException(dbName, tableName,
           s"Dropping datamap $dataMapName failed: ${ex.getMessage}")
     }
     finally {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index 2a77826..667d550 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -83,14 +83,13 @@ case class CarbonAlterTableCompactionCommand(
       val loadMetadataEvent = new LoadMetadataEvent(table, true)
       OperationListenerBus.getInstance().fireEvent(loadMetadataEvent, operationContext)
     }
+    if (SegmentStatusManager.isLoadInProgressInTable(table)) {
+      throw new ConcurrentOperationException(table, "loading", "compaction")
+    }
     Seq.empty
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER: LogService =
-      LogServiceFactory.getLogService(this.getClass.getName)
-    val tableName = alterTableModel.tableName.toLowerCase
-    val databaseName = alterTableModel.dbName.getOrElse(sparkSession.catalog.currentDatabase)
     operationContext.setProperty("compactionException", "true")
     var compactionType: CompactionType = null
     var compactionException = "true"
@@ -111,14 +110,6 @@ case class CarbonAlterTableCompactionCommand(
     } else if (compactionException.equalsIgnoreCase("false")) {
       Seq.empty
     } else {
-      val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(table)
-      if (isLoadInProgress) {
-        val message = "Cannot run data loading and compaction on same table concurrently. " +
-                      "Please wait for load to finish"
-        LOGGER.error(message)
-        throw new ConcurrentOperationException(message)
-      }
-
       val carbonLoadModel = new CarbonLoadModel()
       carbonLoadModel.setTableName(table.getTableName)
       val dataLoadSchema = new CarbonDataLoadSchema(table)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
index 59cc0c4..a477167 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableFinishStreaming.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import java.io.IOException
-
 import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.MetadataCommand
 
@@ -46,7 +44,7 @@ case class CarbonAlterTableFinishStreaming(
         val msg = "Failed to finish streaming, because streaming is locked for table " +
                   carbonTable.getDatabaseName() + "." + carbonTable.getTableName()
         LOGGER.error(msg)
-        throw new IOException(msg)
+        throwMetadataException(carbonTable.getDatabaseName, carbonTable.getTableName, msg)
       }
     } finally {
       if (streamingLock.unlock()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index 4b68bd0..4f90fb5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -26,7 +26,9 @@ import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.spark.exception.ConcurrentOperationException
 import org.apache.carbondata.spark.util.CommonUtil
 
 /**
@@ -45,6 +47,11 @@ case class CarbonCleanFilesCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName.get)(sparkSession)
+    // if insert overwrite in progress, do not allow delete segment
+    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
+      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "clean file")
+    }
+
     val operationContext = new OperationContext
     val cleanFilesPreEvent: CleanFilesPreEvent =
       CleanFilesPreEvent(carbonTable,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
index a2819cc..0861c63 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
@@ -21,7 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.spark.exception.ConcurrentOperationException
 
 case class CarbonDeleteLoadByIdCommand(
     loadIds: Seq[String],
@@ -32,8 +34,13 @@ case class CarbonDeleteLoadByIdCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-    val operationContext = new OperationContext
 
+    // if insert overwrite in progress, do not allow delete segment
+    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
+      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")
+    }
+
+    val operationContext = new OperationContext
     val deleteSegmentByIdPreEvent: DeleteSegmentByIdPreEvent =
       DeleteSegmentByIdPreEvent(carbonTable,
         loadIds,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
index 490bb58..dcbc6ce 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
@@ -21,7 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.events.{DeleteSegmentByDatePostEvent, DeleteSegmentByDatePreEvent, OperationContext, OperationListenerBus}
+import org.apache.carbondata.spark.exception.ConcurrentOperationException
 
 case class CarbonDeleteLoadByLoadDateCommand(
     databaseNameOp: Option[String],
@@ -33,6 +35,12 @@ case class CarbonDeleteLoadByLoadDateCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+
+    // if insert overwrite in progress, do not allow delete segment
+    if (SegmentStatusManager.isOverwriteInProgressInTable(carbonTable)) {
+      throw new ConcurrentOperationException(carbonTable, "insert overwrite", "delete segment")
+    }
+
     val operationContext = new OperationContext
     val deleteSegmentByDatePreEvent: DeleteSegmentByDatePreEvent =
       DeleteSegmentByDatePreEvent(carbonTable,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
index 2983ea4..72ed051 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/RefreshCarbonTableCommand.scala
@@ -26,7 +26,6 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, MetadataCommand}
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
-import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.datastore.impl.FileFactory
@@ -89,7 +88,7 @@ case class RefreshCarbonTableCommand(
                       s"[$tableName] failed. All the aggregate Tables for table [$tableName] is" +
                       s" not copied under database [$databaseName]"
             LOGGER.audit(msg)
-            CarbonException.analysisException(msg)
+            throwMetadataException(databaseName, tableName, msg)
           }
           // 2.2.1 Register the aggregate tables to hive
           registerAggregates(databaseName, dataMapSchemaList)(sparkSession)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 874d416..a37d6dc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -44,12 +44,8 @@ private[sql] case class CarbonProjectForDeleteCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-    val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(carbonTable)
-    if (isLoadInProgress) {
-      val errorMessage = "Cannot run data loading and delete on same table concurrently. Please " +
-                         "wait for load to finish"
-      LOGGER.error(errorMessage)
-      throw new ConcurrentOperationException(errorMessage)
+    if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
+      throw new ConcurrentOperationException(carbonTable, "loading", "data delete")
     }
 
     IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 318c904..756d120 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -55,12 +55,8 @@ private[sql] case class CarbonProjectForUpdateCommand(
       return Seq.empty
     }
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-    val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(carbonTable)
-    if (isLoadInProgress) {
-      val errorMessage = "Cannot run data loading and update on same table concurrently. Please " +
-                         "wait for load to finish"
-      LOGGER.error(errorMessage)
-      throw new ConcurrentOperationException(errorMessage)
+    if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
+      throw new ConcurrentOperationException(carbonTable, "loading", "data update")
     }
 
     // trigger event for Update table

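These two mutation hunks replace the hand-built error message with the structured ConcurrentOperationException, which the new concurrent test suite can intercept directly. A rough shape of such a testcase, assuming ScalaTest assertions and a CarbonSession bound to spark (the real suite coordinates timing so the load is still in progress when the second command runs; that machinery is omitted here):

    import java.util.concurrent.Executors

    import scala.concurrent.{ExecutionContext, Future}

    import org.apache.carbondata.spark.exception.ConcurrentOperationException

    implicit val ec: ExecutionContext =
      ExecutionContext.fromExecutor(Executors.newFixedThreadPool(2))

    // start a load on a second thread; assume it is still running below
    Future { spark.sql("LOAD DATA INPATH '/tmp/data.csv' INTO TABLE t1") }

    // a concurrent update must now fail with the typed exception
    intercept[ConcurrentOperationException] {
      spark.sql("UPDATE t1 SET (name) = ('x') WHERE id = 1").collect()
    }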
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
index 4e983a2..4224efa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/package.scala
@@ -24,6 +24,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.spark.exception.ProcessMetaDataException
 
 object Checker {
   def validateTableExists(
@@ -45,6 +46,11 @@ object Checker {
  */
 trait MetadataProcessOpeation {
   def processMetadata(sparkSession: SparkSession): Seq[Row]
+
+  // call this to throw an exception when processMetadata fails
+  def throwMetadataException(dbName: String, tableName: String, msg: String): Unit = {
+    throw new ProcessMetaDataException(dbName, tableName, msg)
+  }
 }
 
 /**

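With this hook in place, every metadata command gets a single typed failure path instead of sys.error, so callers can catch ProcessMetaDataException and still see which table the failure belongs to. A hedged sketch of a command using it (the command class and its existence check are illustrative, not from this commit):

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.execution.command.MetadataCommand

    case class ExampleMetadataCommand(dbName: String, tableName: String)
      extends MetadataCommand {
      override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
        if (!sparkSession.catalog.tableExists(dbName, tableName)) {
          // raises ProcessMetaDataException with db/table context, rather than
          // the untyped RuntimeException produced by sys.error
          throwMetadataException(dbName, tableName, "table not found")
        }
        Seq.empty
      }
    }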
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
index 0158a32..cb4dece 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropHivePartitionCommand.scala
@@ -72,7 +72,7 @@ case class CarbonAlterTableDropHivePartitionCommand(
       } catch {
         case e: Exception =>
           if (!ifExists) {
-            throw e
+            throwMetadataException(table.getDatabaseName, table.getTableName, e.getMessage)
           } else {
             log.warn(e.getMessage)
             return Seq.empty[Row]

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index 114c25d..7fe2658 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -38,7 +38,7 @@ import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverte
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
@@ -62,17 +62,13 @@ case class CarbonAlterTableDropPartitionCommand(
       .asInstanceOf[CarbonRelation]
     val tablePath = relation.carbonTable.getTablePath
     carbonMetaStore.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
-    if (relation == null) {
-      sys.error(s"Table $dbName.$tableName does not exist")
-    }
-    if (null == CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)) {
-      LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
-      sys.error(s"Alter table failed. table not found: $dbName.$tableName")
+    if (relation == null || CarbonMetadata.getInstance.getCarbonTable(dbName, tableName) == null) {
+      throwMetadataException(dbName, tableName, "table not found")
     }
     val table = relation.carbonTable
     val partitionInfo = table.getPartitionInfo(tableName)
     if (partitionInfo == null) {
-      sys.error(s"Table $tableName is not a partition table.")
+      throwMetadataException(dbName, tableName, "table is not a partition table")
     }
     val partitionIds = partitionInfo.getPartitionIds.asScala.map(_.asInstanceOf[Int]).toList
     // keep a copy of partitionIdList before update partitionInfo.
@@ -92,11 +88,11 @@ case class CarbonAlterTableDropPartitionCommand(
         listInfo.remove(listToRemove)
         partitionInfo.setListInfo(listInfo)
       case PartitionType.RANGE_INTERVAL =>
-        sys.error(s"Dropping range interval partition isn't support yet!")
+        throwMetadataException(dbName, tableName,
+          "Dropping range interval partition is unsupported")
     }
     partitionInfo.dropPartition(partitionIndex)
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
-    val schemaFilePath = carbonTablePath.getSchemaFilePath
     // read TableInfo
     val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index bafc96a..020a72c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -40,7 +40,7 @@ import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.loading.TableProcessingOperations
 import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
@@ -65,12 +65,12 @@ case class CarbonAlterTableSplitPartitionCommand(
       .asInstanceOf[CarbonRelation]
     val tablePath = relation.carbonTable.getTablePath
     if (relation == null) {
-      sys.error(s"Table $dbName.$tableName does not exist")
+      throwMetadataException(dbName, tableName, "table not found")
     }
     carbonMetaStore.checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
     if (null == CarbonMetadata.getInstance.getCarbonTable(dbName, tableName)) {
       LOGGER.error(s"Alter table failed. table not found: $dbName.$tableName")
-      sys.error(s"Alter table failed. table not found: $dbName.$tableName")
+      throwMetadataException(dbName, tableName, "table not found")
     }
     val table = relation.carbonTable
     val partitionInfo = table.getPartitionInfo(tableName)
@@ -80,16 +80,15 @@ case class CarbonAlterTableSplitPartitionCommand(
     oldPartitionIds.addAll(partitionIds.asJava)
 
     if (partitionInfo == null) {
-      sys.error(s"Table $tableName is not a partition table.")
+      throwMetadataException(dbName, tableName, "Table is not a partition table.")
     }
     if (partitionInfo.getPartitionType == PartitionType.HASH) {
-      sys.error(s"Hash partition table cannot be added or split!")
+      throwMetadataException(dbName, tableName, "Hash partition table cannot be added or split!")
     }
 
     updatePartitionInfo(partitionInfo, partitionIds)
 
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)
-    val schemaFilePath = carbonTablePath.getSchemaFilePath
     // read TableInfo
     val tableInfo = carbonMetaStore.getThriftTableInfo(carbonTablePath)(sparkSession)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala
index ed50835..9419b00 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonShowCarbonPartitionsCommand.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command.partition
 
-import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.command.MetadataCommand
@@ -39,12 +39,11 @@ private[sql] case class CarbonShowCarbonPartitionsCommand(
     val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
       .lookupRelation(tableIdentifier)(sparkSession).asInstanceOf[CarbonRelation]
     val carbonTable = relation.carbonTable
-    val tableName = carbonTable.getTableName
     val partitionInfo = carbonTable.getPartitionInfo(
       carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
     if (partitionInfo == null) {
-      throw new AnalysisException(
-        s"SHOW PARTITIONS is not allowed on a table that is not partitioned: $tableName")
+      throwMetadataException(carbonTable.getDatabaseName, carbonTable.getTableName,
+        "SHOW PARTITIONS is not allowed on a table that is not partitioned")
     }
     val partitionType = partitionInfo.getPartitionType
     val columnName = partitionInfo.getColumnSchemaList.get(0).getColumnName

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
index f3f01bb..4b43ea7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableAddColumnCommand.scala
@@ -110,7 +110,8 @@ private[sql] case class CarbonAlterTableAddColumnCommand(
             carbonTable.getAbsoluteTableIdentifier).collect()
           AlterTableUtil.revertAddColumnChanges(dbName, tableName, timeStamp)(sparkSession)
         }
-        sys.error(s"Alter table add operation failed: ${e.getMessage}")
+        throwMetadataException(dbName, tableName,
+          s"Alter table add operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion
       AlterTableUtil.releaseLocks(locks)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
index 9bea935..571e23f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDataTypeChangeCommand.scala
@@ -62,7 +62,7 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
       if (!carbonColumns.exists(_.getColName.equalsIgnoreCase(columnName))) {
         LOGGER.audit(s"Alter table change data type request has failed. " +
                      s"Column $columnName does not exist")
-        sys.error(s"Column does not exist: $columnName")
+        throwMetadataException(dbName, tableName, s"Column does not exist: $columnName")
       }
       val carbonColumn = carbonColumns.filter(_.getColName.equalsIgnoreCase(columnName))
       if (carbonColumn.size == 1) {
@@ -71,11 +71,11 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
       } else {
         LOGGER.audit(s"Alter table change data type request has failed. " +
                      s"Column $columnName is invalid")
-        sys.error(s"Invalid Column: $columnName")
+        throwMetadataException(dbName, tableName, s"Invalid Column: $columnName")
       }
       // read the latest schema file
-      val carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
+      val carbonTablePath =
+        CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier)
       val tableInfo: TableInfo = metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
       // maintain the added column for schema evolution history
       var addColumnSchema: ColumnSchema = null
@@ -107,12 +107,13 @@ private[sql] case class CarbonAlterTableDataTypeChangeCommand(
       LOGGER.info(s"Alter table for data type change is successful for table $dbName.$tableName")
       LOGGER.audit(s"Alter table for data type change is successful for table $dbName.$tableName")
     } catch {
-      case e: Exception => LOGGER
-        .error("Alter table change datatype failed : " + e.getMessage)
+      case e: Exception =>
+        LOGGER.error("Alter table change datatype failed : " + e.getMessage)
         if (carbonTable != null) {
           AlterTableUtil.revertDataTypeChanges(dbName, tableName, timeStamp)(sparkSession)
         }
-        sys.error(s"Alter table data type change operation failed: ${e.getMessage}")
+        throwMetadataException(dbName, tableName,
+          s"Alter table data type change operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion
       AlterTableUtil.releaseLocks(locks)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
index 0319d9e..780ac8f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableDropColumnCommand.scala
@@ -63,7 +63,7 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
           tableColumn => partitionColumnSchemaList.contains(tableColumn)
         }
         if (partitionColumns.nonEmpty) {
-          throw new UnsupportedOperationException("Partition columns cannot be dropped: " +
+          throwMetadataException(dbName, tableName, "Partition columns cannot be dropped: " +
                                                   s"$partitionColumns")
         }
       }
@@ -85,7 +85,8 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
           }
         }
         if (!columnExist) {
-          sys.error(s"Column $column does not exists in the table $dbName.$tableName")
+          throwMetadataException(dbName, tableName,
+            s"Column $column does not exist in the table $dbName.$tableName")
         }
       }
 
@@ -137,13 +138,13 @@ private[sql] case class CarbonAlterTableDropColumnCommand(
       LOGGER.info(s"Alter table for drop columns is successful for table $dbName.$tableName")
       LOGGER.audit(s"Alter table for drop columns is successful for table $dbName.$tableName")
     } catch {
-      case e: Exception => LOGGER
-        .error("Alter table drop columns failed : " + e.getMessage)
+      case e: Exception =>
+        LOGGER.error("Alter table drop columns failed : " + e.getMessage)
         if (carbonTable != null) {
           AlterTableUtil.revertDropColumnChanges(dbName, tableName, timeStamp)(sparkSession)
         }
-        e.printStackTrace()
-        sys.error(s"Alter table drop column operation failed: ${e.getMessage}")
+        throwMetadataException(dbName, tableName,
+          s"Alter table drop column operation failed: ${e.getMessage}")
     } finally {
       // release lock after command execution completion
       AlterTableUtil.releaseLocks(locks)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index dd34f08..c8f64e1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.command.schema
 
-import org.apache.hadoop.fs.Path
-import org.apache.spark.sql._
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCommand}
@@ -37,7 +35,7 @@ import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.events.{AlterTableRenamePostEvent, AlterTableRenamePreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.format.SchemaEvolutionEntry
-import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.exception.{ConcurrentOperationException, MalformedCarbonCommandException}
 
 private[sql] case class CarbonAlterTableRenameCommand(
     alterTableRenameModel: AlterTableRenameModel)
@@ -70,7 +68,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
     if (relation == null) {
       LOGGER.audit(s"Rename table request has failed. " +
                    s"Table $oldDatabaseName.$oldTableName does not exist")
-      sys.error(s"Table $oldDatabaseName.$oldTableName does not exist")
+      throwMetadataException(oldDatabaseName, oldTableName, "Table does not exist")
     }
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
       LockUsage.COMPACTION_LOCK,
@@ -90,9 +88,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
         .asInstanceOf[CarbonRelation].carbonTable
       carbonTableLockFilePath = carbonTable.getTablePath
       // if any load is in progress for table, do not allow rename table
-      if (SegmentStatusManager.checkIfAnyLoadInProgressForTable(carbonTable)) {
-        throw new AnalysisException(s"Data loading is in progress for table $oldTableName, alter " +
-                                    s"table rename operation is not allowed")
+      if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
+        throw new ConcurrentOperationException(carbonTable, "loading", "alter table rename")
       }
       // invalid data map for the old table, see CARBON-1690
       val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier
@@ -160,6 +157,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
       LOGGER.audit(s"Table $oldTableName has been successfully renamed to $newTableName")
       LOGGER.info(s"Table $oldTableName has been successfully renamed to $newTableName")
     } catch {
+      case e: ConcurrentOperationException =>
+        throw e
       case e: Exception =>
         LOGGER.error(e, "Rename table failed: " + e.getMessage)
         if (carbonTable != null) {
@@ -172,7 +171,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
               sparkSession)
           renameBadRecords(newTableName, oldTableName, oldDatabaseName)
         }
-        sys.error(s"Alter table rename table operation failed: ${e.getMessage}")
+        throwMetadataException(oldDatabaseName, oldTableName,
+          s"Alter table rename table operation failed: ${e.getMessage}")
     } finally {
       // case specific to rename table as after table rename old table path will not be found
       if (carbonTable != null) {

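Note the catch ordering in the rename hunk above: Scala matches catch cases top to bottom, so ConcurrentOperationException must be rethrown before the generic Exception handler, otherwise it would be swallowed and re-reported as a metadata failure. Schematically (simplified; names as in the diff):

    try {
      // ... acquire locks, rename schema, rename bad records folder ...
    } catch {
      case e: ConcurrentOperationException =>
        throw e // keep the concurrency failure type visible to callers
      case e: Exception =>
        // revert any partial changes, then report as a metadata failure
        throwMetadataException(oldDatabaseName, oldTableName,
          s"Alter table rename table operation failed: ${e.getMessage}")
    }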
http://git-wip-us.apache.org/repos/asf/carbondata/blob/55bffbe2/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
index 5be5b2c..2490f9e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableUnsetCommand.scala
@@ -17,14 +17,12 @@
 
 package org.apache.spark.sql.execution.command.schema
 
-import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.{Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.hive.CarbonSessionCatalog
 import org.apache.spark.util.AlterTableUtil
 
-import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
-import org.apache.carbondata.format.TableInfo
 
 private[sql] case class CarbonAlterTableUnsetCommand(
     tableIdentifier: TableIdentifier,