Posted to commits@carbondata.apache.org by gv...@apache.org on 2018/04/22 04:15:35 UTC

[1/2] carbondata git commit: [CARBONDATA-2360][Non Transactional Table] Insert into Non-Transactional Table

Repository: carbondata
Updated Branches:
  refs/heads/master b86ff926d -> b7b8073d6


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 3f86ca4..29d91d6 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -153,9 +153,7 @@ case class CarbonLoadDataCommand(
     } else {
       null
     }
-    if (table.getTableInfo.isUnManagedTable) {
-      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
-    }
+
     // get the value of 'spark.executor.cores' from spark conf, default value is 1
     val sparkExecutorCores = sparkSession.sparkContext.conf.get("spark.executor.cores", "1")
     // get the value of 'carbon.number.of.cores.while.loading' from carbon properties,
@@ -192,6 +190,7 @@ case class CarbonLoadDataCommand(
         FileUtils.getPaths(factPathFromUser, hadoopConf)
       }
       carbonLoadModel.setFactFilePath(factPath)
+      carbonLoadModel.setCarbonTransactionalTable(table.getTableInfo.isTransactionalTable)
       carbonLoadModel.setAggLoadRequest(
         internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean)
       carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", ""))
@@ -267,10 +266,12 @@ case class CarbonLoadDataCommand(
           carbonLoadModel.setUseOnePass(false)
         }
         // Create table and metadata folders if not exist
-        val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
-        val fileType = FileFactory.getFileType(metadataDirectoryPath)
-        if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
-          FileFactory.mkdirs(metadataDirectoryPath, fileType)
+        if (carbonLoadModel.isCarbonTransactionalTable) {
+          val metadataDirectoryPath = CarbonTablePath.getMetadataPath(table.getTablePath)
+          val fileType = FileFactory.getFileType(metadataDirectoryPath)
+          if (!FileFactory.isFileExist(metadataDirectoryPath, fileType)) {
+            FileFactory.mkdirs(metadataDirectoryPath, fileType)
+          }
         }
         val partitionStatus = SegmentStatus.SUCCESS
         val columnar = sparkSession.conf.get("carbon.is.columnar.storage", "true").toBoolean
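
For reference, the metadata-directory guard above in Java form, as it also appears in CarbonLoaderUtil later in this patch (a standalone sketch; the table path is illustrative and IOException handling is omitted):

    // Only transactional tables get a Metadata folder under the table path;
    // non-transactional loads write their files flat into the table path itself.
    CarbonLoadModel loadModel = new CarbonLoadModel();   // transactional by default
    if (loadModel.isCarbonTransactionalTable()) {
      String metadataPath = CarbonTablePath.getMetadataPath("/tmp/exampleTable");
      FileFactory.FileType fileType = FileFactory.getFileType(metadataPath);
      if (!FileFactory.isFileExist(metadataPath, fileType)) {
        FileFactory.mkdirs(metadataPath, fileType);
      }
    }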

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index cd5d8f9..0fb05e0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -55,8 +55,8 @@ case class CarbonShowLoadsCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-    if (carbonTable.getTableInfo.isUnManagedTable) {
-      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    if (!carbonTable.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
     CarbonStore.showSegments(
       limit,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 1b087bd..225237b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -44,8 +44,8 @@ private[sql] case class CarbonProjectForDeleteCommand(
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-    if (carbonTable.getTableInfo.isUnManagedTable) {
-      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    if (!carbonTable.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
 
     if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 24ac80c..d8379a7 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -57,8 +57,8 @@ private[sql] case class CarbonProjectForUpdateCommand(
       return Seq.empty
     }
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
-    if (carbonTable.getTableInfo.isUnManagedTable) {
-      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    if (!carbonTable.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
     if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
       throw new ConcurrentOperationException(carbonTable, "loading", "data update")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index 59ec71c..807c925 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -123,6 +123,7 @@ case class CarbonAlterTableAddHivePartitionCommand(
             "Schema of index files located in location is not matching with current table schema")
         }
         val loadModel = new CarbonLoadModel
+        loadModel.setCarbonTransactionalTable(true)
         loadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(table))
         // Create new entry in tablestatus file
         CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, false)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index 756bc97..25c0559 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -138,6 +138,7 @@ case class CarbonAlterTableDropPartitionCommand(
       // Need to fill dimension relation
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
       carbonLoadModel.setTableName(table.getTableName)
+      carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable)
       carbonLoadModel.setDatabaseName(table.getDatabaseName)
       carbonLoadModel.setTablePath(table.getTablePath)
       val loadStartTime = CarbonUpdateUtil.readCurrentTime

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 929de0a..f4b6de0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -149,6 +149,7 @@ case class CarbonAlterTableSplitPartitionCommand(
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
       carbonLoadModel.setTableName(table.getTableName)
       carbonLoadModel.setDatabaseName(table.getDatabaseName)
+      carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable)
       carbonLoadModel.setTablePath(tablePath)
       val loadStartTime = CarbonUpdateUtil.readCurrentTime
       carbonLoadModel.setFactTimeStamp(loadStartTime)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index af52d6b..dfcd12b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -77,8 +77,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
     var oldCarbonTable: CarbonTable = null
     oldCarbonTable = metastore.lookupRelation(Some(oldDatabaseName), oldTableName)(sparkSession)
       .asInstanceOf[CarbonRelation].carbonTable
-    if (oldCarbonTable.getTableInfo.isUnManagedTable) {
-      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    if (!oldCarbonTable.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
 
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 6266c53..16e99b5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -90,7 +90,7 @@ case class CarbonCreateTableCommand(
       OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext)
       val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
       val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier)
-      val isUnmanaged = tableInfo.isUnManagedTable
+      val isTransactionalTable = tableInfo.isTransactionalTable
       if (createDSTable) {
         try {
           val tablePath = tableIdentifier.getTablePath
@@ -133,7 +133,7 @@ case class CarbonCreateTableCommand(
                  |  tablePath "$tablePath",
                  |  path "$tablePath",
                  |  isExternal "$isExternal",
-                 |  isUnManaged "$isUnmanaged",
+                 |  isTransactional "$isTransactionalTable",
                  |  isVisible "$isVisible"
                  |  $carbonSchemaString)
                  |  $partitionString

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 53e1ed4..61df9b1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -50,15 +50,22 @@ case class CarbonDropTableCommand(
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
+
     val identifier = CarbonEnv.getIdentifier(databaseNameOp, tableName)(sparkSession)
     val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
     val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
+      carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+      val locksToBeAcquired: List[String] = if (carbonTable.isTransactionalTable) {
+        List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
+      } else {
+        List.empty
+      }
       locksToBeAcquired foreach {
         lock => carbonLocks += CarbonLockUtil.getLockObject(identifier, lock)
       }
-      carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+
       if (SegmentStatusManager.isLoadInProgressInTable(carbonTable)) {
         throw new ConcurrentOperationException(carbonTable, "loading", "drop table")
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index c58d02d..26d5330 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -115,8 +115,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           if (carbonTable != null && carbonTable.isFileLevelFormat) {
             throw new MalformedCarbonCommandException(
               "Unsupported alter operation on Carbon external fileformat table")
-          } else if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) {
-            throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+          } else if (carbonTable != null && !carbonTable.getTableInfo.isTransactionalTable) {
+            throw new MalformedCarbonCommandException(
+              "Unsupported operation on non transactional table")
           } else {
             ExecutedCommandExec(dataTypeChange) :: Nil
           }
@@ -133,8 +134,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           if (carbonTable != null && carbonTable.isFileLevelFormat) {
             throw new MalformedCarbonCommandException(
               "Unsupported alter operation on Carbon external fileformat table")
-          } else if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) {
-            throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+          } else if (carbonTable != null && !carbonTable.getTableInfo.isTransactionalTable) {
+            throw new MalformedCarbonCommandException(
+              "Unsupported operation on non transactional table")
           } else {
             ExecutedCommandExec(addColumn) :: Nil
           }
@@ -151,8 +153,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
           if (carbonTable != null && carbonTable.isFileLevelFormat) {
             throw new MalformedCarbonCommandException(
               "Unsupported alter operation on Carbon external fileformat table")
-          } else if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) {
-            throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+          } else if (carbonTable != null && !carbonTable.getTableInfo.isTransactionalTable) {
+            throw new MalformedCarbonCommandException(
+              "Unsupported operation on non transactional table")
           } else {
             ExecutedCommandExec(dropColumn) :: Nil
           }
@@ -185,8 +188,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         if (isCarbonTable) {
           val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
             .lookupRelation(t)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
-          if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) {
-            throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+          if (carbonTable != null && !carbonTable.getTableInfo.isTransactionalTable) {
+            throw new MalformedCarbonCommandException(
+              "Unsupported operation on non transactional table")
           }
           if (!carbonTable.isHivePartitionTable) {
             ExecutedCommandExec(CarbonShowCarbonPartitionsCommand(t)) :: Nil
@@ -238,8 +242,9 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         // if the table has 'preaggregate' DataMap, it doesn't support streaming now
         val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
           .lookupRelation(tableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
-        if (carbonTable != null && carbonTable.getTableInfo.isUnManagedTable) {
-          throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+        if (carbonTable != null && !carbonTable.getTableInfo.isTransactionalTable) {
+          throw new MalformedCarbonCommandException(
+            "Unsupported operation on non transactional table")
         }
 
         // TODO remove this limitation later

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 36b6d96..c61471a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -104,7 +104,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
         CarbonRelation(database, tableName, CarbonSparkUtil.createSparkMeta(t), t)
       case None =>
         readCarbonSchema(absIdentifier,
-          parameters.getOrElse("isUnManaged", "false").toBoolean) match {
+          !parameters.getOrElse("isTransactional", "true").toBoolean) match {
           case Some(meta) =>
             CarbonRelation(database, tableName,
               CarbonSparkUtil.createSparkMeta(meta), meta)
@@ -230,7 +230,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
         schemaConverter
           .fromExternalToWrapperTableInfo(thriftTableInfo, dbName, tableName, tablePath)
       wrapperTableInfo.getFactTable.getTableProperties.put("_external", "true")
-      wrapperTableInfo.setUnManagedTable(true)
+      wrapperTableInfo.setTransactionalTable(false)
       Some(wrapperTableInfo)
     } else {
       val tableMetadataFile = CarbonTablePath.getSchemaFilePath(tablePath)
@@ -474,7 +474,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
       sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
       DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
     } else {
-      if (isUnmanagedCarbonTable(absoluteTableIdentifier)) {
+      if (!isTransactionalCarbonTable(absoluteTableIdentifier)) {
         removeTableFromMetadata(dbName, tableName)
         CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
         // discard cached table info in cachedDataSourceTables
@@ -486,9 +486,9 @@ class CarbonFileMetastore extends CarbonMetaStore {
   }
 
 
-  def isUnmanagedCarbonTable(identifier: AbsoluteTableIdentifier): Boolean = {
-    val table = getTableFromMetadataCache(identifier.getDatabaseName, identifier.getTableName);
-    table.map(_.getTableInfo.isUnManagedTable).getOrElse(false)
+  def isTransactionalCarbonTable(identifier: AbsoluteTableIdentifier): Boolean = {
+    val table = getTableFromMetadataCache(identifier.getDatabaseName, identifier.getTableName)
+    table.map(_.getTableInfo.isTransactionalTable).getOrElse(true)
   }
 
   private def getTimestampFileAndType() = {
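
Note the double negation above: the serde property now stores isTransactional (defaulting to true when absent), while readCarbonSchema still takes the legacy "unmanaged, infer schema from data files" flag, hence the negation at the call site. The same inversion in isolation (Java sketch, map contents illustrative):

    // An absent option means transactional (the new default); negate it to get the
    // legacy infer-schema flag that readCarbonSchema expects as its second argument.
    Map<String, String> parameters = new HashMap<>();
    boolean isTransactional =
        Boolean.parseBoolean(parameters.getOrDefault("isTransactional", "true"));
    boolean inferSchema = !isTransactional;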

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 33eac61..aacbdd0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -258,7 +258,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
     }
     // validate tblProperties
     val bucketFields = parser.getBucketFields(tableProperties, fields, options)
-    var unManagedTable : Boolean = false
+    var isTransactionalTable : Boolean = true
 
     val tableInfo = if (external) {
       // read table info from schema file in the provided table path
@@ -272,7 +272,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
           if (provider.equalsIgnoreCase("'carbonfile'")) {
             SchemaReader.inferSchema(identifier, true)
           } else {
-            unManagedTable = true
+            isTransactionalTable = false
             SchemaReader.inferSchema(identifier, false)
           }
         }
@@ -307,7 +307,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf,
         tableComment)
       TableNewProcessor(tableModel)
     }
-    tableInfo.setUnManagedTable(unManagedTable)
+    tableInfo.setTransactionalTable(isTransactionalTable)
     selectQuery match {
       case query@Some(q) =>
         CarbonCreateTableAsSelectCommand(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index 43d8c03..d98229a 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -158,6 +158,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     val carbonSchema = new CarbonDataLoadSchema(table)
     carbonLoadModel.setDatabaseName(table.getDatabaseName)
     carbonLoadModel.setTableName(table.getTableName)
+    carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable)
     carbonLoadModel.setTablePath(relation.carbonTable.getTablePath)
     carbonLoadModel.setCarbonDataLoadSchema(carbonSchema)
     carbonLoadModel.setFactFilePath(filePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 829c17e..ad1c84c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -110,7 +110,7 @@ public class CarbonDataLoadConfiguration {
 
   private SortColumnRangeInfo sortColumnRangeInfo;
 
-  private boolean carbonUnmanagedTable;
+  private boolean carbonTransactionalTable;
 
   /**
   * Folder path to where data should be written for this load.
@@ -380,11 +380,11 @@ public class CarbonDataLoadConfiguration {
     this.sortColumnRangeInfo = sortColumnRangeInfo;
   }
 
-  public boolean isCarbonUnmanagedTable() {
-    return carbonUnmanagedTable;
+  public boolean isCarbonTransactionalTable() {
+    return carbonTransactionalTable;
   }
 
-  public void setCarbonUnmanagedTable(boolean carbonUnmanagedTable) {
-    this.carbonUnmanagedTable = carbonUnmanagedTable;
+  public void setCarbonTransactionalTable(boolean carbonTransactionalTable) {
+    this.carbonTransactionalTable = carbonTransactionalTable;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index 6d3f596..9c1d113 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -185,7 +185,7 @@ public final class DataLoadProcessBuilder {
     CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable();
     AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
     configuration.setTableIdentifier(identifier);
-    configuration.setCarbonUnmanagedTable(loadModel.isCarbonUnmanagedTable());
+    configuration.setCarbonTransactionalTable(loadModel.isCarbonTransactionalTable());
     configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
     configuration.setHeader(loadModel.getCsvHeaderColumns());
     configuration.setSegmentId(loadModel.getSegmentId());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index fd39563..2b820d8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -48,11 +48,11 @@ public class CarbonLoadModel implements Serializable {
   private String tablePath;
 
   /*
-     This points if the carbonTable is a Unmanaged Table or not.
+     This indicates whether the carbonTable is a Non Transactional Table or not.
      The path will be pointed by the tablePath. And there will be
-     no Metadata folder present for the unmanaged Table.
+     no Metadata folder present for the Non Transactional Table.
    */
-  private boolean carbonUnmanagedTable;
+  private boolean carbonTransactionalTable = true;
 
   private String csvHeader;
   private String[] csvHeaderColumns;
@@ -417,7 +417,7 @@ public class CarbonLoadModel implements Serializable {
     copy.defaultTimestampFormat = defaultTimestampFormat;
     copy.maxColumns = maxColumns;
     copy.tablePath = tablePath;
-    copy.carbonUnmanagedTable = carbonUnmanagedTable;
+    copy.carbonTransactionalTable = carbonTransactionalTable;
     copy.useOnePass = useOnePass;
     copy.dictionaryServerHost = dictionaryServerHost;
     copy.dictionaryServerPort = dictionaryServerPort;
@@ -471,7 +471,7 @@ public class CarbonLoadModel implements Serializable {
     copyObj.defaultTimestampFormat = defaultTimestampFormat;
     copyObj.maxColumns = maxColumns;
     copyObj.tablePath = tablePath;
-    copyObj.carbonUnmanagedTable = carbonUnmanagedTable;
+    copyObj.carbonTransactionalTable = carbonTransactionalTable;
     copyObj.useOnePass = useOnePass;
     copyObj.dictionaryServerHost = dictionaryServerHost;
     copyObj.dictionaryServerPort = dictionaryServerPort;
@@ -835,11 +835,11 @@ public class CarbonLoadModel implements Serializable {
     setLoadMetadataDetails(Arrays.asList(details));
   }
 
-  public boolean isCarbonUnmanagedTable() {
-    return carbonUnmanagedTable;
+  public boolean isCarbonTransactionalTable() {
+    return carbonTransactionalTable;
   }
 
-  public void setCarbonUnmanagedTable(boolean carbonUnmanagedTable) {
-    this.carbonUnmanagedTable = carbonUnmanagedTable;
+  public void setCarbonTransactionalTable(boolean carbonTransactionalTable) {
+    this.carbonTransactionalTable = carbonTransactionalTable;
   }
 }
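
Because the renamed flag defaults to true, callers that never touch it keep the old transactional behaviour; only the non-transactional (flat folder) path has to opt out explicitly. A minimal sketch:

    CarbonLoadModel model = new CarbonLoadModel();
    assert model.isCarbonTransactionalTable();   // true by default
    model.setCarbonTransactionalTable(false);    // opt out for SDK / flat folder loads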

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
index 8eb5ed1..3385479 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java
@@ -57,10 +57,11 @@ public class CarbonLoadModelBuilder {
   /**
    * build CarbonLoadModel for data loading
    * @param options Load options from user input
+   * @param taskNo task identifier for this load, used to avoid file name conflicts
+   *               between concurrently running writers
    * @return a new CarbonLoadModel instance
    */
-  public CarbonLoadModel build(
-      Map<String, String> options, long UUID) throws InvalidLoadOptionException, IOException {
+  public CarbonLoadModel build(Map<String, String> options, long UUID, String taskNo)
+      throws InvalidLoadOptionException, IOException {
     Map<String, String> optionsFinal = LoadOption.fillOptionWithDefaultValue(options);
 
     if (!options.containsKey("fileheader")) {
@@ -72,8 +73,9 @@ public class CarbonLoadModelBuilder {
       optionsFinal.put("fileheader", Strings.mkString(columns, ","));
     }
     CarbonLoadModel model = new CarbonLoadModel();
-    model.setCarbonUnmanagedTable(table.isUnManagedTable());
+    model.setCarbonTransactionalTable(table.isTransactionalTable());
     model.setFactTimeStamp(UUID);
+    model.setTaskNo(taskNo);
 
     // we have provided 'fileheader', so hadoopConf can be null
     build(options, optionsFinal, model, null);
@@ -129,6 +131,7 @@ public class CarbonLoadModelBuilder {
     carbonLoadModel.setDatabaseName(table.getDatabaseName());
     carbonLoadModel.setTablePath(table.getTablePath());
     carbonLoadModel.setTableName(table.getTableName());
+    carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable());
     CarbonDataLoadSchema dataLoadSchema = new CarbonDataLoadSchema(table);
     // Need to fill dimension relation
     carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema);
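
A short sketch of the updated entry point with the new taskNo parameter (assumes an existing CarbonTable instance named table; option values are illustrative, and IOException / InvalidLoadOptionException handling is omitted). Per the code above, the UUID is also set as the fact timestamp:

    Map<String, String> options = new HashMap<>();
    options.put("sort_columns", "name");
    CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
    CarbonLoadModel model = builder.build(
        options,
        System.currentTimeMillis(),            // UUID, also used as the fact timestamp
        Long.toString(System.nanoTime()));     // taskNo, distinguishes concurrent writers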

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
index 089b8c7..4ff1cce 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java
@@ -244,7 +244,7 @@ public class LoadOption {
       }
     }
 
-    if (!carbonLoadModel.isCarbonUnmanagedTable() && !CarbonDataProcessorUtil
+    if (carbonLoadModel.isCarbonTransactionalTable() && !CarbonDataProcessorUtil
         .isHeaderValid(carbonLoadModel.getTableName(), csvColumns,
             carbonLoadModel.getCarbonDataLoadSchema(), staticPartitionCols)) {
       if (csvFile == null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index 0625a66..aaf20c7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -367,7 +367,7 @@ public class CarbonFactDataHandlerModel {
     }
     AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier();
     String carbonDataDirectoryPath;
-    if (configuration.isCarbonUnmanagedTable()) {
+    if (!configuration.isCarbonTransactionalTable()) {
       carbonDataDirectoryPath = absoluteTableIdentifier.getTablePath();
     } else {
       carbonDataDirectoryPath = CarbonTablePath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 2b4748f..2b295d6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -159,6 +159,30 @@ public final class CarbonLoaderUtil {
   }
 
   /**
+   * This API deletes the content of the non Transactional Tables when insert overwrite is set to true.
+   *
+   * @param loadModel
+   * @throws IOException
+   */
+  public static void deleteNonTransactionalTableForInsertOverwrite(final CarbonLoadModel loadModel)
+      throws IOException {
+    // We need to delete the content of the Table Path Folder except the
+    // newly added files.
+    List<String> filesToBeDeleted = new ArrayList<>();
+    CarbonFile carbonFile = FileFactory.getCarbonFile(loadModel.getTablePath());
+    CarbonFile[] filteredList = carbonFile.listFiles(new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile file) {
+        return !file.getName().contains(loadModel.getFactTimeStamp() + "");
+      }
+    });
+    for (CarbonFile file : filteredList) {
+      filesToBeDeleted.add(file.getAbsolutePath());
+    }
+
+    deleteFiles(filesToBeDeleted);
+  }
+
+  /**
   * This API will write the load level metadata for the load management module in order to
   * manage the load and query execution smoothly.
    *
@@ -169,8 +193,13 @@ public final class CarbonLoaderUtil {
    * @throws IOException
    */
   public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
-      CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid)
+      final CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite, String uuid)
       throws IOException {
+    // For Non Transactional tables there is no need to update the Table Status file.
+    if (!loadModel.isCarbonTransactionalTable()) {
+      return true;
+    }
+
     return recordNewLoadMetadata(newMetaEntry, loadModel, loadStartEntry, insertOverwrite, uuid,
         new ArrayList<Segment>(), new ArrayList<Segment>());
   }
@@ -191,10 +220,12 @@ public final class CarbonLoaderUtil {
     boolean status = false;
     AbsoluteTableIdentifier identifier =
         loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-    String metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath());
-    FileType fileType = FileFactory.getFileType(metadataPath);
-    if (!FileFactory.isFileExist(metadataPath, fileType)) {
-      FileFactory.mkdirs(metadataPath, fileType);
+    if (loadModel.isCarbonTransactionalTable()) {
+      String metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath());
+      FileType fileType = FileFactory.getFileType(metadataPath);
+      if (!FileFactory.isFileExist(metadataPath, fileType)) {
+        FileFactory.mkdirs(metadataPath, fileType);
+      }
     }
     String tableStatusPath;
     if (loadModel.getCarbonDataLoadSchema().getCarbonTable().isChildDataMap() && !uuid.isEmpty()) {
@@ -432,6 +463,10 @@ public final class CarbonLoaderUtil {
     }
     CarbonLoaderUtil
         .populateNewLoadMetaEntry(newLoadMetaEntry, status, model.getFactTimeStamp(), false);
+
+    if (!model.isCarbonTransactionalTable() && insertOverwrite) {
+      CarbonLoaderUtil.deleteNonTransactionalTableForInsertOverwrite(model);
+    }
     boolean entryAdded = CarbonLoaderUtil
         .recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite, uuid);
     if (!entryAdded) {
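
For insert overwrite on a non-transactional table, every file under the table path whose name does not carry the current load's fact timestamp is deleted. The filter semantics in isolation (Java sketch; path and timestamp values are illustrative):

    final long factTimeStamp = 1524370000000L;   // current load's fact timestamp
    CarbonFile tableDir = FileFactory.getCarbonFile("/tmp/nonTransactionalTable");
    CarbonFile[] staleFiles = tableDir.listFiles(new CarbonFileFilter() {
      @Override public boolean accept(CarbonFile file) {
        // keep only the files written by the current load
        return !file.getName().contains(factTimeStamp + "");
      }
    });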

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index aae6f03..16d4d53 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -136,6 +136,7 @@ public class StoreCreator {
       loadModel.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName());
       loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName());
       loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName());
+      loadModel.setCarbonTransactionalTable(true);
       loadModel.setFactFilePath(factFilePath);
       loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
       loadModel.setTablePath(identifier.getTablePath());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index 4e09553..de1e5be 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -55,8 +55,9 @@ public class CarbonWriterBuilder {
   private boolean persistSchemaFile;
   private int blockletSize;
   private int blockSize;
-  private boolean isUnManagedTable;
+  private boolean isTransactionalTable;
   private long UUID;
+  private String taskNo;
 
   /**
    * prepares the builder with the schema provided
@@ -91,6 +92,19 @@ public class CarbonWriterBuilder {
   }
 
   /**
+   * Sets the taskNo for the writer. Concurrently running SDK instances
+   * should set distinct taskNo values in order to avoid conflicts in file writes.
+   * @param taskNo is the TaskNo the user wants to specify. Mostly it is system time.
+   * @return updated CarbonWriterBuilder
+   */
+  public CarbonWriterBuilder taskNo(String taskNo) {
+    this.taskNo = taskNo;
+    return this;
+  }
+
+  /**
    * If set, create a schema file in metadata folder.
    * @param persist is a boolean value, If set, create a schema file in metadata folder
    * @return updated CarbonWriterBuilder
@@ -101,14 +115,14 @@ public class CarbonWriterBuilder {
   }
 
   /**
-   * If set true, writes the carbondata and carbonindex files in a flat folder structure
-   * @param isUnManagedTable is a boolelan value if set writes
+   * If set to false, writes the carbondata and carbonindex files in a flat folder structure
+   * @param isTransactionalTable is a boolean value; if set to false, writes
    *                     the carbondata and carbonindex files in a flat folder structure
    * @return updated CarbonWriterBuilder
    */
-  public CarbonWriterBuilder unManagedTable(boolean isUnManagedTable) {
-    Objects.requireNonNull(isUnManagedTable, "UnManaged Table should not be null");
-    this.isUnManagedTable = isUnManagedTable;
+  public CarbonWriterBuilder isTransactionalTable(boolean isTransactionalTable) {
+    Objects.requireNonNull(isTransactionalTable, "Transactional Table should not be null");
+    this.isTransactionalTable = isTransactionalTable;
     return this;
   }
 
@@ -180,7 +194,7 @@ public class CarbonWriterBuilder {
     }
 
     // build LoadModel
-    return buildLoadModel(table, UUID);
+    return buildLoadModel(table, UUID, taskNo);
   }
 
   /**
@@ -209,7 +223,7 @@ public class CarbonWriterBuilder {
     }
     String tableName;
     String dbName;
-    if (!isUnManagedTable) {
+    if (isTransactionalTable) {
       tableName = "_tempTable";
       dbName = "_tempDB";
     } else {
@@ -223,7 +237,7 @@ public class CarbonWriterBuilder {
         .databaseName(dbName)
         .tablePath(path)
         .tableSchema(schema)
-        .isUnManagedTable(isUnManagedTable)
+        .isTransactionalTable(isTransactionalTable)
         .build();
     return table;
   }
@@ -261,13 +275,13 @@ public class CarbonWriterBuilder {
   /**
    * Build a {@link CarbonLoadModel}
    */
-  private CarbonLoadModel buildLoadModel(CarbonTable table, long UUID)
+  private CarbonLoadModel buildLoadModel(CarbonTable table, long UUID, String taskNo)
       throws InvalidLoadOptionException, IOException {
     Map<String, String> options = new HashMap<>();
     if (sortColumns != null) {
       options.put("sort_columns", Strings.mkString(sortColumns, ","));
     }
     CarbonLoadModelBuilder builder = new CarbonLoadModelBuilder(table);
-    return builder.build(options, UUID);
+    return builder.build(options, UUID, taskNo);
   }
 }
\ No newline at end of file
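
Putting the renamed SDK API together, a minimal end-to-end sketch of a non-transactional (flat folder) write, mirroring the new test added below (schema, rows and output path are illustrative; exceptions omitted):

    Field[] fields = new Field[2];
    fields[0] = new Field("name", DataTypes.STRING);
    fields[1] = new Field("age", DataTypes.INT);
    CarbonWriter writer = CarbonWriter.builder()
        .withSchema(new Schema(fields))
        .outputPath("./flatFolderOutput")
        .isTransactionalTable(false)                    // flat layout, no Metadata folder
        .uniqueIdentifier(System.currentTimeMillis())   // used as UUID / fact timestamp
        .taskNo(Long.toString(System.nanoTime()))       // avoids concurrent writer conflicts
        .buildWriterForCSVInput();
    for (int i = 0; i < 10; i++) {
      writer.write(new String[]{"robot" + i, String.valueOf(i)});
    }
    writer.close();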

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
index 25c34e0..c30bd3a 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/AvroCarbonWriterTest.java
@@ -66,6 +66,7 @@ public class AvroCarbonWriterTest {
       CarbonWriter writer = CarbonWriter.builder()
           .withSchema(new org.apache.carbondata.sdk.file.Schema(fields))
           .outputPath(path)
+          .isTransactionalTable(true)
           .buildWriterForAvroInput();
 
       for (int i = 0; i < 100; i++) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
index eecbf5f..41fde66 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVCarbonWriterTest.java
@@ -90,6 +90,7 @@ public class CSVCarbonWriterTest {
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder()
           .withSchema(new Schema(fields))
+          .isTransactionalTable(true)
           .outputPath(path);
 
       CarbonWriter writer = builder.buildWriterForCSVInput();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
new file mode 100644
index 0000000..32fe6e8
--- /dev/null
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVNonTransactionalCarbonWriterTest.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.sdk.file;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test suite for {@link CSVCarbonWriter}
+ */
+public class CSVNonTransactionalCarbonWriterTest {
+
+  @Test
+  public void testWriteFiles() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testWriteFilesJsonSchema() throws IOException {
+    String path = "./testWriteFilesJsonSchema";
+    FileUtils.deleteDirectory(new File(path));
+
+    String schema = new StringBuilder()
+        .append("[ \n")
+        .append("   {\"name\":\"string\"},\n")
+        .append("   {\"age\":\"int\"},\n")
+        .append("   {\"height\":\"double\"}\n")
+        .append("]")
+        .toString();
+
+    writeFilesAndVerify(Schema.parseJson(schema), path);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path) {
+    writeFilesAndVerify(schema, path, null);
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) {
+    writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1);
+  }
+
+  private void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) {
+    writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1);
+  }
+
+  /**
+   * Invoke CarbonWriter API to write carbon files and assert the file is written
+   * @param rows number of rows to write
+   * @param schema schema of the file
+   * @param path local write path
+   * @param sortColumns sort columns
+   * @param persistSchema true if the schema file should be persisted
+   * @param blockletSize blockletSize in the file, -1 for default size
+   * @param blockSize blockSize in the file, -1 for default size
+   */
+  private void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns,
+      boolean persistSchema, int blockletSize, int blockSize) {
+    try {
+      CarbonWriterBuilder builder = CarbonWriter.builder()
+          .withSchema(schema)
+          .isTransactionalTable(false)
+          .uniqueIdentifier(System.currentTimeMillis())
+          .taskNo(Long.toString(System.nanoTime()))
+          .outputPath(path);
+      if (sortColumns != null) {
+        builder = builder.sortBy(sortColumns);
+      }
+      if (persistSchema) {
+        builder = builder.persistSchemaFile(true);
+      }
+      if (blockletSize != -1) {
+        builder = builder.withBlockletSize(blockletSize);
+      }
+      if (blockSize != -1) {
+        builder = builder.withBlockSize(blockSize);
+      }
+
+      CarbonWriter writer = builder.buildWriterForCSVInput();
+
+      for (int i = 0; i < rows; i++) {
+        writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});
+      }
+      writer.close();
+    } catch (IOException e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    } catch (InvalidLoadOptionException l) {
+      l.printStackTrace();
+      Assert.fail(l.getMessage());
+    }
+
+    File segmentFolder = new File(path);
+    Assert.assertTrue(segmentFolder.exists());
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertTrue(dataFiles.length > 0);
+  }
+
+  @Test
+  public void testAllPrimitiveDataType() throws IOException {
+    // TODO: write all data type and read by CarbonRecordReader to verify the content
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[9];
+    fields[0] = new Field("stringField", DataTypes.STRING);
+    fields[1] = new Field("intField", DataTypes.INT);
+    fields[2] = new Field("shortField", DataTypes.SHORT);
+    fields[3] = new Field("longField", DataTypes.LONG);
+    fields[4] = new Field("doubleField", DataTypes.DOUBLE);
+    fields[5] = new Field("boolField", DataTypes.BOOLEAN);
+    fields[6] = new Field("dateField", DataTypes.DATE);
+    fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
+    fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
+
+    try {
+      CarbonWriterBuilder builder = CarbonWriter.builder()
+          .withSchema(new Schema(fields))
+          .uniqueIdentifier(System.currentTimeMillis())
+          .isTransactionalTable(false)
+          .taskNo(Long.toString(System.nanoTime()))
+          .outputPath(path);
+
+      CarbonWriter writer = builder.buildWriterForCSVInput();
+
+      for (int i = 0; i < 100; i++) {
+        String[] row = new String[]{
+            "robot" + (i % 10),
+            String.valueOf(i),
+            String.valueOf(i),
+            String.valueOf(Long.MAX_VALUE - i),
+            String.valueOf((double) i / 2),
+            String.valueOf(true),
+            "2019-03-02",
+            "2019-02-12 03:03:34"
+        };
+        writer.write(row);
+      }
+      writer.close();
+    } catch (Exception e) {
+      e.printStackTrace();
+      Assert.fail(e.getMessage());
+    }
+
+    File segmentFolder = new File(path);
+    Assert.assertTrue(segmentFolder.exists());
+
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertTrue(dataFiles.length > 0);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void test2Blocklet() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100);
+
+    // TODO: implement reader to verify the number of blocklet in the file
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void test2Block() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2);
+
+    File segmentFolder = new File(path);
+    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
+      @Override public boolean accept(File pathname) {
+        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
+      }
+    });
+    Assert.assertNotNull(dataFiles);
+    Assert.assertEquals(2, dataFiles.length);
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testSortColumns() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path, new String[]{"name"});
+
+    // TODO: implement reader and verify the data is sorted
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+  @Test
+  public void testPartitionOutput() {
+    // TODO: test write data with partition
+  }
+
+  @Test
+  public void testSchemaPersistence() throws IOException {
+    String path = "./testWriteFiles";
+    FileUtils.deleteDirectory(new File(path));
+
+    Field[] fields = new Field[2];
+    fields[0] = new Field("name", DataTypes.STRING);
+    fields[1] = new Field("age", DataTypes.INT);
+
+    writeFilesAndVerify(new Schema(fields), path, true);
+
+    String schemaFile = CarbonTablePath.getSchemaFilePath(path);
+    Assert.assertTrue(new File(schemaFile).exists());
+
+    FileUtils.deleteDirectory(new File(path));
+  }
+
+}
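
For reference, a condensed Scala sketch of the SDK writer flow these tests
exercise (mirroring the builder calls used above; the schema and output path
are illustrative):

    import org.apache.carbondata.core.metadata.datatype.DataTypes
    import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}

    val fields = Array(
      new Field("name", DataTypes.STRING),
      new Field("age", DataTypes.INT))
    val writer = CarbonWriter.builder()
      .withSchema(new Schema(fields))
      .isTransactionalTable(false) // no Metadata folder, TableStatus or schema file
      .uniqueIdentifier(System.currentTimeMillis)
      .taskNo(java.lang.Long.toString(System.nanoTime))
      .outputPath("./testWriteFiles")
      .buildWriterForCSVInput()
    writer.write(Array[String]("robot0", "0"))
    writer.close()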

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java
deleted file mode 100644
index 4bcdfff..0000000
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/CSVUnManagedCarbonWriterTest.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.sdk.file;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FilenameFilter;
-import java.io.IOException;
-
-import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test suite for {@link CSVCarbonWriter}
- */
-public class CSVUnManagedCarbonWriterTest {
-
-  @Test
-  public void testWriteFiles() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(new Schema(fields), path);
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void testWriteFilesJsonSchema() throws IOException {
-    String path = "./testWriteFilesJsonSchema";
-    FileUtils.deleteDirectory(new File(path));
-
-    String schema = new StringBuilder()
-        .append("[ \n")
-        .append("   {\"name\":\"string\"},\n")
-        .append("   {\"age\":\"int\"},\n")
-        .append("   {\"height\":\"double\"}\n")
-        .append("]")
-        .toString();
-
-    writeFilesAndVerify(Schema.parseJson(schema), path);
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  private void writeFilesAndVerify(Schema schema, String path) {
-    writeFilesAndVerify(schema, path, null);
-  }
-
-  private void writeFilesAndVerify(Schema schema, String path, String[] sortColumns) {
-    writeFilesAndVerify(100, schema, path, sortColumns, false, -1, -1);
-  }
-
-  private void writeFilesAndVerify(Schema schema, String path, boolean persistSchema) {
-    writeFilesAndVerify(100, schema, path, null, persistSchema, -1, -1);
-  }
-
-  /**
-   * Invoke CarbonWriter API to write carbon files and assert the file is rewritten
-   * @param rows number of rows to write
-   * @param schema schema of the file
-   * @param path local write path
-   * @param sortColumns sort columns
-   * @param persistSchema true if want to persist schema file
-   * @param blockletSize blockletSize in the file, -1 for default size
-   * @param blockSize blockSize in the file, -1 for default size
-   */
-  private void writeFilesAndVerify(int rows, Schema schema, String path, String[] sortColumns,
-      boolean persistSchema, int blockletSize, int blockSize) {
-    try {
-      CarbonWriterBuilder builder = CarbonWriter.builder()
-          .withSchema(schema)
-          .unManagedTable(true)
-          .uniqueIdentifier(System.currentTimeMillis())
-          .outputPath(path);
-      if (sortColumns != null) {
-        builder = builder.sortBy(sortColumns);
-      }
-      if (persistSchema) {
-        builder = builder.persistSchemaFile(true);
-      }
-      if (blockletSize != -1) {
-        builder = builder.withBlockletSize(blockletSize);
-      }
-      if (blockSize != -1) {
-        builder = builder.withBlockSize(blockSize);
-      }
-
-      CarbonWriter writer = builder.buildWriterForCSVInput();
-
-      for (int i = 0; i < rows; i++) {
-        writer.write(new String[]{"robot" + (i % 10), String.valueOf(i), String.valueOf((double) i / 2)});
-      }
-      writer.close();
-    } catch (IOException e) {
-      e.printStackTrace();
-      Assert.fail(e.getMessage());
-    } catch (InvalidLoadOptionException l) {
-      l.printStackTrace();
-      Assert.fail(l.getMessage());
-    }
-
-    File segmentFolder = new File(path);
-    Assert.assertTrue(segmentFolder.exists());
-
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
-      @Override public boolean accept(File pathname) {
-        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
-      }
-    });
-    Assert.assertNotNull(dataFiles);
-    Assert.assertTrue(dataFiles.length > 0);
-  }
-  
-
-  @Test
-  public void testAllPrimitiveDataType() throws IOException {
-    // TODO: write all data type and read by CarbonRecordReader to verify the content
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[9];
-    fields[0] = new Field("stringField", DataTypes.STRING);
-    fields[1] = new Field("intField", DataTypes.INT);
-    fields[2] = new Field("shortField", DataTypes.SHORT);
-    fields[3] = new Field("longField", DataTypes.LONG);
-    fields[4] = new Field("doubleField", DataTypes.DOUBLE);
-    fields[5] = new Field("boolField", DataTypes.BOOLEAN);
-    fields[6] = new Field("dateField", DataTypes.DATE);
-    fields[7] = new Field("timeField", DataTypes.TIMESTAMP);
-    fields[8] = new Field("decimalField", DataTypes.createDecimalType(8, 2));
-
-    try {
-      CarbonWriterBuilder builder = CarbonWriter.builder()
-          .withSchema(new Schema(fields))
-          .uniqueIdentifier(System.currentTimeMillis())
-          .unManagedTable(true)
-          .outputPath(path);
-
-      CarbonWriter writer = builder.buildWriterForCSVInput();
-
-      for (int i = 0; i < 100; i++) {
-        String[] row = new String[]{
-            "robot" + (i % 10),
-            String.valueOf(i),
-            String.valueOf(i),
-            String.valueOf(Long.MAX_VALUE - i),
-            String.valueOf((double) i / 2),
-            String.valueOf(true),
-            "2019-03-02",
-            "2019-02-12 03:03:34"
-        };
-        writer.write(row);
-      }
-      writer.close();
-    } catch (Exception e) {
-      e.printStackTrace();
-      Assert.fail(e.getMessage());
-    }
-
-    File segmentFolder = new File(path);
-    Assert.assertTrue(segmentFolder.exists());
-
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
-      @Override public boolean accept(File pathname) {
-        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
-      }
-    });
-    Assert.assertNotNull(dataFiles);
-    Assert.assertTrue(dataFiles.length > 0);
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void test2Blocklet() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 1, 100);
-
-    // TODO: implement reader to verify the number of blocklet in the file
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void test2Block() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(1000 * 1000, new Schema(fields), path, null, false, 2, 2);
-
-    File segmentFolder = new File(path);
-    File[] dataFiles = segmentFolder.listFiles(new FileFilter() {
-      @Override public boolean accept(File pathname) {
-        return pathname.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT);
-      }
-    });
-    Assert.assertNotNull(dataFiles);
-    Assert.assertEquals(2, dataFiles.length);
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void testSortColumns() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(new Schema(fields), path, new String[]{"name"});
-
-    // TODO: implement reader and verify the data is sorted
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-  @Test
-  public void testPartitionOutput() {
-    // TODO: test write data with partition
-  }
-
-  @Test
-  public void testSchemaPersistence() throws IOException {
-    String path = "./testWriteFiles";
-    FileUtils.deleteDirectory(new File(path));
-
-    Field[] fields = new Field[2];
-    fields[0] = new Field("name", DataTypes.STRING);
-    fields[1] = new Field("age", DataTypes.INT);
-
-    writeFilesAndVerify(new Schema(fields), path, true);
-
-    String schemaFile = CarbonTablePath.getSchemaFilePath(path);
-    Assert.assertTrue(new File(schemaFile).exists());
-
-    FileUtils.deleteDirectory(new File(path));
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
----------------------------------------------------------------------
diff --git a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
index 068164d..03aecb8 100644
--- a/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
+++ b/store/sdk/src/test/java/org/apache/carbondata/sdk/file/TestUtil.java
@@ -56,6 +56,7 @@ public class TestUtil {
     try {
       CarbonWriterBuilder builder = CarbonWriter.builder()
           .withSchema(schema)
+          .isTransactionalTable(true)
           .outputPath(path);
       if (sortColumns != null) {
         builder = builder.sortBy(sortColumns);


[2/2] carbondata git commit: [CARBONDATA-2360][Non Transactional Table] Insert into Non-Transactional Table

Posted by gv...@apache.org.
[CARBONDATA-2360][Non Transactional Table] Insert into Non-Transactional Table

Also supports overwrite clause

This closes #2177
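
For illustration, a minimal sketch of the usage this change enables, in the
style of the integration tests added below (the `sql` helper, the path, and
the source table are hypothetical):

    // Map an external table onto SDK-written (non-transactional) output,
    // then feed it with insert into / insert overwrite.
    val writerPath = "/tmp/sdk_output" // illustrative path to SDK writer output
    sql(s"CREATE EXTERNAL TABLE sdkOutput(name string, age int, height double) " +
        s"STORED BY 'carbondata' LOCATION '$writerPath'")
    sql("INSERT INTO sdkOutput SELECT * FROM sourceTable")
    sql("INSERT OVERWRITE TABLE sdkOutput SELECT * FROM sourceTable")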


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b7b8073d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b7b8073d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b7b8073d

Branch: refs/heads/master
Commit: b7b8073d6ae729a4f9b73376ad5fc111c66efe3d
Parents: b86ff92
Author: sounakr <so...@gmail.com>
Authored: Tue Apr 17 13:38:51 2018 +0530
Committer: Venkata Ramana G <ra...@huawei.com>
Committed: Sun Apr 22 09:43:02 2018 +0530

----------------------------------------------------------------------
 .../core/metadata/schema/SchemaReader.java      |   1 +
 .../core/metadata/schema/table/CarbonTable.java |  22 +-
 .../schema/table/CarbonTableBuilder.java        |  13 +-
 .../core/metadata/schema/table/TableInfo.java   |  26 +-
 .../LatestFilesReadCommittedScope.java          |   2 +-
 .../executor/impl/AbstractQueryExecutor.java    |   6 +-
 .../scan/executor/util/RestructureUtil.java     |  30 +-
 .../SegmentUpdateStatusManager.java             |   2 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   5 +-
 .../hadoop/api/CarbonInputFormat.java           |   8 +-
 .../hadoop/api/CarbonOutputCommitter.java       |  10 +-
 .../hadoop/api/CarbonTableInputFormat.java      |  10 +-
 .../hadoop/api/CarbonTableOutputFormat.java     |   3 +
 .../hadoop/testutil/StoreCreator.java           |   1 +
 .../hadoop/util/CarbonInputFormatUtil.java      |   3 +-
 .../presto/util/CarbonDataStoreCreator.scala    |   1 +
 .../src/test/resources/nontransactional.csv     |   3 +
 ...FileInputFormatWithExternalCarbonTable.scala |   2 +-
 ...tCreateTableUsingSparkCarbonFileFormat.scala |   2 +-
 .../TestNonTransactionalCarbonTable.scala       | 516 +++++++++++++++++++
 ...tSparkCarbonFileFormatWithSparkSession.scala |   2 +-
 .../createTable/TestUnmanagedCarbonTable.scala  | 408 ---------------
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   2 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  24 +-
 .../org/apache/spark/sql/CarbonCountStar.scala  |   3 +-
 .../org/apache/spark/sql/CarbonSource.scala     |  14 +-
 .../datamap/CarbonCreateDataMapCommand.scala    |   4 +-
 .../CarbonAlterTableCompactionCommand.scala     |   5 +-
 .../CarbonDeleteLoadByIdCommand.scala           |   4 +-
 .../CarbonDeleteLoadByLoadDateCommand.scala     |   4 +-
 .../management/CarbonLoadDataCommand.scala      |  15 +-
 .../management/CarbonShowLoadsCommand.scala     |   4 +-
 .../CarbonProjectForDeleteCommand.scala         |   4 +-
 .../CarbonProjectForUpdateCommand.scala         |   4 +-
 ...arbonAlterTableAddHivePartitionCommand.scala |   1 +
 .../CarbonAlterTableDropPartitionCommand.scala  |   1 +
 .../CarbonAlterTableSplitPartitionCommand.scala |   1 +
 .../schema/CarbonAlterTableRenameCommand.scala  |   4 +-
 .../table/CarbonCreateTableCommand.scala        |   4 +-
 .../command/table/CarbonDropTableCommand.scala  |  11 +-
 .../sql/execution/strategy/DDLStrategy.scala    |  25 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    |  12 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |   6 +-
 .../util/ExternalColumnDictionaryTestCase.scala |   1 +
 .../loading/CarbonDataLoadConfiguration.java    |  10 +-
 .../loading/DataLoadProcessBuilder.java         |   2 +-
 .../loading/model/CarbonLoadModel.java          |  18 +-
 .../loading/model/CarbonLoadModelBuilder.java   |   9 +-
 .../processing/loading/model/LoadOption.java    |   2 +-
 .../store/CarbonFactDataHandlerModel.java       |   2 +-
 .../processing/util/CarbonLoaderUtil.java       |  45 +-
 .../carbondata/processing/StoreCreator.java     |   1 +
 .../sdk/file/CarbonWriterBuilder.java           |  36 +-
 .../sdk/file/AvroCarbonWriterTest.java          |   1 +
 .../sdk/file/CSVCarbonWriterTest.java           |   1 +
 .../CSVNonTransactionalCarbonWriterTest.java    | 278 ++++++++++
 .../sdk/file/CSVUnManagedCarbonWriterTest.java  | 277 ----------
 .../apache/carbondata/sdk/file/TestUtil.java    |   1 +
 58 files changed, 1066 insertions(+), 846 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
index 8692f13..be3906b 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/SchemaReader.java
@@ -91,6 +91,7 @@ public class SchemaReader {
     TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
         tableInfo, identifier.getDatabaseName(), identifier.getTableName(),
         identifier.getTablePath());
+    wrapperTableInfo.setTransactionalTable(false);
     return wrapperTableInfo;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 88e00f3..f0ab857 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -142,12 +142,16 @@ public class CarbonTable implements Serializable {
   private boolean hasDataMapSchema;
 
   /**
-   * The boolean field which points if the data written for UnManaged Table
-   * or Managed Table. The difference between managed and unManaged table is
-   * unManaged Table will not contain any Metadata folder and subsequently
+   * A boolean field indicating whether the data was written for a Transactional
+   * or a Non-Transactional Table.
+   * For a transactional table, Carbon provides transactional guarantees for data
+   * management operations such as data loading: whether the load succeeds or
+   * fails, the data stays in a consistent state.
+   * The difference is that a Non-Transactional Table will not contain any Metadata
+   * folder and subsequently
    * no TableStatus or Schema files.
    */
-  private boolean isUnManagedTable;
+  private boolean isTransactionalTable = true;
 
   private CarbonTable() {
     this.tableDimensionsMap = new HashMap<String, List<CarbonDimension>>();
@@ -247,7 +251,7 @@ public class CarbonTable implements Serializable {
     table.blockSize = tableInfo.getTableBlockSizeInMB();
     table.tableLastUpdatedTime = tableInfo.getLastUpdatedTime();
     table.tableUniqueName = tableInfo.getTableUniqueName();
-    table.setUnManagedTable(tableInfo.isUnManagedTable());
+    table.setTransactionalTable(tableInfo.isTransactionalTable());
     table.fillDimensionsAndMeasuresForTables(tableInfo.getFactTable());
     table.fillCreateOrderColumn(tableInfo.getFactTable().getTableName());
     if (tableInfo.getFactTable().getBucketingInfo() != null) {
@@ -939,11 +943,11 @@ public class CarbonTable implements Serializable {
     return new CarbonTableBuilder();
   }
 
-  public boolean isUnManagedTable() {
-    return isUnManagedTable;
+  public boolean isTransactionalTable() {
+    return isTransactionalTable;
   }
 
-  public void setUnManagedTable(boolean unManagedTable) {
-    isUnManagedTable = unManagedTable;
+  public void setTransactionalTable(boolean transactionalTable) {
+    isTransactionalTable = transactionalTable;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
index 82d0246..e1d2162 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTableBuilder.java
@@ -28,7 +28,7 @@ public class CarbonTableBuilder {
   private String tableName;
   private String databaseName;
   private String tablePath;
-  private boolean unManagedTable;
+  private boolean isTransactionalTable;
   private TableSchema tableSchema;
 
   public CarbonTableBuilder tableName(String tableName) {
@@ -47,10 +47,9 @@ public class CarbonTableBuilder {
     return this;
   }
 
-
-  public CarbonTableBuilder isUnManagedTable(boolean isUnManagedTable) {
-    Objects.requireNonNull(isUnManagedTable, "UnManaged Table should not be null");
-    this.unManagedTable = isUnManagedTable;
+  public CarbonTableBuilder isTransactionalTable(boolean isTransactionalTable) {
+    // no null check needed: 'isTransactionalTable' is a primitive boolean
+    this.isTransactionalTable = isTransactionalTable;
     return this;
   }
 
@@ -63,7 +62,7 @@ public class CarbonTableBuilder {
   public CarbonTable build() {
     Objects.requireNonNull(tablePath, "tablePath should not be null");
     Objects.requireNonNull(tableSchema, "tableSchema should not be null");
-    Objects.requireNonNull(unManagedTable, "UnManaged Table should not be null");
+    // no null check needed: 'isTransactionalTable' is a primitive boolean
 
 
     TableInfo tableInfo = new TableInfo();
@@ -71,7 +70,7 @@ public class CarbonTableBuilder {
     tableInfo.setTableUniqueName(databaseName + "_" + tableName);
     tableInfo.setFactTable(tableSchema);
     tableInfo.setTablePath(tablePath);
-    tableInfo.setUnManagedTable(unManagedTable);
+    tableInfo.setTransactionalTable(isTransactionalTable);
     tableInfo.setLastUpdatedTime(System.currentTimeMillis());
     tableInfo.setDataMapSchemaList(new ArrayList<DataMapSchema>(0));
     return CarbonTable.buildFromTableInfo(tableInfo);
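
A minimal Scala sketch of the renamed builder API (the databaseName, tablePath
and tableSchema setters are assumed to follow the pattern shown in this diff,
and `schema` is a TableSchema constructed elsewhere):

    val table = CarbonTable.builder()
      .tableName("t1")
      .databaseName("default")
      .tablePath("/tmp/t1")
      .tableSchema(schema)
      .isTransactionalTable(false) // SDK-written output: no Metadata folder
      .build()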

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 3e7ea62..c8ac15a 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -78,12 +78,17 @@ public class TableInfo implements Serializable, Writable {
   private String tablePath;
 
   /**
-   * The boolean field which points if the data written for UnManaged Table
-   * or Managed Table. The difference between managed and unManaged table is
-   * unManaged Table will not contain any Metadata folder and subsequently
+   * A boolean field indicating whether the data was written for a Transactional
+   * or a Non-Transactional Table. The difference is that a Non-Transactional Table
+   * will not contain any Metadata folder and subsequently
    * no TableStatus or Schema files.
+   * ACID properties cannot be applied to a Non-Transactional Table, as there are
+   * no commit points, i.e. no TableStatus file.
+   * Whatever files are present in the path will be read, but the system does not
+   * ensure ACID rules (consistency in particular) for this data.
+   *
    */
-  private boolean isUnManagedTable;
+  private boolean isTransactionalTable = true;
 
   // this identifier is a lazy field which will be created when it is used first time
   private AbsoluteTableIdentifier identifier;
@@ -94,6 +99,7 @@ public class TableInfo implements Serializable, Writable {
 
   public TableInfo() {
     dataMapSchemaList = new ArrayList<>();
+    isTransactionalTable = true;
   }
 
   /**
@@ -248,7 +254,7 @@ public class TableInfo implements Serializable, Writable {
     factTable.write(out);
     out.writeLong(lastUpdatedTime);
     out.writeUTF(getOrCreateAbsoluteTableIdentifier().getTablePath());
-    out.writeBoolean(isUnManagedTable);
+    out.writeBoolean(isTransactionalTable);
     boolean isChildSchemaExists =
         null != dataMapSchemaList && dataMapSchemaList.size() > 0;
     out.writeBoolean(isChildSchemaExists);
@@ -276,7 +282,7 @@ public class TableInfo implements Serializable, Writable {
     this.factTable.readFields(in);
     this.lastUpdatedTime = in.readLong();
     this.tablePath = in.readUTF();
-    this.isUnManagedTable = in.readBoolean();
+    this.isTransactionalTable = in.readBoolean();
     boolean isChildSchemaExists = in.readBoolean();
     this.dataMapSchemaList = new ArrayList<>();
     if (isChildSchemaExists) {
@@ -330,11 +336,11 @@ public class TableInfo implements Serializable, Writable {
     return parentRelationIdentifiers;
   }
 
-  public boolean isUnManagedTable() {
-    return isUnManagedTable;
+  public boolean isTransactionalTable() {
+    return isTransactionalTable;
   }
 
-  public void setUnManagedTable(boolean unManagedTable) {
-    isUnManagedTable = unManagedTable;
+  public void setTransactionalTable(boolean transactionalTable) {
+    isTransactionalTable = transactionalTable;
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
index 6afa280..3f870b8 100644
--- a/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
+++ b/core/src/main/java/org/apache/carbondata/core/readcommitter/LatestFilesReadCommittedScope.java
@@ -33,7 +33,7 @@ import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 /**
- * This is a readCommittedScope for unmanaged carbon table
+ * This is a readCommittedScope for non transactional carbon table
  */
 @InterfaceAudience.Internal
 @InterfaceStability.Stable

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index d2d458e..bc410ce 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -309,10 +309,10 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
         .createDimensionInfoAndGetCurrentBlockQueryDimension(blockExecutionInfo,
             queryModel.getProjectionDimensions(), tableBlockDimensions,
             segmentProperties.getComplexDimensions(), queryModel.getProjectionMeasures().size(),
-            queryModel.getTable().getTableInfo().isUnManagedTable());
+            queryModel.getTable().getTableInfo().isTransactionalTable());
     blockExecutionInfo.setBlockId(
         CarbonUtil.getBlockId(queryModel.getAbsoluteTableIdentifier(), filePath, segmentId,
-            queryModel.getTable().getTableInfo().isUnManagedTable()));
+            queryModel.getTable().getTableInfo().isTransactionalTable()));
     blockExecutionInfo.setDeleteDeltaFilePath(deleteDeltaFiles);
     blockExecutionInfo.setStartBlockletIndex(startBlockletIndex);
     blockExecutionInfo.setNumberOfBlockletToScan(numberOfBlockletToScan);
@@ -520,7 +520,7 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> {
         .createMeasureInfoAndGetCurrentBlockQueryMeasures(executionInfo,
             queryModel.getProjectionMeasures(),
             tableBlock.getSegmentProperties().getMeasures(),
-            queryModel.getTable().getTableInfo().isUnManagedTable());
+            queryModel.getTable().getTableInfo().isTransactionalTable());
     // setting the measure aggregator for all aggregation function selected
     // in query
     executionInfo.getMeasureInfo().setMeasureDataTypes(queryProperties.measureDataTypes);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
index 3b477ab..c69ba6c 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java
@@ -58,13 +58,13 @@ public class RestructureUtil {
    * @param queryDimensions
    * @param tableBlockDimensions
    * @param tableComplexDimension
-   * @param isUnManagedTable
+   * @param isTransactionalTable
    * @return list of query dimension which is present in the table block
    */
   public static List<ProjectionDimension> createDimensionInfoAndGetCurrentBlockQueryDimension(
       BlockExecutionInfo blockExecutionInfo, List<ProjectionDimension> queryDimensions,
       List<CarbonDimension> tableBlockDimensions, List<CarbonDimension> tableComplexDimension,
-      int measureCount, boolean isUnManagedTable) {
+      int measureCount, boolean isTransactionalTable) {
     List<ProjectionDimension> presentDimension =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
     boolean[] isDimensionExists = new boolean[queryDimensions.size()];
@@ -84,7 +84,8 @@ public class RestructureUtil {
             queryDimension.getDimension().getDataType();
       } else {
         for (CarbonDimension tableDimension : tableBlockDimensions) {
-          if (isColumnMatches(isUnManagedTable, queryDimension.getDimension(), tableDimension)) {
+          if (isColumnMatches(isTransactionalTable, queryDimension.getDimension(),
+              tableDimension)) {
             ProjectionDimension currentBlockDimension = new ProjectionDimension(tableDimension);
             tableDimension.getColumnSchema()
                 .setPrecision(queryDimension.getDimension().getColumnSchema().getPrecision());
@@ -106,7 +107,8 @@ public class RestructureUtil {
           continue;
         }
         for (CarbonDimension tableDimension : tableComplexDimension) {
-          if (isColumnMatches(isUnManagedTable, queryDimension.getDimension(), tableDimension)) {
+          if (isColumnMatches(isTransactionalTable, queryDimension.getDimension(),
+              tableDimension)) {
             ProjectionDimension currentBlockDimension = new ProjectionDimension(tableDimension);
             // TODO: for complex dimension set scale and precision by traversing
             // the child dimensions
@@ -143,19 +145,19 @@ public class RestructureUtil {
   }
 
   /**
-   * Match the columns for managed and unmanaged tables
-   * @param isUnManagedTable
+   * Match the columns for transactional and non transactional tables
+   * @param isTransactionalTable
    * @param queryColumn
    * @param tableColumn
    * @return
    */
-  private static boolean isColumnMatches(boolean isUnManagedTable,
+  private static boolean isColumnMatches(boolean isTransactionalTable,
       CarbonColumn queryColumn, CarbonColumn tableColumn) {
-    // If it is unmanaged table just check the column names, no need to validate column id as
-    // multiple sdk's output placed in a single folder doesn't have same column ID but can
-    // have same column name
+    // For a non transactional table, just check the column names; there is no need
+    // to validate the column id, since output from multiple SDK writers placed in a
+    // single folder will not have the same column ID but can have the same column name
     return (tableColumn.getColumnId().equals(queryColumn.getColumnId()) ||
-        (isUnManagedTable && tableColumn.getColName().equals(queryColumn.getColName())));
+        (!isTransactionalTable && tableColumn.getColName().equals(queryColumn.getColName())));
   }
 
   /**
@@ -355,12 +357,12 @@ public class RestructureUtil {
    * @param blockExecutionInfo
    * @param queryMeasures        measures present in query
    * @param currentBlockMeasures current block measures
-   * @param isUnManagedTable
+   * @param isTransactionalTable
    * @return measures present in the block
    */
   public static List<ProjectionMeasure> createMeasureInfoAndGetCurrentBlockQueryMeasures(
       BlockExecutionInfo blockExecutionInfo, List<ProjectionMeasure> queryMeasures,
-      List<CarbonMeasure> currentBlockMeasures, boolean isUnManagedTable) {
+      List<CarbonMeasure> currentBlockMeasures, boolean isTransactionalTable) {
     MeasureInfo measureInfo = new MeasureInfo();
     List<ProjectionMeasure> presentMeasure = new ArrayList<>(queryMeasures.size());
     int numberOfMeasureInQuery = queryMeasures.size();
@@ -373,7 +375,7 @@ public class RestructureUtil {
       // then setting measure exists is true
       // otherwise adding a default value of a measure
       for (CarbonMeasure carbonMeasure : currentBlockMeasures) {
-        if (isColumnMatches(isUnManagedTable, carbonMeasure, queryMeasure.getMeasure())) {
+        if (isColumnMatches(isTransactionalTable, carbonMeasure, queryMeasure.getMeasure())) {
           ProjectionMeasure currentBlockMeasure = new ProjectionMeasure(carbonMeasure);
           carbonMeasure.getColumnSchema().setDataType(queryMeasure.getMeasure().getDataType());
           carbonMeasure.getColumnSchema().setPrecision(queryMeasure.getMeasure().getPrecision());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 0e2976a..8c24bd1 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -240,7 +240,7 @@ public class SegmentUpdateStatusManager {
    * @throws Exception
    */
   public String[] getDeleteDeltaFilePath(String blockFilePath, String segmentId) throws Exception {
-    String blockId = CarbonUtil.getBlockId(identifier, blockFilePath, segmentId, false);
+    String blockId = CarbonUtil.getBlockId(identifier, blockFilePath, segmentId, true);
     String tupleId;
     if (isPartitionTable) {
       tupleId = CarbonTablePath.getShortBlockIdForPartitionTable(blockId);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 91b35f5..27ec202 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2911,17 +2911,18 @@ public final class CarbonUtil {
    * @param identifier
    * @param filePath
    * @param segmentId
+   * @param isTransactionalTable
    * @return
    */
   public static String getBlockId(AbsoluteTableIdentifier identifier, String filePath,
-      String segmentId, boolean isUnmangedTable) {
+      String segmentId, boolean isTransactionalTable) {
     String blockId;
     String blockName = filePath.substring(filePath.lastIndexOf("/") + 1, filePath.length());
     String tablePath = identifier.getTablePath();
 
     if (filePath.startsWith(tablePath)) {
       String factDir = CarbonTablePath.getFactDir(tablePath);
-      if (filePath.startsWith(factDir) || isUnmangedTable) {
+      if (filePath.startsWith(factDir) || !isTransactionalTable) {
         blockId = "Part0" + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId
             + CarbonCommonConstants.FILE_SEPARATOR + blockName;
       } else {
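
A hedged illustration of the branch above: for a non-transactional table every
block is addressed as if it lived in the fact directory, so the id takes the
flat form below (assuming CarbonCommonConstants.FILE_SEPARATOR is "/"; the
block name and segment id are hypothetical):

    val segmentId = "0"
    val blockName = "part-0-0_batchno0-0-1.carbondata"
    val blockId = "Part0" + "/" + "Segment_" + segmentId + "/" + blockName
    // blockId == "Part0/Segment_0/part-0-0_batchno0-0-1.carbondata"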

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
index a72a6bf..403c85d 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonInputFormat.java
@@ -99,7 +99,8 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       "mapreduce.input.carboninputformat.filter.predicate";
   private static final String COLUMN_PROJECTION = "mapreduce.input.carboninputformat.projection";
   private static final String TABLE_INFO = "mapreduce.input.carboninputformat.tableinfo";
-  private static final String UNMANAGED_TABLE = "mapreduce.input.carboninputformat.unmanaged";
+  private static final String CARBON_TRANSACTIONAL_TABLE =
+      "mapreduce.input.carboninputformat.transactional";
   private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
   private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
   private static final String DATA_MAP_DSTR = "mapreduce.input.carboninputformat.datamapdstr";
@@ -161,8 +162,9 @@ public abstract class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     configuration.set(FileInputFormat.INPUT_DIR, tablePath);
   }
 
-  public static void setUnmanagedTable(Configuration configuration, boolean isUnmanagedTable) {
-    configuration.set(UNMANAGED_TABLE, String.valueOf(isUnmanagedTable));
+  public static void setTransactionalTable(Configuration configuration,
+      boolean isTransactionalTable) {
+    configuration.set(CARBON_TRANSACTIONAL_TABLE, String.valueOf(isTransactionalTable));
   }
 
   public static void setPartitionIdList(Configuration configuration, List<String> partitionIds) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 6f65d7d..2851de2 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -157,10 +157,14 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
       }
       String uniqueId = null;
       if (overwriteSet) {
-        if (segmentSize == 0) {
-          newMetaEntry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
+        if (!loadModel.isCarbonTransactionalTable()) {
+          CarbonLoaderUtil.deleteNonTransactionalTableForInsertOverwrite(loadModel);
+        } else {
+          if (segmentSize == 0) {
+            newMetaEntry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE);
+          }
+          uniqueId = overwritePartitions(loadModel, newMetaEntry, uuid);
         }
-        uniqueId = overwritePartitions(loadModel, newMetaEntry, uuid);
       } else {
         CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, false, uuid);
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index 7b5b8d1..f93be63 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -92,8 +92,8 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
   private static final Log LOG = LogFactory.getLog(CarbonTableInputFormat.class);
   private static final String CARBON_READ_SUPPORT = "mapreduce.input.carboninputformat.readsupport";
   private static final String CARBON_CONVERTER = "mapreduce.input.carboninputformat.converter";
-  private static final String CARBON_UNMANAGED_TABLE =
-      "mapreduce.input.carboninputformat.unmanaged";
+  private static final String CARBON_TRANSACTIONAL_TABLE =
+      "mapreduce.input.carboninputformat.transactional";
   public static final String DATABASE_NAME = "mapreduce.input.carboninputformat.databaseName";
   public static final String TABLE_NAME = "mapreduce.input.carboninputformat.tableName";
   // a cache for carbon table, it will be used in task side
@@ -627,10 +627,10 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
       throws IOException {
     if (readCommittedScope == null) {
       ReadCommittedScope readCommittedScope;
-      if (job.getConfiguration().getBoolean(CARBON_UNMANAGED_TABLE, false)) {
-        readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
-      } else {
+      if (job.getConfiguration().getBoolean(CARBON_TRANSACTIONAL_TABLE, true)) {
         readCommittedScope = new TableStatusReadCommittedScope(identifier);
+      } else {
+        readCommittedScope = new LatestFilesReadCommittedScope(identifier.getTablePath());
       }
       this.readCommittedScope = readCommittedScope;
     }
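
A short Scala sketch of how a caller selects this scope (using the
setTransactionalTable setter added to CarbonInputFormat earlier in this diff;
the `carbonTable` instance is assumed):

    val conf = new org.apache.hadoop.conf.Configuration()
    // false makes the input format use LatestFilesReadCommittedScope
    // instead of TableStatusReadCommittedScope
    CarbonInputFormat.setTransactionalTable(conf,
      carbonTable.getTableInfo.isTransactionalTable)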

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index f93f849..43e0221 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -68,6 +68,8 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
   private static final String TEMP_STORE_LOCATIONS = "mapreduce.carbontable.tempstore.locations";
   private static final String OVERWRITE_SET = "mapreduce.carbontable.set.overwrite";
   public static final String COMPLEX_DELIMITERS = "mapreduce.carbontable.complex_delimiters";
+  private static final String CARBON_TRANSACTIONAL_TABLE =
+      "mapreduce.input.carboninputformat.transactional";
   public static final String SERIALIZATION_NULL_FORMAT =
       "mapreduce.carbontable.serialization.null.format";
   public static final String BAD_RECORDS_LOGGER_ENABLE =
@@ -271,6 +273,7 @@ public class CarbonTableOutputFormat extends FileOutputFormat<NullWritable, Obje
     CarbonProperties carbonProperty = CarbonProperties.getInstance();
     model.setDatabaseName(CarbonTableOutputFormat.getDatabaseName(conf));
     model.setTableName(CarbonTableOutputFormat.getTableName(conf));
+    model.setCarbonTransactionalTable(true);
     model.setCarbonDataLoadSchema(new CarbonDataLoadSchema(getCarbonTable(conf)));
     model.setTablePath(getTablePath(conf));
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
index b2c2d39..9075012 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/testutil/StoreCreator.java
@@ -128,6 +128,7 @@ public class StoreCreator {
     loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>());
     loadModel.setTablePath(absoluteTableIdentifier.getTablePath());
     loadModel.setDateFormat(null);
+    loadModel.setCarbonTransactionalTable(table.isTransactionalTable());
     loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty(
         CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
         CarbonCommonConstants.CARBON_TIMESTAMP_MILLIS));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
index 8ac2905..36c7414 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/CarbonInputFormatUtil.java
@@ -88,7 +88,8 @@ public class CarbonInputFormatUtil {
     if (partitionNames != null) {
       CarbonInputFormat.setPartitionsToPrune(conf, partitionNames);
     }
-    CarbonInputFormat.setUnmanagedTable(conf, carbonTable.getTableInfo().isUnManagedTable());
+    CarbonInputFormat
+        .setTransactionalTable(conf, carbonTable.getTableInfo().isTransactionalTable());
     CarbonProjection columnProjection = new CarbonProjection(projectionColumns);
     return createInputFormat(conf, carbonTable.getAbsoluteTableIdentifier(),
         filterExpression, columnProjection, dataMapJob);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index e12af63..6a8c40d 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -94,6 +94,7 @@ object CarbonDataStoreCreator {
       loadModel.setTableName(
         absoluteTableIdentifier.getCarbonTableIdentifier.getTableName)
       loadModel.setFactFilePath(dataFilePath)
+      loadModel.setCarbonTransactionalTable(table.isTransactionalTable)
       loadModel.setLoadMetadataDetails(new ArrayList[LoadMetadataDetails]())
       loadModel.setTablePath(absoluteTableIdentifier.getTablePath)
       CarbonProperties.getInstance

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark-common-test/src/test/resources/nontransactional.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/nontransactional.csv b/integration/spark-common-test/src/test/resources/nontransactional.csv
new file mode 100644
index 0000000..c22303b
--- /dev/null
+++ b/integration/spark-common-test/src/test/resources/nontransactional.csv
@@ -0,0 +1,3 @@
+name, age, height
+arvind, 33, 6.2
+bill, 35, 7.3

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
index 7841a23..9646c1d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCarbonFileInputFormatWithExternalCarbonTable.scala
@@ -55,7 +55,7 @@ class TestCarbonFileInputFormatWithExternalCarbonTable extends QueryTest with Be
       .toString()
 
     try {
-      val builder = CarbonWriter.builder()
+      val builder = CarbonWriter.builder().isTransactionalTable(true)
       val writer =
       if (persistSchema) {
         builder.persistSchemaFile(true)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
index f421d44..16f19a7 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestCreateTableUsingSparkCarbonFileFormat.scala
@@ -64,7 +64,7 @@ class TestCreateTableUsingSparkCarbonFileFormat extends QueryTest with BeforeAnd
       .toString()
 
     try {
-      val builder = CarbonWriter.builder()
+      val builder = CarbonWriter.builder().isTransactionalTable(true)
       val writer =
         if (persistSchema) {
           builder.persistSchemaFile(true)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
new file mode 100644
index 0000000..7798403
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestNonTransactionalCarbonTable.scala
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import java.io.{File, FileFilter}
+
+import org.apache.commons.io.FileUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.junit.Assert
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonUtil
+import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
+
+
+class TestNonTransactionalCarbonTable extends QueryTest with BeforeAndAfterAll {
+
+  var writerPath = new File(this.getClass.getResource("/").getPath
+                            +
+                            "../." +
+                            "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
+    .getCanonicalPath
+  // getCanonicalPath returns a path with '\' on Windows, but the code expects '/'. Should the code handle this itself?
+  writerPath = writerPath.replace("\\", "/");
+
+  def buildTestDataSingleFile(): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    buildTestData(3,false)
+  }
+
+  def buildTestDataMultipleFiles(): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    buildTestData(1000000,false)
+  }
+
+  def buildTestDataTwice(): Any = {
+    FileUtils.deleteDirectory(new File(writerPath))
+    buildTestData(3,false)
+    buildTestData(3,false)
+  }
+
+  // prepare sdk writer output
+  def buildTestData(rows:Int, persistSchema:Boolean): Any = {
+    val schema = new StringBuilder()
+      .append("[ \n")
+      .append("   {\"name\":\"string\"},\n")
+      .append("   {\"age\":\"int\"},\n")
+      .append("   {\"height\":\"double\"}\n")
+      .append("]")
+      .toString()
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        if (persistSchema) {
+          builder.persistSchemaFile(true)
+          builder.withSchema(Schema.parseJson(schema))
+            .outputPath(writerPath)
+            .isTransactionalTable(false)
+            .uniqueIdentifier(System.currentTimeMillis)
+            .buildWriterForCSVInput()
+        } else {
+          builder.withSchema(Schema.parseJson(schema))
+            .outputPath(writerPath)
+            .isTransactionalTable(false)
+            .uniqueIdentifier(System.currentTimeMillis).withBlockSize(2)
+            .buildWriterForCSVInput()
+        }
+      var i = 0
+      while (i < rows) {
+        writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
+        i += 1
+      }
+      writer.close()
+    } catch {
+      // fail the test instead of silently swallowing writer errors
+      case ex: Exception => Assert.fail(ex.getMessage)
+    }
+  }
+
+  def cleanTestData() = {
+    FileUtils.deleteDirectory(new File(writerPath))
+  }
+
+  def deleteFile(path: String, extension: String): Unit = {
+    val file: CarbonFile = FileFactory
+      .getCarbonFile(path, FileFactory.getFileType(path))
+
+    for (eachDir <- file.listFiles) {
+      if (!eachDir.isDirectory) {
+        if (eachDir.getName.endsWith(extension)) {
+          CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
+        }
+      } else {
+        deleteFile(eachDir.getPath, extension)
+      }
+    }
+  }
+
+  override def beforeAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+  }
+
+  override def afterAll(): Unit = {
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+  }
+
+  test("test create External Table with Schema with partition, should ignore schema and partition")
+  {
+    buildTestDataSingleFile()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    // with partition
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(name string) PARTITIONED BY (age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0)))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+
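+  // Insert into the external non-transactional table from a regular carbon table;
+  // the inserted row should be readable next to the SDK-written rows.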
+  test("test create External Table with insert into feature")
+  {
+    buildTestData(3, false)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql("DROP TABLE IF EXISTS t1")
+
+    // with partition
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(name string) PARTITIONED BY (age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0)))
+
+    sql("create table if not exists t1 (name string, age int, height double) STORED BY 'org.apache.carbondata.format'")
+    sql (s"""insert into t1 values ("aaaaa", 12, 20)""").show(200,false)
+    sql("select * from t1").show(200,false)
+    sql("insert into sdkOutputTable select * from t1").show(200,false)
+
+    checkAnswer(sql(s"""select * from sdkOutputTable where age = 12"""),
+      Seq(Row("aaaaa", 12, 20.0)))
+
+    sql("DROP TABLE sdkOutputTable")
+    sql("drop table t1")
+
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
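+  // Insert overwrite should replace the SDK-written files: the original rows
+  // (age 0..2) must no longer be visible after the overwrite.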
+  test("test create External Table with insert overwrite")
+  {
+    buildTestData(3, false)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql("DROP TABLE IF EXISTS t1")
+    sql("DROP TABLE IF EXISTS t2")
+    sql("DROP TABLE IF EXISTS t3")
+
+    // with partition
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(name string) PARTITIONED BY (age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0)))
+
+    sql("create table if not exists t1 (name string, age int, height double) STORED BY 'org.apache.carbondata.format'")
+    sql (s"""insert into t1 values ("aaaaa", 12, 20)""").show(200,false)
+
+    checkAnswer(sql(s"""select count(*) from sdkOutputTable where age = 1"""),
+      Seq(Row(1)))
+
+    sql("insert overwrite table sdkOutputTable select * from t1").show(200,false)
+
+    checkAnswer(sql(s"""select count(*) from sdkOutputTable where age = 1"""),
+      Seq(Row(0)))
+
+    sql("DROP TABLE if exists sdkOutputTable")
+    sql("drop table if exists t1")
+
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+
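+  // LOAD DATA should append the CSV rows to the external non-transactional table
+  // alongside the SDK-written files.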
+  test("test create External Table with Load")
+  {
+    buildTestData(3, false)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql("DROP TABLE IF EXISTS t1")
+    sql("DROP TABLE IF EXISTS t2")
+    sql("DROP TABLE IF EXISTS t3")
+
+    // with partition
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(name string) PARTITIONED BY (age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0)))
+
+    sql("create table if not exists t1 (name string, age int, height double) STORED BY 'org.apache.carbondata.format'")
+    sql (s"""insert into t1 values ("aaaaa", 12, 20)""").show(200,false)
+
+    checkAnswer(sql(s"""select count(*) from sdkOutputTable where age = 1"""),
+      Seq(Row(1)))
+
+    // scalastyle:off
+    sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$resourcesPath/nontransactional.csv'
+         | INTO TABLE sdkOutputTable
+         | OPTIONS('HEADER'='true')
+       """.stripMargin)
+    // scalastyle:on
+
+    checkAnswer(sql(s"""select count(*) from sdkOutputTable where height = 6.2"""),
+      Seq(Row(1)))
+
+    sql("DROP TABLE if exists sdkOutputTable")
+    sql("drop table if exists t1")
+
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+
+
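+  // Plain reads over SDK writer output: projections, filters, like-patterns and
+  // aggregations should all work on the external table.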
+  test("read non transactional table, files written from sdk Writer Output)") {
+    buildTestDataSingleFile()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable1")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable1 STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkOutputTable1"), Seq(Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0)))
+
+    checkAnswer(sql("select name from sdkOutputTable1"), Seq(Row("robot0"),
+      Row("robot1"),
+      Row("robot2")))
+
+    checkAnswer(sql("select age from sdkOutputTable1"), Seq(Row(0), Row(1), Row(2)))
+
+    checkAnswer(sql("select * from sdkOutputTable1 where age > 1 and age < 8"),
+      Seq(Row("robot2", 2, 1.0)))
+
+    checkAnswer(sql("select * from sdkOutputTable1 where name = 'robot2'"),
+      Seq(Row("robot2", 2, 1.0)))
+
+    checkAnswer(sql("select * from sdkOutputTable1 where name like '%obot%' limit 2"),
+      Seq(Row("robot0", 0, 0.0),
+        Row("robot1", 1, 0.5)))
+
+    checkAnswer(sql("select sum(age) from sdkOutputTable1 where name like 'robot%'"), Seq(Row(3)))
+
+    checkAnswer(sql("select count(*) from sdkOutputTable1 where name like 'robot%' "), Seq(Row(3)))
+
+    checkAnswer(sql("select count(*) from sdkOutputTable1"), Seq(Row(3)))
+
+    sql("DROP TABLE sdkOutputTable1")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
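+  // Operations that depend on transactional metadata (schema evolution, datamaps,
+  // compaction, segment management, update/delete, partitions, rename) must be
+  // rejected with MalformedCarbonCommandException.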
+  test("Test Blocked operations for non transactional table ") {
+    buildTestDataSingleFile()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    //1. alter datatype
+    var exception = intercept[MalformedCarbonCommandException] {
+      sql("Alter table sdkOutputTable change age age BIGINT")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on non transactional table"))
+
+    //2. Datamap creation
+    exception = intercept[MalformedCarbonCommandException] {
+      sql(
+        "CREATE DATAMAP agg_sdkOutputTable ON TABLE sdkOutputTable USING \"preaggregate\" AS " +
+        "SELECT name, sum(age) FROM sdkOutputTable GROUP BY name,age")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on non transactional table"))
+
+    //3. compaction
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("ALTER TABLE sdkOutputTable COMPACT 'MAJOR'")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on non transactional table"))
+
+    //4. Show segments
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("Show segments for table sdkOutputTable").show(false)
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on non transactional table"))
+
+    //5. Delete segment by ID
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("DELETE FROM TABLE sdkOutputTable WHERE SEGMENT.ID IN (0)")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on non transactional table"))
+
+    //6. Delete segment by date
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("DELETE FROM TABLE sdkOutputTable WHERE SEGMENT.STARTTIME BEFORE '2017-06-01 12:05:06'")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on non transactional table"))
+
+    //7. Update rows
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("UPDATE sdkOutputTable SET (age) = (age + 9) ").show(false)
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on non transactional table"))
+
+    //8. Delete rows
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("DELETE FROM sdkOutputTable where name='robot1'").show(false)
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on non transactional table"))
+
+    //9. Show partition
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("Show partitions sdkOutputTable").show(false)
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on non transactional table"))
+
+    //10. Streaming table creation
+    // Not applicable: external tables do not accept table properties
+
+    //11. Alter table rename command
+    exception = intercept[MalformedCarbonCommandException] {
+      sql("ALTER TABLE sdkOutputTable RENAME to newTable")
+    }
+    assert(exception.getMessage()
+      .contains("Unsupported operation on non transactional table"))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
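+  // The column list given in CREATE EXTERNAL TABLE is ignored; the schema is
+  // inferred from the SDK-written files instead.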
+  test("test create External Table With Schema, should ignore the schema provided") {
+    buildTestDataSingleFile()
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    // with schema
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable(age int) STORED BY
+         |'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0)))
+
+    sql("DROP TABLE sdkOutputTable")
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
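+  // With the .carbondata fact files deleted, creating the external table should
+  // fail with an invalid table path error.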
+  test("Read sdk writer output file without Carbondata file should fail") {
+    buildTestDataSingleFile()
+    deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    val exception = intercept[Exception] {
+      // data source file format
+      sql(
+        s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+           |'$writerPath' """.stripMargin)
+    }
+    assert(exception.getMessage()
+      .contains("Operation not allowed: Invalid table path provided:"))
+
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+
+  test("Read sdk writer output file without any file should fail") {
+    buildTestDataSingleFile()
+    deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
+    deleteFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
+    assert(new File(writerPath).exists())
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    val exception = intercept[Exception] {
+      // data source file format
+      sql(
+        s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+           |'$writerPath' """.stripMargin)
+
+      sql("select * from sdkOutputTable").show(false)
+    }
+    assert(exception.getMessage()
+      .contains("Operation not allowed: Invalid table path provided:"))
+
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
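+  // With 1,000,000 rows and a small writer block size, the SDK should emit more
+  // than one fact file; the external table must still see all rows.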
+  test("Read sdk writer output multiple files ") {
+    buildTestDataMultipleFiles()
+    assert(new File(writerPath).exists())
+    val folder = new File(writerPath)
+    val dataFiles = folder.listFiles(new FileFilter() {
+      override def accept(pathname: File): Boolean = {
+        pathname.getName
+          .endsWith(CarbonCommonConstants.FACT_FILE_EXT)
+      }
+    })
+    Assert.assertNotNull(dataFiles)
+    Assert.assertNotEquals(1, dataFiles.length)
+
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+    sql("DROP TABLE IF EXISTS t1")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+    checkAnswer(sql("select count(*) from sdkOutputTable"), Seq(Row(1000000)))
+
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
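+  // Two writer runs into the same folder: the external table should return the
+  // union of both outputs (each row appears twice).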
+  test("Read two sdk writer outputs with same column name placed in same folder") {
+    buildTestDataTwice()
+    assert(new File(writerPath).exists())
+
+    sql("DROP TABLE IF EXISTS sdkOutputTable")
+
+    sql(
+      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
+         |'$writerPath' """.stripMargin)
+
+
+    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0),
+      Row("robot0", 0, 0.0),
+      Row("robot1", 1, 0.5),
+      Row("robot2", 2, 1.0)))
+
+    // drop table should not delete the files
+    assert(new File(writerPath).exists())
+    cleanTestData()
+  }
+
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
index de91f2a..53dadf6 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestSparkCarbonFileFormatWithSparkSession.scala
@@ -54,7 +54,7 @@ object TestSparkCarbonFileFormatWithSparkSession {
       .toString()
 
     try {
-      val builder = CarbonWriter.builder()
+      val builder = CarbonWriter.builder().isTransactionalTable(true)
       val writer =
         if (persistSchema) {
           builder.persistSchemaFile(true)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala
deleted file mode 100644
index a6ee807..0000000
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestUnmanagedCarbonTable.scala
+++ /dev/null
@@ -1,408 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.createTable
-
-import java.io.{File, FileFilter}
-
-import org.apache.commons.io.FileUtils
-import org.apache.spark.sql.Row
-import org.apache.spark.sql.test.util.QueryTest
-import org.junit.Assert
-import org.scalatest.BeforeAndAfterAll
-
-import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.util.CarbonUtil
-import org.apache.carbondata.sdk.file.{CarbonWriter, Schema}
-
-
-class TestUnmanagedCarbonTable extends QueryTest with BeforeAndAfterAll {
-
-  var writerPath = new File(this.getClass.getResource("/").getPath
-                            +
-                            "../." +
-                            "./src/test/resources/SparkCarbonFileFormat/WriterOutput/")
-    .getCanonicalPath
-  //getCanonicalPath gives path with \, so code expects /. Need to handle in code ?
-  writerPath = writerPath.replace("\\", "/");
-
-  def buildTestDataSingleFile(): Any = {
-    FileUtils.deleteDirectory(new File(writerPath))
-    buildTestData(3,false)
-  }
-
-  def buildTestDataMultipleFiles(): Any = {
-    FileUtils.deleteDirectory(new File(writerPath))
-    buildTestData(1000000,false)
-  }
-
-  def buildTestDataTwice(): Any = {
-    FileUtils.deleteDirectory(new File(writerPath))
-    buildTestData(3,false)
-    buildTestData(3,false)
-  }
-
-  // prepare sdk writer output
-  def buildTestData(rows:Int, persistSchema:Boolean): Any = {
-    val schema = new StringBuilder()
-      .append("[ \n")
-      .append("   {\"name\":\"string\"},\n")
-      .append("   {\"age\":\"int\"},\n")
-      .append("   {\"height\":\"double\"}\n")
-      .append("]")
-      .toString()
-
-    try {
-      val builder = CarbonWriter.builder()
-      val writer =
-        if (persistSchema) {
-          builder.persistSchemaFile(true)
-          builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).unManagedTable(true)
-            .uniqueIdentifier(
-              System.currentTimeMillis)
-            .buildWriterForCSVInput()
-        } else {
-          builder.withSchema(Schema.parseJson(schema)).outputPath(writerPath).unManagedTable(true)
-            .uniqueIdentifier(
-              System.currentTimeMillis).withBlockSize(2)
-            .buildWriterForCSVInput()
-        }
-      var i = 0
-      while (i < rows) {
-        writer.write(Array[String]("robot" + i, String.valueOf(i), String.valueOf(i.toDouble / 2)))
-        i += 1
-      }
-      writer.close()
-    } catch {
-      case ex: Exception => None
-      case _ => None
-    }
-  }
-
-  def cleanTestData() = {
-    FileUtils.deleteDirectory(new File(writerPath))
-  }
-
-  def deleteFile(path: String, extension: String): Unit = {
-    val file: CarbonFile = FileFactory
-      .getCarbonFile(path, FileFactory.getFileType(path))
-
-    for (eachDir <- file.listFiles) {
-      if (!eachDir.isDirectory) {
-        if (eachDir.getName.endsWith(extension)) {
-          CarbonUtil.deleteFoldersAndFilesSilent(eachDir)
-        }
-      } else {
-        deleteFile(eachDir.getPath, extension)
-      }
-    }
-  }
-
-  override def beforeAll(): Unit = {
-    sql("DROP TABLE IF EXISTS sdkOutputTable")
-  }
-
-  override def afterAll(): Unit = {
-    sql("DROP TABLE IF EXISTS sdkOutputTable")
-  }
-
-  test("test create External Table with Schema with partition, should ignore schema and partition")
-  {
-    buildTestDataSingleFile()
-    assert(new File(writerPath).exists())
-    sql("DROP TABLE IF EXISTS sdkOutputTable")
-
-    // with partition
-    sql(
-      s"""CREATE EXTERNAL TABLE sdkOutputTable(name string) PARTITIONED BY (age int) STORED BY
-         |'carbondata' LOCATION
-         |'$writerPath' """.stripMargin)
-
-    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
-      Row("robot1", 1, 0.5),
-      Row("robot2", 2, 1.0)))
-
-    sql("DROP TABLE sdkOutputTable")
-    // drop table should not delete the files
-    assert(new File(writerPath).exists())
-    cleanTestData()
-  }
-
-  test("read unmanaged table, files written from sdk Writer Output)") {
-    buildTestDataSingleFile()
-    assert(new File(writerPath).exists())
-    sql("DROP TABLE IF EXISTS sdkOutputTable1")
-
-    sql(
-      s"""CREATE EXTERNAL TABLE sdkOutputTable1 STORED BY 'carbondata' LOCATION
-         |'$writerPath' """.stripMargin)
-
-    checkAnswer(sql("select * from sdkOutputTable1"), Seq(Row("robot0", 0, 0.0),
-      Row("robot1", 1, 0.5),
-      Row("robot2", 2, 1.0)))
-
-    checkAnswer(sql("select name from sdkOutputTable1"), Seq(Row("robot0"),
-      Row("robot1"),
-      Row("robot2")))
-
-    checkAnswer(sql("select age from sdkOutputTable1"), Seq(Row(0), Row(1), Row(2)))
-
-    checkAnswer(sql("select * from sdkOutputTable1 where age > 1 and age < 8"),
-      Seq(Row("robot2", 2, 1.0)))
-
-    checkAnswer(sql("select * from sdkOutputTable1 where name = 'robot2'"),
-      Seq(Row("robot2", 2, 1.0)))
-
-    checkAnswer(sql("select * from sdkOutputTable1 where name like '%obot%' limit 2"),
-      Seq(Row("robot0", 0, 0.0),
-        Row("robot1", 1, 0.5)))
-
-    checkAnswer(sql("select sum(age) from sdkOutputTable1 where name like 'robot%'"), Seq(Row(3)))
-
-    checkAnswer(sql("select count(*) from sdkOutputTable1 where name like 'robot%' "), Seq(Row(3)))
-
-    checkAnswer(sql("select count(*) from sdkOutputTable1"), Seq(Row(3)))
-
-    sql("DROP TABLE sdkOutputTable1")
-    // drop table should not delete the files
-    assert(new File(writerPath).exists())
-    cleanTestData()
-  }
-
-  test("Test Blocked operations for unmanaged table ") {
-    buildTestDataSingleFile()
-    assert(new File(writerPath).exists())
-    sql("DROP TABLE IF EXISTS sdkOutputTable")
-
-    sql(
-      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
-         |'$writerPath' """.stripMargin)
-
-    //1. alter datatype
-    var exception = intercept[MalformedCarbonCommandException] {
-      sql("Alter table sdkOutputTable change age age BIGINT")
-    }
-    assert(exception.getMessage()
-      .contains("Unsupported operation on unmanaged table"))
-
-    //2. Load
-    exception = intercept[MalformedCarbonCommandException] {
-      sql("LOAD DATA LOCAL INPATH '/path/to/data' INTO TABLE sdkOutputTable ")
-    }
-    assert(exception.getMessage()
-      .contains("Unsupported operation on unmanaged table"))
-
-    //3. Datamap creation
-    exception = intercept[MalformedCarbonCommandException] {
-      sql(
-        "CREATE DATAMAP agg_sdkOutputTable ON TABLE sdkOutputTable USING \"preaggregate\" AS " +
-        "SELECT name, sum(age) FROM sdkOutputTable GROUP BY name,age")
-    }
-    assert(exception.getMessage()
-      .contains("Unsupported operation on unmanaged table"))
-
-    //4. Insert Into
-    exception = intercept[MalformedCarbonCommandException] {
-      sql("insert into table sdkOutputTable SELECT 20,'robotX',2.5")
-    }
-    assert(exception.getMessage()
-      .contains("Unsupported operation on unmanaged table"))
-
-    //5. compaction
-    exception = intercept[MalformedCarbonCommandException] {
-      sql("ALTER TABLE sdkOutputTable COMPACT 'MAJOR'")
-    }
-    assert(exception.getMessage()
-      .contains("Unsupported operation on unmanaged table"))
-
-    //6. Show segments
-    exception = intercept[MalformedCarbonCommandException] {
-      sql("Show segments for table sdkOutputTable").show(false)
-    }
-    assert(exception.getMessage()
-      .contains("Unsupported operation on unmanaged table"))
-
-    //7. Delete segment by ID
-    exception = intercept[MalformedCarbonCommandException] {
-      sql("DELETE FROM TABLE sdkOutputTable WHERE SEGMENT.ID IN (0)")
-    }
-    assert(exception.getMessage()
-      .contains("Unsupported operation on unmanaged table"))
-
-    //8. Delete segment by date
-    exception = intercept[MalformedCarbonCommandException] {
-      sql("DELETE FROM TABLE sdkOutputTable WHERE SEGMENT.STARTTIME BEFORE '2017-06-01 12:05:06'")
-    }
-    assert(exception.getMessage()
-      .contains("Unsupported operation on unmanaged table"))
-
-    //9. Update column
-    exception = intercept[MalformedCarbonCommandException] {
-      sql("UPDATE sdkOutputTable SET (age) = (age + 9) ").show(false)
-    }
-    assert(exception.getMessage()
-      .contains("Unsupported operation on unmanaged table"))
-
-    //10. Delete column
-    exception = intercept[MalformedCarbonCommandException] {
-      sql("DELETE FROM sdkOutputTable where name='robot1'").show(false)
-    }
-    assert(exception.getMessage()
-      .contains("Unsupported operation on unmanaged table"))
-
-    //11. Show partition
-    exception = intercept[MalformedCarbonCommandException] {
-      sql("Show partitions sdkOutputTable").show(false)
-    }
-    assert(exception.getMessage()
-      .contains("Unsupported operation on unmanaged table"))
-
-    //12. Streaming table creation
-    // No need as External table don't accept table properties
-
-    //13. Alter table rename command
-    exception = intercept[MalformedCarbonCommandException] {
-      sql("ALTER TABLE sdkOutputTable RENAME to newTable")
-    }
-    assert(exception.getMessage()
-      .contains("Unsupported operation on unmanaged table"))
-
-    sql("DROP TABLE sdkOutputTable")
-    //drop table should not delete the files
-    assert(new File(writerPath).exists())
-    cleanTestData()
-  }
-
-  test("test create External Table With Schema, should ignore the schema provided") {
-    buildTestDataSingleFile()
-    assert(new File(writerPath).exists())
-    sql("DROP TABLE IF EXISTS sdkOutputTable")
-
-    // with schema
-    sql(
-      s"""CREATE EXTERNAL TABLE sdkOutputTable(age int) STORED BY
-         |'carbondata' LOCATION
-         |'$writerPath' """.stripMargin)
-
-    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
-      Row("robot1", 1, 0.5),
-      Row("robot2", 2, 1.0)))
-
-    sql("DROP TABLE sdkOutputTable")
-    // drop table should not delete the files
-    assert(new File(writerPath).exists())
-    cleanTestData()
-  }
-
-  test("Read sdk writer output file without Carbondata file should fail") {
-    buildTestDataSingleFile()
-    deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
-    assert(new File(writerPath).exists())
-    sql("DROP TABLE IF EXISTS sdkOutputTable")
-
-    val exception = intercept[Exception] {
-      //    data source file format
-      sql(
-        s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
-           |'$writerPath' """.stripMargin)
-    }
-    assert(exception.getMessage()
-      .contains("Operation not allowed: Invalid table path provided:"))
-
-    // drop table should not delete the files
-    assert(new File(writerPath).exists())
-    cleanTestData()
-  }
-
-
-  test("Read sdk writer output file without any file should fail") {
-    buildTestDataSingleFile()
-    deleteFile(writerPath, CarbonCommonConstants.FACT_FILE_EXT)
-    deleteFile(writerPath, CarbonCommonConstants.UPDATE_INDEX_FILE_EXT)
-    assert(new File(writerPath).exists())
-    sql("DROP TABLE IF EXISTS sdkOutputTable")
-
-    val exception = intercept[Exception] {
-      //data source file format
-      sql(
-        s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
-           |'$writerPath' """.stripMargin)
-
-      sql("select * from sdkOutputTable").show(false)
-    }
-    assert(exception.getMessage()
-      .contains("Operation not allowed: Invalid table path provided:"))
-
-    // drop table should not delete the files
-    assert(new File(writerPath).exists())
-    cleanTestData()
-  }
-
-  test("Read sdk writer output multiple files ") {
-    buildTestDataMultipleFiles()
-    assert(new File(writerPath).exists())
-    val folder = new File(writerPath)
-    val dataFiles = folder.listFiles(new FileFilter() {
-      override def accept(pathname: File): Boolean = {
-        pathname.getName
-          .endsWith(CarbonCommonConstants.FACT_FILE_EXT)
-      }
-    })
-    Assert.assertNotNull(dataFiles)
-    Assert.assertNotEquals(1, dataFiles.length)
-
-    sql("DROP TABLE IF EXISTS sdkOutputTable")
-
-    sql(
-      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
-         |'$writerPath' """.stripMargin)
-
-    checkAnswer(sql("select count(*) from sdkOutputTable"), Seq(Row(1000000)))
-
-    // drop table should not delete the files
-    assert(new File(writerPath).exists())
-    cleanTestData()
-  }
-
-  test("Read two sdk writer outputs with same column name placed in same folder") {
-    buildTestDataTwice()
-    assert(new File(writerPath).exists())
-
-    sql("DROP TABLE IF EXISTS sdkOutputTable")
-
-    sql(
-      s"""CREATE EXTERNAL TABLE sdkOutputTable STORED BY 'carbondata' LOCATION
-         |'$writerPath' """.stripMargin)
-
-
-    checkAnswer(sql("select * from sdkOutputTable"), Seq(Row("robot0", 0, 0.0),
-      Row("robot1", 1, 0.5),
-      Row("robot2", 2, 1.0),
-      Row("robot0", 0, 0.0),
-      Row("robot1", 1, 0.5),
-      Row("robot2", 2, 1.0)))
-
-    // drop table should not delete the files
-    assert(new File(writerPath).exists())
-    cleanTestData()
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 6d67daf..954eefc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -519,7 +519,7 @@ class CarbonScanRDD[T: ClassTag](
       CarbonInputFormat.setPartitionsToPrune(conf, partitionNames.asJava)
     }
 
-    CarbonInputFormat.setUnmanagedTable(conf, tableInfo.isUnManagedTable)
+    CarbonInputFormat.setTransactionalTable(conf, tableInfo.isTransactionalTable)
     createInputFormat(conf)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index e978614..db22d6b 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -275,6 +275,7 @@ object CarbonDataRDDFactory {
     loadModel.setTableName(table.getCarbonTableIdentifier.getTableName)
     loadModel.setDatabaseName(table.getCarbonTableIdentifier.getDatabaseName)
     loadModel.setTablePath(table.getTablePath)
+    loadModel.setCarbonTransactionalTable(table.isTransactionalTable)
     loadModel.readAndSetLoadMetadataDetails()
     val loadStartTime = CarbonUpdateUtil.readCurrentTime()
     loadModel.setFactTimeStamp(loadStartTime)
@@ -300,7 +301,7 @@ object CarbonDataRDDFactory {
     var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
 
     // create new segment folder  in carbon store
-    if (updateModel.isEmpty) {
+    if (updateModel.isEmpty && carbonLoadModel.isCarbonTransactionalTable) {
       CarbonLoaderUtil.checkAndCreateCarbonDataLocation(carbonLoadModel.getSegmentId, carbonTable)
     }
     var loadStatus = SegmentStatus.SUCCESS
@@ -313,7 +314,7 @@ object CarbonDataRDDFactory {
       CarbonTablePath.addSegmentPrefix(carbonLoadModel.getSegmentId) + LockUsage.LOCK)
 
     try {
-      if (segmentLock.lockWithRetries()) {
+      if (!carbonLoadModel.isCarbonTransactionalTable || segmentLock.lockWithRetries()) {
         if (updateModel.isDefined) {
           res = loadDataFrameForUpdate(
             sqlContext,
@@ -493,13 +494,14 @@ object CarbonDataRDDFactory {
       }
       // as no record loaded in new segment, new segment should be deleted
       val newEntryLoadStatus =
-      if (!carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap &&
-          !CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
-        LOGGER.warn("Cannot write load metadata file as there is no data to load")
-        SegmentStatus.MARKED_FOR_DELETE
-      } else {
-        loadStatus
-      }
+        if (carbonLoadModel.isCarbonTransactionalTable &&
+            !carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.isChildDataMap &&
+            !CarbonLoaderUtil.isValidSegment(carbonLoadModel, carbonLoadModel.getSegmentId.toInt)) {
+          LOGGER.warn("Cannot write load metadata file as there is no data to load")
+          SegmentStatus.MARKED_FOR_DELETE
+        } else {
+          loadStatus
+        }
 
       writeDictionary(carbonLoadModel, result, writeAll = false)
 
@@ -813,6 +815,10 @@ object CarbonDataRDDFactory {
     true)
     CarbonLoaderUtil
       .addDataIndexSizeIntoMetaEntry(metadataDetails, carbonLoadModel.getSegmentId, carbonTable)
+
+    if (!carbonLoadModel.isCarbonTransactionalTable && overwriteTable) {
+      CarbonLoaderUtil.deleteNonTransactionalTableForInsertOverwrite(carbonLoadModel)
+    }
     val done = CarbonLoaderUtil.recordNewLoadMetadata(metadataDetails, carbonLoadModel, false,
       overwriteTable, uuid)
     if (!done) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index e29986a..488a53d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -77,7 +77,8 @@ case class CarbonCountStar(
     val job = new Job(jobConf)
     FileInputFormat.addInputPath(job, new Path(absoluteTableIdentifier.getTablePath))
     CarbonInputFormat
-      .setUnmanagedTable(job.getConfiguration, carbonTable.getTableInfo.isUnManagedTable)
+      .setTransactionalTable(job.getConfiguration,
+        carbonTable.getTableInfo.isTransactionalTable)
     (job, carbonInputFormat)
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index c8d9fe4..3600854 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -313,9 +313,10 @@ object CarbonSource {
     } else {
       val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)
       val isExternal = properties.getOrElse("isExternal", "false")
-      val isUnManagedTable = properties.getOrElse("isUnManaged", "false").contains("true")
-      tableInfo.setUnManagedTable(isUnManagedTable)
-      if (!isUnManagedTable && !metaStore.isReadFromHiveMetaStore) {
+      val isTransactionalTable = properties.getOrElse("isTransactional", "true")
+        .contains("true")
+      tableInfo.setTransactionalTable(isTransactionalTable)
+      if (isTransactionalTable && !metaStore.isReadFromHiveMetaStore) {
         // save to disk
         metaStore.saveToDisk(tableInfo, properties("tablePath"))
         // remove schema string from map as we don't store carbon schema to hive metastore
@@ -337,15 +338,16 @@ object CarbonSource {
     val model = createTableInfoFromParams(properties, dataSchema, identifier)
     val tableInfo: TableInfo = TableNewProcessor(model)
     val isExternal = properties.getOrElse("isExternal", "false")
-    val isUnManagedTable = properties.getOrElse("isUnManaged", "false").contains("true")
+    val isTransactionalTable = properties.getOrElse("isTransactional", "true")
+      .contains("true")
     val tablePath = properties.getOrElse("path", "")
     tableInfo.setTablePath(identifier.getTablePath)
-    tableInfo.setUnManagedTable(isUnManagedTable)
+    tableInfo.setTransactionalTable(isTransactionalTable)
     tableInfo.setDatabaseName(identifier.getDatabaseName)
     val schemaEvolutionEntry = new SchemaEvolutionEntry
     schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
     tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
-    val map = if (!metaStore.isReadFromHiveMetaStore && !isUnManagedTable) {
+    val map = if (!metaStore.isReadFromHiveMetaStore && isTransactionalTable) {
       metaStore.saveToDisk(tableInfo, identifier.getTablePath)
       new java.util.HashMap[String, String]()
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 60366c4..03f165c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -56,8 +56,8 @@ case class CarbonCreateDataMapCommand(
       case _ => null
     }
 
-    if (mainTable != null && mainTable.getTableInfo.isUnManagedTable) {
-      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    if (mainTable != null && !mainTable.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
 
     if (mainTable != null && mainTable.getDataMapSchema(dataMapName) != null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
index c462c9e..e12f052 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala
@@ -77,8 +77,8 @@ case class CarbonAlterTableCompactionCommand(
       }
       relation.carbonTable
     }
-    if (table.getTableInfo.isUnManagedTable) {
-      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    if (!table.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
 
     if (CarbonUtil.hasAggregationDataMap(table) ||
@@ -126,6 +126,7 @@ case class CarbonAlterTableCompactionCommand(
       // Need to fill dimension relation
       carbonLoadModel.setCarbonDataLoadSchema(dataLoadSchema)
       carbonLoadModel.setTableName(table.getTableName)
+      carbonLoadModel.setCarbonTransactionalTable(table.isTransactionalTable)
       carbonLoadModel.setDatabaseName(table.getDatabaseName)
       carbonLoadModel.setTablePath(table.getTablePath)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
index 57ccd82..165a032 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
@@ -36,8 +36,8 @@ case class CarbonDeleteLoadByIdCommand(
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
 
-    if (carbonTable.getTableInfo.isUnManagedTable) {
-      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    if (!carbonTable.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
 
     // if insert overwrite in progress, do not allow delete segment

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b7b8073d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
index 7d0655d..19f5100 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
@@ -37,8 +37,8 @@ case class CarbonDeleteLoadByLoadDateCommand(
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
 
-    if (carbonTable.getTableInfo.isUnManagedTable) {
-      throw new MalformedCarbonCommandException("Unsupported operation on unmanaged table")
+    if (!carbonTable.getTableInfo.isTransactionalTable) {
+      throw new MalformedCarbonCommandException("Unsupported operation on non transactional table")
     }
 
     // if insert overwrite in progress, do not allow delete segment