Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/12/04 06:36:11 UTC

[1/3] carbondata git commit: [CARBONDATA-1844] Add tablePath support when creating table

Repository: carbondata
Updated Branches:
  refs/heads/master 25c28242a -> 2fe7758be
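
Reviewer note: a minimal usage sketch of the feature this patch series adds, assuming a running CarbonSession; the database, table, and path names below are hypothetical, and the placement of LOCATION after STORED BY is inferred from the locationSpec wiring in the parser diffs further down. The LOCATION clause flows through the new locationSpecContext parameter into CarbonCreateTableCommand's tableLocation argument.

    import org.apache.spark.sql.SparkSession

    // Sketch: create a carbon table at an explicit path instead of the
    // default <database location>/<table name>. Names/path are hypothetical.
    def createTableAtCustomPath(spark: SparkSession): Unit = {
      spark.sql(
        """CREATE TABLE IF NOT EXISTS demo_db.demo_table (id INT, name STRING)
          |STORED BY 'carbondata'
          |LOCATION '/tmp/carbon/demo_table'""".stripMargin)
    }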


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index efb6796..b043698 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -28,8 +28,7 @@ import org.apache.hadoop.mapred.JobConf
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.ExecutionErrors
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
@@ -50,16 +49,9 @@ import org.apache.carbondata.spark.DeleteDelataResultImpl
 object DeleteExecution {
   val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getName)
 
-  def getTableIdentifier(tableIdentifier: Seq[String]): TableIdentifier = {
-    if (tableIdentifier.size > 1) {
-      TableIdentifier(tableIdentifier(1), Some(tableIdentifier(0)))
-    } else {
-      TableIdentifier(tableIdentifier(0), None)
-    }
-  }
-
   def deleteDeltaExecution(
-      identifier: Seq[String],
+      databaseNameOp: Option[String],
+      tableName: String,
       sparkSession: SparkSession,
       dataRdd: RDD[Row],
       timestamp: String,
@@ -67,12 +59,10 @@ object DeleteExecution {
       executorErrors: ExecutionErrors): Boolean = {
 
     var res: Array[List[(SegmentStatus, (SegmentUpdateDetails, ExecutionErrors))]] = null
-    val tableName = getTableIdentifier(identifier).table
-    val database = GetDB.getDatabaseName(getTableIdentifier(identifier).database, sparkSession)
-    val carbonTable = CarbonEnv.getCarbonTable(Some(database), tableName)(sparkSession)
+    val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier
-    val carbonTablePath = CarbonStorePath
-      .getCarbonTablePath(absoluteTableIdentifier)
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
     val factPath = carbonTablePath.getFactDir
 
     var deleteStatus = true
@@ -85,7 +75,6 @@ object DeleteExecution {
         .map(row => Row(row.get(row.fieldIndex(
           CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))))
       sparkSession.createDataFrame(rdd, schema).rdd
-      // sqlContext.createDataFrame(rdd, schema).rdd
     } else {
       dataRdd
     }
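
The Seq[String]-based identifier and the local getTableIdentifier helper are gone; callers now pass the database option and table name explicitly and resolve them through CarbonEnv. A minimal sketch of the new lookup pattern, using only calls visible in this hunk:

    import org.apache.spark.sql.{CarbonEnv, SparkSession}

    // Sketch: resolve the database name and CarbonTable from the explicit
    // (databaseNameOp, tableName) pair, as deleteDeltaExecution now does.
    def resolveTable(databaseNameOp: Option[String], tableName: String)
        (sparkSession: SparkSession): String = {
      val database = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
      val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
      // resolved "db.table" plus its on-disk path
      s"$database.$tableName -> ${carbonTable.getAbsoluteTableIdentifier.getTablePath}"
    }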

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index 553663e..bdecac1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -46,7 +46,7 @@ object HorizontalCompaction {
    */
   def tryHorizontalCompaction(
       sparkSession: SparkSession,
-      carbonRelation: CarbonRelation,
+      carbonTable: CarbonTable,
       isUpdateOperation: Boolean): Unit = {
 
     if (!CarbonDataMergerUtil.isHorizontalCompactionEnabled) {
@@ -54,7 +54,6 @@ object HorizontalCompaction {
     }
 
     var compactionTypeIUD = CompactionType.IUD_UPDDEL_DELTA
-    val carbonTable = carbonRelation.carbonTable
     val absTableIdentifier = carbonTable.getAbsoluteTableIdentifier
     val updateTimeStamp = System.currentTimeMillis()
     // To make sure that update and delete timestamps are not same,
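
tryHorizontalCompaction now takes the CarbonTable itself, so callers that already hold the table no longer need a CarbonRelation wrapper. A sketch of the updated call:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.command.mutation.HorizontalCompaction
    import org.apache.carbondata.core.metadata.schema.table.CarbonTable

    // Sketch: pass the CarbonTable directly; the CarbonRelation wrapper and
    // the carbonRelation.carbonTable unwrapping inside the method are gone.
    def compactAfterMutation(sparkSession: SparkSession, table: CarbonTable): Unit = {
      HorizontalCompaction.tryHorizontalCompaction(sparkSession, table, isUpdateOperation = true)
    }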

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index f8d99c0..a17e745 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -27,6 +27,7 @@ import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.path.CarbonTablePath
 
 /**
  * Below command class will be used to create pre-aggregate table
@@ -72,7 +73,9 @@ case class CreatePreAggregateTableCommand(
     // also get updated
     tableModel.parentTable = Some(parentTable)
     tableModel.dataMapRelation = Some(fieldRelationMap)
-    CarbonCreateTableCommand(tableModel).run(sparkSession)
+    val tablePath =
+      CarbonEnv.getTablePath(tableModel.databaseNameOp, tableModel.tableName)(sparkSession)
+    CarbonCreateTableCommand(tableModel, Some(tablePath)).run(sparkSession)
 
     val table = CarbonEnv.getCarbonTable(tableIdentifier)(sparkSession)
     val tableInfo = table.getTableInfo
@@ -112,8 +115,10 @@ case class CreatePreAggregateTableCommand(
     // load child table if parent table has existing segments
     val dbName = CarbonEnv.getDatabaseName(parentTableIdentifier.database)(sparkSession)
     val tableName = tableIdentifier.table
-    val metastorePath = CarbonEnv.getMetadataPath(Some(dbName),
-      parentTableIdentifier.table)(sparkSession)
+    val metastorePath = CarbonTablePath.getMetadataPath(
+      CarbonEnv.getTablePath(
+        parentTableIdentifier.database,
+        parentTableIdentifier.table)(sparkSession))
     // This will be used to check if the parent table has any segments or not. If not then no
     // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
     // table.
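
The parent table's metadata path is now derived from its table path instead of a dedicated CarbonEnv.getMetadataPath helper. A sketch of the two-step derivation used above:

    import org.apache.spark.sql.{CarbonEnv, SparkSession}
    import org.apache.carbondata.core.util.path.CarbonTablePath

    // Sketch: resolve the table path through CarbonEnv, then compute the
    // Metadata directory beneath it with CarbonTablePath.
    def metadataPathOf(databaseNameOp: Option[String], tableName: String)
        (sparkSession: SparkSession): String = {
      CarbonTablePath.getMetadataPath(
        CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession))
    }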

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
index 593a675..9cf36fe 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala
@@ -87,8 +87,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
       val oldTableIdentifier = carbonTable.getAbsoluteTableIdentifier
       DataMapStoreManager.getInstance().clearDataMaps(oldTableIdentifier)
       // get the latest carbon table and check for column existence
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(oldTableIdentifier)
-      val tableMetadataFile = carbonTablePath.getPath
+      val oldTablePath = CarbonStorePath.getCarbonTablePath(oldTableIdentifier)
+      val tableMetadataFile = oldTablePath.getPath
       val operationContext = new OperationContext
       // TODO: Pass new Table Path in pre-event.
       val alterTableRenamePreEvent: AlterTableRenamePreEvent = AlterTableRenamePreEvent(
@@ -98,7 +98,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
         sparkSession)
       OperationListenerBus.getInstance().fireEvent(alterTableRenamePreEvent, operationContext)
       val tableInfo: org.apache.carbondata.format.TableInfo =
-        metastore.getThriftTableInfo(carbonTablePath)(sparkSession)
+        metastore.getThriftTableInfo(oldTablePath)(sparkSession)
       val schemaEvolutionEntry = new SchemaEvolutionEntry(System.currentTimeMillis)
       schemaEvolutionEntry.setTableName(newTableName)
       timeStamp = System.currentTimeMillis()
@@ -107,8 +107,7 @@ private[sql] case class CarbonAlterTableRenameCommand(
       val fileType = FileFactory.getFileType(tableMetadataFile)
       val newTableIdentifier = new CarbonTableIdentifier(oldDatabaseName,
         newTableName, carbonTable.getCarbonTableIdentifier.getTableId)
-      var newTablePath = CarbonUtil.getNewTablePath(carbonTablePath, newTableIdentifier)
-
+      var newTablePath = CarbonUtil.getNewTablePath(oldTablePath, newTableIdentifier.getTableName)
       metastore.removeTableFromMetadata(oldDatabaseName, oldTableName)
       val hiveClient = sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
         .asInstanceOf[HiveExternalCatalog].client
@@ -121,8 +120,8 @@ private[sql] case class CarbonAlterTableRenameCommand(
       // changed the rename order to deal with situation when carbon table and hive table
       // will point to the same tablePath
       if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
-        val rename = FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
-          .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
+        val rename = FileFactory.getCarbonFile(oldTablePath.getPath, fileType)
+          .renameForce(oldTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
                        newTableName)
         if (!rename) {
           renameBadRecords(newTableName, oldTableName, oldDatabaseName)
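
CarbonUtil.getNewTablePath is now keyed by the new table name rather than a full CarbonTableIdentifier, since a rename only changes the last path segment. A sketch, assuming an existing AbsoluteTableIdentifier for the old table:

    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
    import org.apache.carbondata.core.util.CarbonUtil
    import org.apache.carbondata.core.util.path.CarbonStorePath

    // Sketch: compute the post-rename table path from the old path and the
    // new table name only.
    def newPathAfterRename(identifier: AbsoluteTableIdentifier, newTableName: String): String = {
      val oldTablePath = CarbonStorePath.getCarbonTablePath(identifier)
      CarbonUtil.getNewTablePath(oldTablePath, newTableName)
    }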

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
index 9198f57..d967bf2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonCreateTableCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command.table
 
 import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession, _}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, _}
 import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.execution.SQLExecution.EXECUTION_ID_KEY
 import org.apache.spark.sql.execution.command.{Field, MetadataCommand, TableModel, TableNewProcessor}
@@ -30,27 +30,35 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.exception.InvalidConfigurationException
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus}
 
 case class CarbonCreateTableCommand(
     cm: TableModel,
+    tableLocation: Option[String] = None,
     createDSTable: Boolean = true)
   extends MetadataCommand {
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
-    val storePath = CarbonProperties.getStorePath
-    CarbonEnv.getInstance(sparkSession).carbonMetastore.
-      checkSchemasModifiedTimeAndReloadTables()
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    cm.databaseName = GetDB.getDatabaseName(cm.databaseNameOp, sparkSession)
-    val dbLocation = GetDB.getDatabaseLocation(cm.databaseName, sparkSession, storePath)
-    val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + cm.tableName
-    val tbName = cm.tableName
-    val dbName = cm.databaseName
-    LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tbName]")
+    val tableName = cm.tableName
+    val dbName = CarbonEnv.getDatabaseName(cm.databaseNameOp)(sparkSession)
+    LOGGER.audit(s"Creating Table with Database name [$dbName] and Table name [$tableName]")
 
-    val tableInfo: TableInfo = TableNewProcessor(cm)
+    if (sparkSession.sessionState.catalog.listTables(dbName)
+      .exists(_.table.equalsIgnoreCase(tableName))) {
+      if (!cm.ifNotExistsSet) {
+        LOGGER.audit(
+          s"Table creation with Database name [$dbName] and Table name [$tableName] failed. " +
+          s"Table [$tableName] already exists under database [$dbName]")
+        throw new TableAlreadyExistsException(dbName, tableName)
+      }
+    }
+
+    val tablePath = tableLocation.getOrElse(
+      CarbonEnv.getTablePath(cm.databaseNameOp, tableName)(sparkSession))
+    val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
+    val tableInfo: TableInfo = TableNewProcessor(cm, tableIdentifier)
 
     // Add validation for sort scope when create table
     val sortScope = tableInfo.getFactTable.getTableProperties.asScala
@@ -65,56 +73,48 @@ case class CarbonCreateTableCommand(
       CarbonException.analysisException("Table should have at least one column.")
     }
 
-    if (sparkSession.sessionState.catalog.listTables(dbName)
-      .exists(_.table.equalsIgnoreCase(tbName))) {
-      if (!cm.ifNotExistsSet) {
-        LOGGER.audit(
-          s"Table creation with Database name [$dbName] and Table name [$tbName] failed. " +
-          s"Table [$tbName] already exists under database [$dbName]")
-        throw new TableAlreadyExistsException(dbName, tbName)
-      }
-    } else {
-      val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tbName)
-      val operationContext = new OperationContext
-      val createTablePreExecutionEvent: CreateTablePreExecutionEvent =
-        new CreateTablePreExecutionEvent(sparkSession,
-          tableIdentifier.getCarbonTableIdentifier,
-          tablePath)
-      OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext)
-      // Add Database to catalog and persist
-      val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier)
-      if (createDSTable) {
-        try {
-          val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
-          cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f)
-          cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
+    val operationContext = new OperationContext
+    val createTablePreExecutionEvent: CreateTablePreExecutionEvent =
+      CreateTablePreExecutionEvent(sparkSession, tableIdentifier)
+    OperationListenerBus.getInstance.fireEvent(createTablePreExecutionEvent, operationContext)
+    val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val carbonSchemaString = catalog.generateTableSchemaString(tableInfo, tableIdentifier)
+    if (createDSTable) {
+      try {
+        val fields = new Array[Field](cm.dimCols.size + cm.msrCols.size)
+        cm.dimCols.foreach(f => fields(f.schemaOrdinal) = f)
+        cm.msrCols.foreach(f => fields(f.schemaOrdinal) = f)
 
-          sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
-          sparkSession.sql(
-            s"""CREATE TABLE $dbName.$tbName
-               |(${ fields.map(f => f.rawSchema).mkString(",") })
-               |USING org.apache.spark.sql.CarbonSource""".stripMargin +
-            s""" OPTIONS (tableName "$tbName", dbName "$dbName", tablePath """.stripMargin +
-            s""""$tablePath", path "$tablePath" $carbonSchemaString) """.stripMargin)
-        } catch {
-          case e: AnalysisException => throw e
-          case e: Exception =>
-            // call the drop table to delete the created table.
-            CarbonEnv.getInstance(sparkSession).carbonMetastore
-              .dropTable(tableIdentifier)(sparkSession)
+        sparkSession.sparkContext.setLocalProperty(EXECUTION_ID_KEY, null)
+        val tablePath = tableIdentifier.getTablePath
+        sparkSession.sql(
+          s"""CREATE TABLE $dbName.$tableName
+             |(${ fields.map(f => f.rawSchema).mkString(",") })
+             |USING org.apache.spark.sql.CarbonSource
+             |OPTIONS (
+             |  tableName "$tableName",
+             |  dbName "$dbName",
+             |  tablePath "$tablePath",
+             |  path "$tablePath"
+             |  $carbonSchemaString)
+             """.stripMargin)
+      } catch {
+        case e: AnalysisException => throw e
+        case e: Exception =>
+          // call the drop table to delete the created table.
+          CarbonEnv.getInstance(sparkSession).carbonMetastore
+            .dropTable(tableIdentifier)(sparkSession)
 
-            val msg = s"Create table'$tbName' in database '$dbName' failed."
-            LOGGER.audit(msg)
-            LOGGER.error(e, msg)
-            CarbonException.analysisException(msg)
-        }
+          val msg = s"Create table'$tableName' in database '$dbName' failed."
+          LOGGER.audit(msg)
+          LOGGER.error(e, msg)
+          CarbonException.analysisException(msg)
       }
-      val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
-        new CreateTablePostExecutionEvent(sparkSession, tableIdentifier.getCarbonTableIdentifier)
-      OperationListenerBus.getInstance.fireEvent(createTablePostExecutionEvent, operationContext)
-      LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tbName]")
     }
+    val createTablePostExecutionEvent: CreateTablePostExecutionEvent =
+      CreateTablePostExecutionEvent(sparkSession, tableIdentifier)
+    OperationListenerBus.getInstance.fireEvent(createTablePostExecutionEvent, operationContext)
+    LOGGER.audit(s"Table created with Database name [$dbName] and Table name [$tableName]")
     Seq.empty
   }
 }
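
CarbonCreateTableCommand now accepts an optional tableLocation; when it is None the path falls back to CarbonEnv.getTablePath, and when the DDL carries a LOCATION clause the parser passes it through. A sketch of constructing the command directly, assuming a TableModel prepared by the parser and a hypothetical path:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.command.TableModel
    import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand

    // Sketch: create the table at an explicit location (Some(path)), or pass
    // None to use the default <database location>/<table name> path.
    def createAt(tableModel: TableModel, path: Option[String])
        (sparkSession: SparkSession): Unit = {
      CarbonCreateTableCommand(tableModel, tableLocation = path).run(sparkSession)
    }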

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 9ec738c..98f103b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -19,22 +19,17 @@ package org.apache.spark.sql.execution.command.table
 
 import scala.collection.mutable.ListBuffer
 
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.command.AtomicRunnableCommand
-import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.events._
 
 case class CarbonDropTableCommand(
@@ -44,49 +39,28 @@ case class CarbonDropTableCommand(
     dropChildTable: Boolean = false)
   extends AtomicRunnableCommand {
 
+  var carbonTable: CarbonTable = _
+
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
-    val identifier = TableIdentifier(tableName, Option(dbName))
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.DROP_TABLE_LOCK)
-    val carbonEnv = CarbonEnv.getInstance(sparkSession)
-    val catalog = carbonEnv.carbonMetastore
-    val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonProperties.getStorePath)
-    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
-    val absoluteTableIdentifier =
-      AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
-    catalog.checkSchemasModifiedTimeAndReloadTables()
+    val identifier = CarbonEnv.getIdentifier(databaseNameOp, tableName)(sparkSession)
+    val dbName = identifier.getCarbonTableIdentifier.getDatabaseName
     val carbonLocks: scala.collection.mutable.ListBuffer[ICarbonLock] = ListBuffer()
     try {
       locksToBeAcquired foreach {
-        lock => carbonLocks += CarbonLockUtil.getLockObject(absoluteTableIdentifier, lock)
+        lock => carbonLocks += CarbonLockUtil.getLockObject(identifier, lock)
       }
       LOGGER.audit(s"Deleting table [$tableName] under database [$dbName]")
-      val carbonTable: Option[CarbonTable] =
-        catalog.getTableFromMetadataCache(dbName, tableName) match {
-          case Some(carbonTable) => Some(carbonTable)
-          case None => try {
-            Some(catalog.lookupRelation(identifier)(sparkSession)
-              .asInstanceOf[CarbonRelation].metaData.carbonTable)
-          } catch {
-            case ex: NoSuchTableException =>
-              if (!ifExistsSet) {
-                throw ex
-              }
-              None
-          }
-        }
-      if (carbonTable.isDefined) {
-        val relationIdentifiers = carbonTable.get.getTableInfo.getParentRelationIdentifiers
-        if (relationIdentifiers != null && !relationIdentifiers.isEmpty) {
-          if (!dropChildTable) {
-            if (!ifExistsSet) {
-              throw new Exception("Child table which is associated with datamap cannot " +
-                                  "be dropped, use DROP DATAMAP command to drop")
-            } else {
-              return Seq.empty
-            }
+      carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
+      val relationIdentifiers = carbonTable.getTableInfo.getParentRelationIdentifiers
+      if (relationIdentifiers != null && !relationIdentifiers.isEmpty) {
+        if (!dropChildTable) {
+          if (!ifExistsSet) {
+            throw new Exception("Child table which is associated with datamap cannot " +
+                                "be dropped, use DROP DATAMAP command to drop")
+          } else {
+            return Seq.empty
           }
         }
       }
@@ -97,8 +71,7 @@ case class CarbonDropTableCommand(
           ifExistsSet,
           sparkSession)
       OperationListenerBus.getInstance.fireEvent(dropTablePreEvent, operationContext)
-      CarbonEnv.getInstance(sparkSession).carbonMetastore
-        .dropTable(absoluteTableIdentifier)(sparkSession)
+      CarbonEnv.getInstance(sparkSession).carbonMetastore.dropTable(identifier)(sparkSession)
 
       // fires the event after dropping main table
       val dropTablePostEvent: DropTablePostEvent =
@@ -108,7 +81,12 @@ case class CarbonDropTableCommand(
           sparkSession)
       OperationListenerBus.getInstance.fireEvent(dropTablePostEvent, operationContext)
       LOGGER.audit(s"Deleted table [$tableName] under database [$dbName]")
+
     } catch {
+      case ex: NoSuchTableException =>
+        if (!ifExistsSet) {
+          throw ex
+        }
       case ex: Exception =>
         LOGGER.error(ex, s"Dropping table $dbName.$tableName failed")
         CarbonException.analysisException(
@@ -125,23 +103,16 @@ case class CarbonDropTableCommand(
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    // delete the table folder
-    val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
-    val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonProperties.getStorePath)
-    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
-    val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName, tableName)
     if (carbonTable != null) {
       // clear driver side index and dictionary cache
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
-    }
-    val metadataFilePath =
-      CarbonStorePath.getCarbonTablePath(tableIdentifier).getMetadataDirectoryPath
-    val fileType = FileFactory.getFileType(metadataFilePath)
-    if (FileFactory.isFileExist(metadataFilePath, fileType)) {
-      val file = FileFactory.getCarbonFile(metadataFilePath, fileType)
-      CarbonUtil.deleteFoldersAndFilesSilent(file.getParentFile)
+      // delete the table folder
+      val tablePath = carbonTable.getTablePath
+      val fileType = FileFactory.getFileType(tablePath)
+      if (FileFactory.isFileExist(tablePath, fileType)) {
+        val file = FileFactory.getCarbonFile(tablePath, fileType)
+        CarbonUtil.deleteFoldersAndFilesSilent(file)
+      }
     }
     Seq.empty
   }
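
Drop now resolves its AbsoluteTableIdentifier through CarbonEnv instead of concatenating the database location and table name by hand, and processData deletes the whole folder from carbonTable.getTablePath. A sketch of the new resolution step:

    import org.apache.spark.sql.{CarbonEnv, SparkSession}
    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier

    // Sketch: one CarbonEnv call replaces the manual
    // <database location>/<table name> path construction used before.
    def identifierOf(databaseNameOp: Option[String], tableName: String)
        (sparkSession: SparkSession): AbsoluteTableIdentifier = {
      CarbonEnv.getIdentifier(databaseNameOp, tableName)(sparkSession)
    }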

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
index 8c2acdb..d08ef26 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/StreamingTableStrategy.scala
@@ -22,9 +22,8 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.{SparkPlan, SparkStrategy}
 import org.apache.spark.sql.execution.command.AlterTableRenameCommand
-import org.apache.spark.sql.execution.command.mutation.{CarbonProjectForDeleteCommand, CarbonProjectForUpdateCommand, DeleteExecution}
+import org.apache.spark.sql.execution.command.mutation.{CarbonProjectForDeleteCommand, CarbonProjectForUpdateCommand}
 import org.apache.spark.sql.execution.command.schema.{CarbonAlterTableAddColumnCommand, CarbonAlterTableDataTypeChangeCommand, CarbonAlterTableDropColumnCommand}
-import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
@@ -35,14 +34,14 @@ private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends Sp
 
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = {
     plan match {
-      case CarbonProjectForUpdateCommand(_, tableIdentifier) =>
+      case CarbonProjectForUpdateCommand(_, databaseNameOp, tableName) =>
         rejectIfStreamingTable(
-          DeleteExecution.getTableIdentifier(tableIdentifier),
+          TableIdentifier(tableName, databaseNameOp),
           "Data update")
         Nil
-      case CarbonProjectForDeleteCommand(_, tableIdentifier, _) =>
+      case CarbonProjectForDeleteCommand(_, databaseNameOp, tableName, timestamp) =>
         rejectIfStreamingTable(
-          DeleteExecution.getTableIdentifier(tableIdentifier),
+          TableIdentifier(tableName, databaseNameOp),
           "Date delete")
         Nil
       case CarbonAlterTableAddColumnCommand(model) =>
@@ -73,11 +72,9 @@ private[sql] class StreamingTableStrategy(sparkSession: SparkSession) extends Sp
    * Validate whether Update operation is allowed for specified table in the command
    */
   private def rejectIfStreamingTable(tableIdentifier: TableIdentifier, operation: String): Unit = {
-    val streaming = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(tableIdentifier)(sparkSession)
-      .asInstanceOf[CarbonRelation]
-      .carbonTable
-      .isStreamingTable
+    val streaming =
+      CarbonEnv.getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession)
+        .isStreamingTable
     if (streaming) {
       throw new MalformedCarbonCommandException(
         s"$operation is not allowed for streaming table")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 846b64c..0b3a2b3 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -127,8 +127,6 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
     } else {
       updatedSelectPlan
     }
-    val tid = CarbonTableIdentifierImplicit.toTableIdentifier(Seq(table.tableIdentifier.toString()))
-    val tidSeq = Seq(GetDB.getDatabaseName(tid.database, sparkSession))
     val destinationTable =
       CarbonReflectionUtils.getUnresolvedRelation(
         table.tableIdentifier,
@@ -142,8 +140,6 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
   def processDeleteRecordsQuery(selectStmt: String,
       alias: Option[String],
       table: UnresolvedRelation): LogicalPlan = {
-    val tidSeq = Seq(GetDB.getDatabaseName(table.tableIdentifier.database, sparkSession),
-      table.tableIdentifier.table)
     var addedTupleId = false
     val parsePlan = parser.parsePlan(selectStmt)
 
@@ -160,7 +156,8 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
     }
     CarbonProjectForDeleteCommand(
       selectPlan,
-      tidSeq,
+      table.tableIdentifier.database,
+      table.tableIdentifier.table,
       System.currentTimeMillis().toString)
   }
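
With the Seq[String] identifier gone, the delete command is built straight from the UnresolvedRelation's parts. A sketch of the construction, matching the hunk above:

    import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
    import org.apache.spark.sql.execution.command.mutation.CarbonProjectForDeleteCommand

    // Sketch: database option and table name are passed separately, plus a
    // millisecond timestamp that tags this delete operation.
    def buildDeleteCommand(
        selectPlan: LogicalPlan,
        table: UnresolvedRelation): CarbonProjectForDeleteCommand = {
      CarbonProjectForDeleteCommand(
        selectPlan,
        table.tableIdentifier.database,
        table.tableIdentifier.table,
        System.currentTimeMillis().toString)
    }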
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 3134712..5078259 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -215,13 +215,8 @@ class CarbonFileMetastore extends CarbonMetaStore {
       val tableUniqueName = CarbonTable.buildUniqueName(dbName, tableName)
       val tableInfo: TableInfo = CarbonUtil.readSchemaFile(tableMetadataFile)
       val schemaConverter = new ThriftWrapperSchemaConverterImpl
-      val wrapperTableInfo = schemaConverter
-        .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
-      val schemaFilePath = CarbonStorePath
-        .getCarbonTablePath(tablePath, carbonTableIdentifier).getSchemaFilePath
-      wrapperTableInfo.setTablePath(tablePath)
-      wrapperTableInfo
-        .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
+      val wrapperTableInfo =
+        schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
       CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
       val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
       metadata.carbonTables += carbonTable
@@ -233,38 +228,31 @@ class CarbonFileMetastore extends CarbonMetaStore {
 
   /**
    * This method will overwrite the existing schema and update it with the given details
-   *
-   * @param newTableIdentifier
-   * @param thriftTableInfo
-   * @param schemaEvolutionEntry
-   * @param tablePath
-   * @param sparkSession
    */
-  def updateTableSchemaForAlter(newTableIdentifier: CarbonTableIdentifier,
+  def updateTableSchemaForAlter(
+      newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       schemaEvolutionEntry: SchemaEvolutionEntry,
       tablePath: String) (sparkSession: SparkSession): String = {
-    val absoluteTableIdentifier = new AbsoluteTableIdentifier(tablePath, oldTableIdentifier)
+    val absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath, oldTableIdentifier)
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     if (schemaEvolutionEntry != null) {
       thriftTableInfo.fact_table.schema_evolution.schema_evolution_history.add(schemaEvolutionEntry)
     }
-    val oldCarbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
-    val newAbsoluteTableIdentifier = new AbsoluteTableIdentifier(CarbonUtil
-      .getNewTablePath(oldCarbonTablePath, newTableIdentifier), newTableIdentifier)
-    val wrapperTableInfo = schemaConverter
-      .fromExternalToWrapperTableInfo(thriftTableInfo,
-        newTableIdentifier.getDatabaseName,
-        newTableIdentifier.getTableName,
-        newAbsoluteTableIdentifier.getTablePath)
-    val identifier =
-      new CarbonTableIdentifier(newTableIdentifier.getDatabaseName,
-        newTableIdentifier.getTableName,
-        wrapperTableInfo.getFactTable.getTableId)
-    val path = createSchemaThriftFile(wrapperTableInfo,
+    val oldTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
+    val newTablePath = CarbonUtil.getNewTablePath(oldTablePath, newTableIdentifier.getTableName)
+    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
       thriftTableInfo,
-      identifier)
+      newTableIdentifier.getDatabaseName,
+      newTableIdentifier.getTableName,
+      newTablePath)
+    val newAbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
+      newTablePath,
+      newTableIdentifier.getDatabaseName,
+      newTableIdentifier.getTableName,
+      oldTableIdentifier.getTableId)
+    val path = createSchemaThriftFile(newAbsoluteTableIdentifier, thriftTableInfo)
     addTableCache(wrapperTableInfo, newAbsoluteTableIdentifier)
     path
   }
@@ -280,40 +268,33 @@ class CarbonFileMetastore extends CarbonMetaStore {
       thriftTableInfo: org.apache.carbondata.format.TableInfo,
       absoluteTableIdentifier: AbsoluteTableIdentifier)(sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val wrapperTableInfo = schemaConverter
-      .fromExternalToWrapperTableInfo(thriftTableInfo,
-        carbonTableIdentifier.getDatabaseName,
-        carbonTableIdentifier.getTableName,
-        absoluteTableIdentifier.getTablePath)
+    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+      thriftTableInfo,
+      carbonTableIdentifier.getDatabaseName,
+      carbonTableIdentifier.getTableName,
+      absoluteTableIdentifier.getTablePath)
     val evolutionEntries = thriftTableInfo.fact_table.schema_evolution.schema_evolution_history
     evolutionEntries.remove(evolutionEntries.size() - 1)
-    wrapperTableInfo.setTablePath(absoluteTableIdentifier.getTablePath)
-    val path = createSchemaThriftFile(wrapperTableInfo,
-      thriftTableInfo,
-      absoluteTableIdentifier.getCarbonTableIdentifier)
+    val path = createSchemaThriftFile(absoluteTableIdentifier, thriftTableInfo)
     addTableCache(wrapperTableInfo, absoluteTableIdentifier)
     path
   }
 
-  override def revertTableSchemaForPreAggCreationFailure(absoluteTableIdentifier:
-  AbsoluteTableIdentifier,
+  override def revertTableSchemaForPreAggCreationFailure(
+      absoluteTableIdentifier: AbsoluteTableIdentifier,
       thriftTableInfo: org.apache.carbondata.format.TableInfo)
     (sparkSession: SparkSession): String = {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val wrapperTableInfo = schemaConverter
-      .fromExternalToWrapperTableInfo(thriftTableInfo,
-        absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
-        absoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
-        absoluteTableIdentifier.getTablePath)
+    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+      thriftTableInfo,
+      absoluteTableIdentifier.getCarbonTableIdentifier.getDatabaseName,
+      absoluteTableIdentifier.getCarbonTableIdentifier.getTableName,
+      absoluteTableIdentifier.getTablePath)
     val childSchemaList = wrapperTableInfo.getDataMapSchemaList
     childSchemaList.remove(childSchemaList.size() - 1)
-    wrapperTableInfo.setTablePath(absoluteTableIdentifier.getTablePath)
-    val path = createSchemaThriftFile(wrapperTableInfo,
-      thriftTableInfo,
-      absoluteTableIdentifier.getCarbonTableIdentifier)
+    val path = createSchemaThriftFile(absoluteTableIdentifier, thriftTableInfo)
     addTableCache(wrapperTableInfo, absoluteTableIdentifier)
     path
-
   }
 
   /**
@@ -326,26 +307,19 @@ class CarbonFileMetastore extends CarbonMetaStore {
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
     val dbName = tableInfo.getDatabaseName
     val tableName = tableInfo.getFactTable.getTableName
-    val thriftTableInfo = schemaConverter
-      .fromWrapperToExternalTableInfo(tableInfo, dbName, tableName)
+    val thriftTableInfo = schemaConverter.fromWrapperToExternalTableInfo(
+      tableInfo, dbName, tableName)
     val identifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
-    tableInfo.setTablePath(identifier.getTablePath)
-    createSchemaThriftFile(tableInfo,
-      thriftTableInfo,
-      identifier.getCarbonTableIdentifier)
+    createSchemaThriftFile(identifier, thriftTableInfo)
     LOGGER.info(s"Table $tableName for Database $dbName created successfully.")
   }
 
   /**
    * Generates schema string from TableInfo
    */
-  override def generateTableSchemaString(tableInfo: schema.table.TableInfo,
+  override def generateTableSchemaString(
+      tableInfo: schema.table.TableInfo,
       absoluteTableIdentifier: AbsoluteTableIdentifier): String = {
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
-    val schemaMetadataPath =
-      CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
-    tableInfo.setMetaDataFilepath(schemaMetadataPath)
-    tableInfo.setTablePath(absoluteTableIdentifier.getTablePath)
     val schemaEvolutionEntry = new schema.SchemaEvolutionEntry
     schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
     tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
@@ -357,19 +331,13 @@ class CarbonFileMetastore extends CarbonMetaStore {
 
   /**
    * This method will write the schema thrift file in carbon store and load table metadata
-   *
-   * @param tableInfo
-   * @param thriftTableInfo
-   * @return
    */
-  private def createSchemaThriftFile(tableInfo: schema.table.TableInfo,
-      thriftTableInfo: TableInfo,
-      carbonTableIdentifier: CarbonTableIdentifier): String = {
-    val carbonTablePath = CarbonStorePath.
-      getCarbonTablePath(tableInfo.getTablePath, carbonTableIdentifier)
+  private def createSchemaThriftFile(
+      identifier: AbsoluteTableIdentifier,
+      thriftTableInfo: TableInfo): String = {
+    val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier)
     val schemaFilePath = carbonTablePath.getSchemaFilePath
     val schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath)
-    tableInfo.setMetaDataFilepath(schemaMetadataPath)
     val fileType = FileFactory.getFileType(schemaMetadataPath)
     if (!FileFactory.isFileExist(schemaMetadataPath, fileType)) {
       FileFactory.mkdirs(schemaMetadataPath, fileType)
@@ -382,8 +350,9 @@ class CarbonFileMetastore extends CarbonMetaStore {
     carbonTablePath.getPath
   }
 
-  protected def addTableCache(tableInfo: table.TableInfo,
-      absoluteTableIdentifier: AbsoluteTableIdentifier) = {
+  protected def addTableCache(
+      tableInfo: table.TableInfo,
+      absoluteTableIdentifier: AbsoluteTableIdentifier): ArrayBuffer[CarbonTable] = {
     val identifier = absoluteTableIdentifier.getCarbonTableIdentifier
     CarbonMetadata.getInstance.removeTable(tableInfo.getTableUniqueName)
     removeTableFromMetadata(identifier.getDatabaseName, identifier.getTableName)
@@ -427,15 +396,11 @@ class CarbonFileMetastore extends CarbonMetaStore {
 
   def updateMetadataByThriftTable(schemaFilePath: String,
       tableInfo: TableInfo, dbName: String, tableName: String, tablePath: String): Unit = {
-
     tableInfo.getFact_table.getSchema_evolution.getSchema_evolution_history.get(0)
       .setTime_stamp(System.currentTimeMillis())
     val schemaConverter = new ThriftWrapperSchemaConverterImpl
-    val wrapperTableInfo = schemaConverter
-      .fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
-    wrapperTableInfo
-      .setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath))
-    wrapperTableInfo.setTablePath(tablePath)
+    val wrapperTableInfo =
+      schemaConverter.fromExternalToWrapperTableInfo(tableInfo, dbName, tableName, tablePath)
     updateMetadataByWrapperTable(wrapperTableInfo)
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index f98a53a..a41c51e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -156,20 +156,16 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
   private def updateHiveMetaStoreForAlter(newTableIdentifier: CarbonTableIdentifier,
       oldTableIdentifier: CarbonTableIdentifier,
       thriftTableInfo: format.TableInfo,
-      carbonStorePath: String,
+      oldTablePath: String,
       sparkSession: SparkSession,
       schemaConverter: ThriftWrapperSchemaConverterImpl) = {
-    val tablePath = CarbonUtil.getNewTablePath(new Path(carbonStorePath), newTableIdentifier)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, newTableIdentifier)
-    val wrapperTableInfo = schemaConverter
-      .fromExternalToWrapperTableInfo(thriftTableInfo,
-        newTableIdentifier.getDatabaseName,
-        newTableIdentifier.getTableName,
-        carbonTablePath.toString)
-    wrapperTableInfo.setTablePath(carbonStorePath)
-    val schemaMetadataPath =
-      CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
-    wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath)
+    val newTablePath =
+      CarbonUtil.getNewTablePath(new Path(oldTablePath), newTableIdentifier.getTableName)
+    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+      thriftTableInfo,
+      newTableIdentifier.getDatabaseName,
+      newTableIdentifier.getTableName,
+      newTablePath)
     val dbName = oldTableIdentifier.getDatabaseName
     val tableName = oldTableIdentifier.getTableName
     val schemaParts = CarbonUtil.convertToMultiGsonStrings(wrapperTableInfo, "=", "'", "")
@@ -180,7 +176,7 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
     sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
     removeTableFromMetadata(dbName, tableName)
     CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-    CarbonStorePath.getCarbonTablePath(carbonStorePath, newTableIdentifier).getPath
+    CarbonStorePath.getCarbonTablePath(oldTablePath, newTableIdentifier).getPath
   }
 
   private def updateHiveMetaStoreForDataMap(newTableIdentifier: CarbonTableIdentifier,
@@ -189,23 +185,19 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       tablePath: String,
       sparkSession: SparkSession,
       schemaConverter: ThriftWrapperSchemaConverterImpl) = {
-    val newTablePath = CarbonUtil.getNewTablePath(new Path(tablePath), newTableIdentifier)
-    val wrapperTableInfo = schemaConverter
-      .fromExternalToWrapperTableInfo(thriftTableInfo,
-        newTableIdentifier.getDatabaseName,
-        newTableIdentifier.getTableName,
-        newTablePath)
-    wrapperTableInfo.setTablePath(newTablePath)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(newTablePath, newTableIdentifier)
-    val schemaMetadataPath =
-      CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
-    wrapperTableInfo.setMetaDataFilepath(schemaMetadataPath)
+    val newTablePath =
+      CarbonUtil.getNewTablePath(new Path(tablePath), newTableIdentifier.getTableName)
+    val wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+      thriftTableInfo,
+      newTableIdentifier.getDatabaseName,
+      newTableIdentifier.getTableName,
+      newTablePath)
     val dbName = oldTableIdentifier.getDatabaseName
     val tableName = oldTableIdentifier.getTableName
     sparkSession.catalog.refreshTable(TableIdentifier(tableName, Some(dbName)).quotedString)
     removeTableFromMetadata(dbName, tableName)
     CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo)
-    carbonTablePath.getPath
+    newTablePath
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
index cf671cb..b358f83 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/execution/command/CarbonHiveCommands.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.hive.execution.command
 
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchDatabaseException
 import org.apache.spark.sql.catalyst.expressions.Attribute
@@ -26,7 +26,6 @@ import org.apache.spark.sql.execution.command.table.CarbonDropTableCommand
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, SessionParams}
-import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 
 case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
@@ -42,8 +41,7 @@ case class CarbonDropDatabaseCommand(command: DropDatabaseCommand)
     }
     var databaseLocation = ""
     try {
-      databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-        CarbonProperties.getStorePath)
+      databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
     } catch {
       case e: NoSuchDatabaseException =>
         // ignore the exception as exception will be handled by hive command.run
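
Database location lookup moves from GetDB (which needed the store path passed in) to CarbonEnv. A sketch:

    import org.apache.spark.sql.{CarbonEnv, SparkSession}

    // Sketch: CarbonEnv resolves the database location itself; the explicit
    // CarbonProperties.getStorePath argument is no longer needed.
    def databaseLocationOf(dbName: String, sparkSession: SparkSession): String =
      CarbonEnv.getDatabaseLocation(dbName, sparkSession)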

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
index e3f0d21..2e39f5e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonLateDecodeRule.scala
@@ -125,21 +125,18 @@ class CarbonLateDecodeRule extends Rule[LogicalPlan] with PredicateHelper {
       case ProjectForUpdate(table, cols, Seq(updatePlan)) =>
         var isTransformed = false
         val newPlan = updatePlan transform {
-          case Project(pList, child) if (!isTransformed) =>
+          case Project(pList, child) if !isTransformed =>
             val (dest: Seq[NamedExpression], source: Seq[NamedExpression]) = pList
               .splitAt(pList.size - cols.size)
             val diff = cols.diff(dest.map(_.name.toLowerCase))
-            if (diff.size > 0) {
+            if (diff.nonEmpty) {
               sys.error(s"Unknown column(s) ${diff.mkString(",")} in table ${table.tableName}")
             }
             isTransformed = true
             Project(dest.filter(a => !cols.contains(a.name.toLowerCase)) ++ source, child)
         }
-        val identifier = table.tableIdentifier.database match {
-          case Some(db) => Seq(db, table.tableIdentifier.table)
-          case _ => Seq(table.tableIdentifier.table)
-        }
-        CarbonProjectForUpdateCommand(newPlan, identifier)
+        CarbonProjectForUpdateCommand(
+          newPlan, table.tableIdentifier.database, table.tableIdentifier.table)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 46336ac..6f7b89a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -494,9 +494,10 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
           None,
           true)
 
-        val alterTableAddColumnsModel = AlterTableAddColumnsModel(convertDbNameToLowerCase(dbName),
+        val alterTableAddColumnsModel = AlterTableAddColumnsModel(
+          convertDbNameToLowerCase(dbName),
           table,
-          tableProps,
+          tableProps.toMap,
           tableModel.dimCols,
           tableModel.msrCols,
           tableModel.highcardinalitydims.getOrElse(Seq.empty))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
index 07dc672..26529f9 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/parser/CarbonSparkSqlParser.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.parser
 import scala.collection.mutable
 
 import org.antlr.v4.runtime.tree.TerminalNode
-import org.apache.spark.sql.{CarbonSession, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, CarbonSession, SparkSession}
 import org.apache.spark.sql.catalyst.parser.{AbstractSqlParser, ParseException, SqlBaseParser}
 import org.apache.spark.sql.catalyst.parser.ParserUtils._
 import org.apache.spark.sql.catalyst.parser.SqlBaseParser._
@@ -145,11 +145,12 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
       partitionColumns: ColTypeListContext,
       columns : ColTypeListContext,
       tablePropertyList : TablePropertyListContext,
+      locationSpecContext: SqlBaseParser.LocationSpecContext,
       tableComment : Option[String],
       ctas: TerminalNode) : LogicalPlan = {
     // val parser = new CarbonSpark2SqlParser
 
-    val (name, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader)
+    val (tableIdentifier, temp, ifNotExists, external) = visitCreateTableHeader(tableHeader)
     // TODO: implement temporary tables
     if (temp) {
       throw new ParseException(
@@ -178,8 +179,14 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
       val duplicateColumns = colNames.groupBy(identity).collect {
         case (x, ys) if ys.length > 1 => "\"" + x + "\""
       }
-      operationNotAllowed(s"Duplicated column names found in table definition of $name: " +
-                          duplicateColumns.mkString("[", ",", "]"), columns)
+      operationNotAllowed(s"Duplicated column names found in table definition of " +
+                          s"$tableIdentifier: ${duplicateColumns.mkString("[", ",", "]")}", columns)
+    }
+
+    val tablePath = if (locationSpecContext != null) {
+      Some(visitLocationSpec(locationSpecContext))
+    } else {
+      None
     }
 
     val tableProperties = mutable.Map[String, String]()
@@ -211,9 +218,10 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
     validateStreamingProperty(options)
 
     // prepare table model of the collected tokens
-    val tableModel: TableModel = parser.prepareTableModel(ifNotExists,
-      convertDbNameToLowerCase(name.database),
-      name.table.toLowerCase,
+    val tableModel: TableModel = parser.prepareTableModel(
+      ifNotExists,
+      convertDbNameToLowerCase(tableIdentifier.database),
+      tableIdentifier.table.toLowerCase,
       fields,
       partitionFields,
       tableProperties,
@@ -221,7 +229,7 @@ class CarbonHelperSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser)
       isAlterFlow = false,
       tableComment)
 
-    CarbonCreateTableCommand(tableModel)
+    CarbonCreateTableCommand(tableModel, tablePath)
   }
 
   private def validateStreamingProperty(carbonOption: CarbonOption): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index cba315b..58b3362 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -194,11 +194,10 @@ object AlterTableUtil {
       oldTableIdentifier.table, tableId)
     val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, oldCarbonTableIdentifier)
     val newCarbonTableIdentifier = new CarbonTableIdentifier(database, newTableName, tableId)
-    val newTablePath = CarbonUtil.getNewTablePath(new Path(tablePath), newCarbonTableIdentifier)
+    val newTablePath = CarbonUtil.getNewTablePath(new Path(tablePath), newTableName)
     val metastore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-    val tableMetadataFile = carbonTablePath.getPath
-    val fileType = FileFactory.getFileType(tableMetadataFile)
-    if (FileFactory.isFileExist(tableMetadataFile, fileType)) {
+    val fileType = FileFactory.getFileType(tablePath)
+    if (FileFactory.isFileExist(tablePath, fileType)) {
       val tableInfo = if (metastore.isReadFromHiveMetaStore) {
         // In case of hive metastore we first update the carbonschema inside old table only.
         metastore.getThriftTableInfo(CarbonStorePath.getCarbonTablePath(tablePath,
@@ -213,7 +212,8 @@ object AlterTableUtil {
         FileFactory.getCarbonFile(carbonTablePath.getPath, fileType)
           .renameForce(carbonTablePath.getParent.toString + CarbonCommonConstants.FILE_SEPARATOR +
                        oldTableIdentifier.table)
-        val absoluteTableIdentifier = new AbsoluteTableIdentifier(newTablePath,
+        val absoluteTableIdentifier = AbsoluteTableIdentifier.from(
+          newTablePath,
           newCarbonTableIdentifier)
         metastore.revertTableSchemaInAlterFailure(oldCarbonTableIdentifier,
           tableInfo, absoluteTableIdentifier)(sparkSession)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
index b6df33e..9f66737 100644
--- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
@@ -251,6 +251,7 @@ class CarbonSqlAstBuilder(conf: SQLConf, parser: CarbonSpark2SqlParser) extends
           ctx.partitionColumns,
           ctx.columns,
           ctx.tablePropertyList,
+          ctx.locationSpec,
           Option(ctx.STRING()).map(string),
           ctx.AS)
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
index 4cb9c6d..56c5747 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/AllDictionaryTestCase.scala
@@ -45,6 +45,7 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     val carbonLoadModel = new CarbonLoadModel
     carbonLoadModel.setTableName(relation.carbonTable.getDatabaseName)
     carbonLoadModel.setDatabaseName(relation.carbonTable.getTableName)
+    carbonLoadModel.setTablePath(relation.metaData.carbonTable.getTablePath)
     val table = relation.carbonTable
     val carbonSchema = new CarbonDataLoadSchema(table)
     carbonLoadModel.setDatabaseName(table.getDatabaseName)
@@ -145,7 +146,6 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     GlobalDictionaryUtil.generateGlobalDictionary(
       sqlContext,
       carbonLoadModel,
-      sampleRelation.carbonTable.getTablePath,
       FileFactory.getConfiguration)
 
     DictionaryTestCaseUtil.
@@ -158,7 +158,6 @@ class AllDictionaryTestCase extends Spark2QueryTest with BeforeAndAfterAll {
     GlobalDictionaryUtil.generateGlobalDictionary(
       sqlContext,
       carbonLoadModel,
-      complexRelation.carbonTable.getTablePath,
       FileFactory.getConfiguration)
 
     DictionaryTestCaseUtil.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
index 4551120..8467f8d 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/DictionaryTestCaseUtil.scala
@@ -41,7 +41,7 @@ object DictionaryTestCaseUtil {
     val table = relation.carbonTable
     val dimension = table.getDimensionByName(table.getTableName, columnName)
     val tableIdentifier = new CarbonTableIdentifier(table.getDatabaseName, table.getTableName, "uniqueid")
-    val  absoluteTableIdentifier = new AbsoluteTableIdentifier(table.getTablePath, tableIdentifier)
+    val  absoluteTableIdentifier = AbsoluteTableIdentifier.from(table.getTablePath, tableIdentifier)
     val columnIdentifier = new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier,
       dimension.getColumnIdentifier, dimension.getDataType,
       CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
index da5469a..cad7807 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/util/ExternalColumnDictionaryTestCase.scala
@@ -145,7 +145,8 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
       .asInstanceOf[CarbonRelation]
   }
 
-  def buildCarbonLoadModel(relation: CarbonRelation,
+  def buildCarbonLoadModel(
+      relation: CarbonRelation,
       filePath: String,
       header: String,
       extColFilePath: String,
@@ -157,6 +158,7 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     val carbonSchema = new CarbonDataLoadSchema(table)
     carbonLoadModel.setDatabaseName(table.getDatabaseName)
     carbonLoadModel.setTableName(table.getTableName)
+    carbonLoadModel.setTablePath(relation.carbonTable.getTablePath)
     carbonLoadModel.setCarbonDataLoadSchema(carbonSchema)
     carbonLoadModel.setFactFilePath(filePath)
     carbonLoadModel.setCsvHeader(header)
@@ -201,7 +203,6 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     GlobalDictionaryUtil.generateGlobalDictionary(
       sqlContext,
       carbonLoadModel,
-      extComplexRelation.carbonTable.getTablePath,
       FileFactory.getConfiguration)
     // check whether the dictionary is generated
     DictionaryTestCaseUtil.checkDictionary(
@@ -213,7 +214,6 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     GlobalDictionaryUtil.generateGlobalDictionary(
       sqlContext,
       carbonLoadModel,
-      extComplexRelation.carbonTable.getTablePath,
       FileFactory.getConfiguration)
     // check the old dictionary and whether the new distinct value is generated
     DictionaryTestCaseUtil.checkDictionary(
@@ -229,7 +229,6 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     GlobalDictionaryUtil.generateGlobalDictionary(
       sqlContext,
       carbonLoadModel,
-      extComplexRelation.carbonTable.getTablePath,
       FileFactory.getConfiguration)
     // check whether the dictionary is generated
     DictionaryTestCaseUtil.checkDictionary(
@@ -241,7 +240,6 @@ class ExternalColumnDictionaryTestCase extends Spark2QueryTest with BeforeAndAft
     GlobalDictionaryUtil.generateGlobalDictionary(
       sqlContext,
       carbonLoadModel,
-      verticalDelimiteRelation.carbonTable.getTablePath,
       FileFactory.getConfiguration)
     // check whether the dictionary is generated
     DictionaryTestCaseUtil.checkDictionary(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
index cec7bbc..48519fd 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/DataLoadFailAllTypeSortTest.scala
@@ -219,12 +219,10 @@ class DataLoadFailAllTypeSortTest extends Spark2QueryTest with BeforeAndAfterAll
       CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.CARBON_BAD_RECORDS_ACTION, "FAIL")
       sql("create table data_tbm(name String, dob long, weight int) " +
-          "USING org.apache.spark.sql.CarbonSource OPTIONS('bucketnumber'='4', " +
+          "stored by 'carbondata' tblproperties('bucketnumber'='4', " +
           "'bucketcolumns'='name', 'tableName'='data_tbm')")
       val testData = s"$resourcesPath/badrecords/dummy.csv"
       sql(s"""LOAD DATA LOCAL INPATH '$testData' INTO table data_tbm""")
-
-
     } catch {
       case x: Throwable => {
         assert(x.getMessage.contains("Data load failed due to bad record"))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
index 778e1b3..4b7ce63 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java
@@ -108,8 +108,9 @@ public class FieldEncoderFactory {
                   dataField.getColumn().getDataType());
           CarbonTablePath carbonTablePath =
               CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-          AbsoluteTableIdentifier parentAbsoluteTableIdentifier = new AbsoluteTableIdentifier(
-              CarbonUtil.getNewTablePath(carbonTablePath, parentTableIdentifier),
+          AbsoluteTableIdentifier parentAbsoluteTableIdentifier =
+              AbsoluteTableIdentifier.from(
+              CarbonUtil.getNewTablePath(carbonTablePath, parentTableIdentifier.getTableName()),
               parentTableIdentifier);
           identifier = new DictionaryColumnUniqueIdentifier(parentAbsoluteTableIdentifier,
               parentColumnIdentifier, dataField.getColumn().getDataType(),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index db3442e..cda31c0 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -25,17 +25,7 @@ import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.nio.charset.Charset;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -254,12 +244,10 @@ public final class CarbonLoaderUtil {
    * @return boolean which determines whether status update is done or not.
    * @throws IOException
    */
-  public static boolean recordLoadMetadata(LoadMetadataDetails newMetaEntry,
+  public static boolean recordNewLoadMetadata(LoadMetadataDetails newMetaEntry,
       CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite)
       throws IOException, InterruptedException {
     boolean status = false;
-    String metaDataFilepath =
-        loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
     AbsoluteTableIdentifier absoluteTableIdentifier =
         loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
     CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
@@ -277,7 +265,7 @@ public final class CarbonLoaderUtil {
             "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName()
                 + " for table status updation");
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-            SegmentStatusManager.readLoadMetadata(metaDataFilepath);
+            SegmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath());
         List<LoadMetadataDetails> listOfLoadFolderDetails =
             new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
         List<CarbonFile> staleFolders = new ArrayList<>();
@@ -857,18 +845,13 @@ public final class CarbonLoaderUtil {
 
   /**
    * This method will get the store location for the given path, segment id and partition id
-   *
-   * @param carbonStorePath
-   * @param segmentId
    */
-  public static void checkAndCreateCarbonDataLocation(String carbonStorePath,
-      String segmentId, CarbonTable carbonTable) {
+  public static void checkAndCreateCarbonDataLocation(String segmentId, CarbonTable carbonTable) {
     CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
     CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier);
-    String carbonDataDirectoryPath =
-        carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
-    CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
+        CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath(), carbonTableIdentifier);
+    String segmentFolder = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId);
+    CarbonUtil.checkAndCreateFolder(segmentFolder);
   }
 
   /**
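
A minimal sketch of the narrowed checkAndCreateCarbonDataLocation API above: the explicit store path argument is gone because the table now carries its own path. The wrapper method and segment id here are illustrative.

import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.processing.util.CarbonLoaderUtil

// Create the segment data folder for segment "1"; the table supplies its
// own tablePath internally, so no store path is passed any more.
def createSegmentFolder(table: CarbonTable): Unit =
  CarbonLoaderUtil.checkAndCreateCarbonDataLocation("1", table)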

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
index 485e718..7925b35 100644
--- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
+++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java
@@ -18,34 +18,26 @@
 package org.apache.carbondata.carbon.datastore;
 
 import java.io.File;
-import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.BlockIndexStore;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.block.TableBlockUniqueIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.processing.StoreCreator;
 
 import junit.framework.TestCase;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 public class BlockIndexStoreTest extends TestCase {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
index 8c2f7bb..78c99d6 100644
--- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java
@@ -43,7 +43,6 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
 import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl;
 import org.apache.carbondata.core.fileoperations.FileWriteOperation;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
@@ -53,6 +52,7 @@ import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.SchemaEvolution;
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.metadata.schema.table.TableInfo;
 import org.apache.carbondata.core.metadata.schema.table.TableSchema;
@@ -73,15 +73,15 @@ import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWrit
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl;
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfo;
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfoPreparator;
-import org.apache.carbondata.processing.util.TableOptionConstant;
+import org.apache.carbondata.processing.loading.DataLoadExecutor;
+import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
 import org.apache.carbondata.processing.loading.csvinput.BlockDetails;
 import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat;
 import org.apache.carbondata.processing.loading.csvinput.CSVRecordReaderIterator;
 import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable;
 import org.apache.carbondata.processing.loading.model.CarbonDataLoadSchema;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
-import org.apache.carbondata.processing.loading.DataLoadExecutor;
-import org.apache.carbondata.processing.loading.constants.DataLoadProcessorConstants;
+import org.apache.carbondata.processing.util.TableOptionConstant;
 
 import com.google.gson.Gson;
 import org.apache.hadoop.conf.Configuration;
@@ -106,7 +106,8 @@ public class StoreCreator {
       String dbName = "testdb";
       String tableName = "testtable";
       absoluteTableIdentifier =
-          new AbsoluteTableIdentifier(storePath + "/testdb/testtable",
+          AbsoluteTableIdentifier.from(
+              storePath + "/testdb/testtable",
               new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
     } catch (IOException ex) {
 
@@ -268,7 +269,6 @@ public class StoreCreator {
             absoluteTableIdentifier.getCarbonTableIdentifier());
     String schemaFilePath = carbonTablePath.getSchemaFilePath();
     String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
-    tableInfo.setMetaDataFilepath(schemaMetadataPath);
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
 
     SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index f268883..8c4d5ba 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -293,7 +293,7 @@ object StreamHandoffRDD {
         SegmentStatus.INSERT_IN_PROGRESS,
         carbonLoadModel.getFactTimeStamp,
         false)
-      CarbonLoaderUtil.recordLoadMetadata(newMetaEntry, carbonLoadModel, true, false)
+      CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, carbonLoadModel, true, false)
       // convert a streaming segment to columnar segment
       val status = new StreamHandoffRDD(
         sqlContext.sparkContext,


[3/3] carbondata git commit: [CARBONDATA-1844] Add tablePath support when creating table

Posted by ra...@apache.org.
[CARBONDATA-1844] Add tablePath support when creating table

Users should be able to specify the table path when creating a table; this PR adds that support.
The tablePath of a table is determined by the following steps:

1. Use the table property 'table_path' if the user specified one.
2. Otherwise, if the user specified a locationUri in the CREATE DATABASE command, concatenate the database's locationUri with the table name to form the table path.
3. Otherwise, construct the table path from the carbon store path, the database name, and the table name.

This closes #1603
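
A minimal Scala sketch of that resolution order. The function and parameter names (resolveTablePath, storePath, dbLocationUri) are illustrative assumptions, not CarbonData APIs:

// Resolve a table path following the three steps above, in priority order.
def resolveTablePath(
    tablePathProperty: Option[String],   // TBLPROPERTIES('table_path'='...')
    dbLocationUri: Option[String],       // CREATE DATABASE ... LOCATION ...
    storePath: String,
    dbName: String,
    tableName: String): String = {
  tablePathProperty
    .orElse(dbLocationUri.map(uri => uri + "/" + tableName))
    .getOrElse(storePath + "/" + dbName + "/" + tableName)
}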


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/2fe7758b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/2fe7758b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/2fe7758b

Branch: refs/heads/master
Commit: 2fe7758be1278548f9f321b1b0fc0305488a46cd
Parents: 25c2824
Author: Jacky Li <ja...@qq.com>
Authored: Sun Dec 3 13:42:20 2017 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Mon Dec 4 12:05:54 2017 +0530

----------------------------------------------------------------------
 .../core/metadata/AbsoluteTableIdentifier.java  |  25 +++-
 .../core/metadata/CarbonMetadata.java           |   4 +-
 .../metadata/converter/SchemaConverter.java     |   7 +-
 .../ThriftWrapperSchemaConverterImpl.java       |   7 +-
 .../core/metadata/schema/table/CarbonTable.java |  36 +----
 .../core/metadata/schema/table/TableInfo.java   |  41 +-----
 .../core/scan/executor/util/QueryUtil.java      |  23 ++-
 .../core/service/ColumnUniqueIdService.java     |   3 +-
 .../service/impl/ColumnUniqueIdGenerator.java   |   2 +-
 .../apache/carbondata/core/util/CarbonUtil.java |  11 +-
 .../core/util/path/CarbonTablePath.java         | 125 +---------------
 .../DictionaryCacheLoaderImplTest.java          |   2 +-
 .../DictionaryColumnUniqueIdentifierTest.java   |   5 +-
 .../dictionary/ForwardDictionaryCacheTest.java  |   5 +-
 .../dictionary/ReverseDictionaryCacheTest.java  |   5 +-
 .../carbon/AbsoluteTableIdentifierTest.java     |  10 +-
 .../datastore/SegmentTaskIndexStoreTest.java    |  17 +--
 .../reader/CarbonDictionaryReaderImplTest.java  |   6 +-
 ...CarbonDictionarySortIndexReaderImplTest.java |   7 +-
 .../core/scan/filter/FilterUtilTest.java        |  13 +-
 .../CarbonFormatDirectoryStructureTest.java     |   2 +-
 .../writer/CarbonDictionaryWriterImplTest.java  |  14 +-
 ...CarbonDictionarySortIndexWriterImplTest.java |   2 +-
 .../carbondata/hadoop/util/SchemaReader.java    |  10 +-
 .../streaming/CarbonStreamInputFormatTest.java  |  11 +-
 .../streaming/CarbonStreamOutputFormatTest.java |  14 +-
 .../hadoop/test/util/StoreCreator.java          |  14 +-
 .../presto/impl/CarbonTableReader.java          |   4 +-
 .../presto/util/CarbonDataStoreCreator.scala    |   3 +-
 .../DataCompactionCardinalityBoundryTest.scala  |   5 +-
 .../datacompaction/DataCompactionLockTest.scala |   5 +-
 .../testsuite/dataload/TestLoadDataFrame.scala  |  23 ++-
 ...estLoadDataWithHiveSyntaxDefaultFormat.scala |  44 +++++-
 .../iud/DeleteCarbonTableTestCase.scala         |  12 +-
 .../iud/UpdateCarbonTableTestCase.scala         |  22 +--
 .../org/apache/carbondata/api/CarbonStore.scala |   4 +-
 .../carbondata/events/CreateTableEvents.scala   |  12 +-
 .../carbondata/events/DropTableEvents.scala     |   9 +-
 .../org/apache/carbondata/events/Events.scala   |   6 +-
 .../apache/carbondata/spark/CarbonOption.scala  |   8 +-
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |   2 -
 .../spark/rdd/CarbonIUDMergerRDD.scala          |   4 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |   8 +-
 .../carbondata/spark/rdd/CarbonScanRDD.scala    |   8 +-
 .../spark/tasks/DictionaryWriterTask.scala      |   2 +-
 .../carbondata/spark/util/CommonUtil.scala      |   6 +-
 .../spark/util/GlobalDictionaryUtil.scala       |  47 +++---
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala |   5 +-
 .../command/carbonTableSchemaCommon.scala       |  32 ++--
 .../spark/util/CarbonReflectionUtils.scala      |   3 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  10 +-
 .../spark/sql/CarbonCatalystOperators.scala     |  42 ------
 .../spark/sql/CarbonDataFrameWriter.scala       |  19 ++-
 .../sql/CarbonDatasourceHadoopRelation.scala    |   7 +-
 .../spark/sql/CarbonDictionaryDecoder.scala     |   6 +-
 .../scala/org/apache/spark/sql/CarbonEnv.scala  |  96 +++++++-----
 .../org/apache/spark/sql/CarbonSource.scala     | 146 +++++++++----------
 .../datamap/CarbonCreateDataMapCommand.scala    |   2 +-
 .../datamap/CarbonDropDataMapCommand.scala      |  14 +-
 .../command/datamap/DataMapListeners.scala      |   4 +-
 .../management/CarbonCleanFilesCommand.scala    |  19 +--
 .../CarbonDeleteLoadByIdCommand.scala           |   4 +-
 .../CarbonDeleteLoadByLoadDateCommand.scala     |   4 +-
 .../management/CarbonLoadDataCommand.scala      |  29 ++--
 .../management/CarbonShowLoadsCommand.scala     |   4 +-
 .../CarbonProjectForDeleteCommand.scala         |  23 +--
 .../CarbonProjectForUpdateCommand.scala         |  56 ++++---
 .../command/mutation/DeleteExecution.scala      |  23 +--
 .../command/mutation/HorizontalCompaction.scala |   3 +-
 .../CreatePreAggregateTableCommand.scala        |  11 +-
 .../schema/CarbonAlterTableRenameCommand.scala  |  13 +-
 .../table/CarbonCreateTableCommand.scala        | 116 +++++++--------
 .../command/table/CarbonDropTableCommand.scala  |  87 ++++-------
 .../strategy/StreamingTableStrategy.scala       |  19 +--
 .../spark/sql/hive/CarbonAnalysisRules.scala    |   7 +-
 .../spark/sql/hive/CarbonFileMetastore.scala    | 125 ++++++----------
 .../spark/sql/hive/CarbonHiveMetaStore.scala    |  42 +++---
 .../execution/command/CarbonHiveCommands.scala  |   6 +-
 .../sql/optimizer/CarbonLateDecodeRule.scala    |  11 +-
 .../sql/parser/CarbonSpark2SqlParser.scala      |   5 +-
 .../spark/sql/parser/CarbonSparkSqlParser.scala |  24 ++-
 .../org/apache/spark/util/AlterTableUtil.scala  |  10 +-
 .../src/main/spark2.1/CarbonSessionState.scala  |   1 +
 .../spark/util/AllDictionaryTestCase.scala      |   3 +-
 .../spark/util/DictionaryTestCaseUtil.scala     |   2 +-
 .../util/ExternalColumnDictionaryTestCase.scala |   8 +-
 .../DataLoadFailAllTypeSortTest.scala           |   4 +-
 .../converter/impl/FieldEncoderFactory.java     |   5 +-
 .../processing/util/CarbonLoaderUtil.java       |  31 +---
 .../carbon/datastore/BlockIndexStoreTest.java   |  12 +-
 .../carbondata/processing/StoreCreator.java     |  12 +-
 .../carbondata/streaming/StreamHandoffRDD.scala |   2 +-
 92 files changed, 767 insertions(+), 995 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
index 603a1c1..6ef2671 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
@@ -44,7 +44,7 @@ public class AbsoluteTableIdentifier implements Serializable {
    */
   private CarbonTableIdentifier carbonTableIdentifier;
 
-  public AbsoluteTableIdentifier(String tablePath, CarbonTableIdentifier carbonTableIdentifier) {
+  private AbsoluteTableIdentifier(String tablePath, CarbonTableIdentifier carbonTableIdentifier) {
     //TODO this should be moved to common place where path handling will be handled
     this.tablePath = FileFactory.getUpdatedFilePath(tablePath);
     isLocalPath = tablePath.startsWith(CarbonCommonConstants.LOCAL_FILE_PREFIX);
@@ -58,11 +58,22 @@ public class AbsoluteTableIdentifier implements Serializable {
     return carbonTableIdentifier;
   }
 
-  public static AbsoluteTableIdentifier from(String tablePath, String dbName, String tableName) {
-    CarbonTableIdentifier identifier = new CarbonTableIdentifier(dbName, tableName, "");
+  public static AbsoluteTableIdentifier from(String tablePath, String dbName, String tableName,
+      String tableId) {
+    CarbonTableIdentifier identifier = new CarbonTableIdentifier(dbName, tableName, tableId);
     return new AbsoluteTableIdentifier(tablePath, identifier);
   }
 
+  public static AbsoluteTableIdentifier from(String tablePath, String dbName, String tableName) {
+    return from(tablePath, dbName, tableName, "");
+  }
+
+  public static AbsoluteTableIdentifier from(
+      String tablePath,
+      CarbonTableIdentifier carbonTableIdentifier) {
+    return new AbsoluteTableIdentifier(tablePath, carbonTableIdentifier);
+  }
+
   public String getTablePath() {
     return tablePath;
   }
@@ -124,4 +135,12 @@ public class AbsoluteTableIdentifier implements Serializable {
   public String uniqueName() {
     return tablePath + "/" + carbonTableIdentifier.toString().toLowerCase();
   }
+
+  public String getDatabaseName() {
+    return carbonTableIdentifier.getDatabaseName();
+  }
+
+  public String getTableName() {
+    return carbonTableIdentifier.getTableName();
+  }
 }
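
A sketch of the factory methods introduced above; the constructor is private now, so callers must go through from(...). Paths and names are placeholders.

import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}

// By database and table name (empty table id), with an explicit table id,
// or from an existing CarbonTableIdentifier.
val a = AbsoluteTableIdentifier.from("/tmp/store/db/t1", "db", "t1")
val b = AbsoluteTableIdentifier.from("/tmp/store/db/t1", "db", "t1", "tid-001")
val c = AbsoluteTableIdentifier.from(
  "/tmp/store/db/t1", new CarbonTableIdentifier("db", "t1", "tid-001"))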

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
index e2ce43a..3f8c12d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
@@ -86,8 +86,8 @@ public final class CarbonMetadata {
    */
   public void loadTableMetadata(TableInfo tableInfo) {
     CarbonTable carbonTable = tableInfoMap.get(convertToLowerCase(tableInfo.getTableUniqueName()));
-    if (null == carbonTable || carbonTable.getTableLastUpdatedTime() < tableInfo
-        .getLastUpdatedTime()) {
+    if (null == carbonTable ||
+        carbonTable.getTableLastUpdatedTime() < tableInfo.getLastUpdatedTime()) {
       carbonTable = CarbonTable.buildFromTableInfo(tableInfo);
       tableInfoMap.put(convertToLowerCase(tableInfo.getTableUniqueName()), carbonTable);
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
index af86253..4f34070 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/SchemaConverter.java
@@ -99,8 +99,11 @@ public interface SchemaConverter {
    * @param tableName
    * @return
    */
-  TableInfo fromExternalToWrapperTableInfo(org.apache.carbondata.format.TableInfo externalTableInfo,
-      String dbName, String tableName, String storePath);
+  TableInfo fromExternalToWrapperTableInfo(
+      org.apache.carbondata.format.TableInfo externalTableInfo,
+      String dbName,
+      String tableName,
+      String storePath);
 
   /**
    * method to convert thrift datamap schema object to wrapper

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
index 408c861..8a24e38 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/converter/ThriftWrapperSchemaConverterImpl.java
@@ -608,14 +608,15 @@ public class ThriftWrapperSchemaConverterImpl implements SchemaConverter {
    * convert from external to wrapper tableinfo
    */
   @Override public TableInfo fromExternalToWrapperTableInfo(
-      org.apache.carbondata.format.TableInfo externalTableInfo, String dbName, String tableName,
+      org.apache.carbondata.format.TableInfo externalTableInfo,
+      String dbName,
+      String tableName,
       String tablePath) {
     TableInfo wrapperTableInfo = new TableInfo();
     List<org.apache.carbondata.format.SchemaEvolutionEntry> schemaEvolutionList =
         externalTableInfo.getFact_table().getSchema_evolution().getSchema_evolution_history();
     wrapperTableInfo.setLastUpdatedTime(
-        schemaEvolutionList.get(schemaEvolutionList.size() - 1)
-            .getTime_stamp());
+        schemaEvolutionList.get(schemaEvolutionList.size() - 1).getTime_stamp());
     wrapperTableInfo.setDatabaseName(dbName);
     wrapperTableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName));
     wrapperTableInfo.setTablePath(tablePath);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
index 66b747b..e5d8839 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/CarbonTable.java
@@ -33,6 +33,7 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDim
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
 
 /**
  * Mapping class for Carbon actual table
@@ -50,11 +51,6 @@ public class CarbonTable implements Serializable {
   private static final long serialVersionUID = 8696507171227156445L;
 
   /**
-   * Absolute table identifier
-   */
-  private AbsoluteTableIdentifier absoluteTableIdentifier;
-
-  /**
    * TableName, Dimensions list. This map will contain allDimensions which are visible
    */
   private Map<String, List<CarbonDimension>> tableDimensionsMap;
@@ -102,11 +98,6 @@ public class CarbonTable implements Serializable {
   private String tableUniqueName;
 
   /**
-   * metadata file path (check if it is really required )
-   */
-  private String metaDataFilepath;
-
-  /**
    * last updated time
    */
   private long tableLastUpdatedTime;
@@ -149,9 +140,6 @@ public class CarbonTable implements Serializable {
     table.blockSize = tableInfo.getTableBlockSizeInMB();
     table.tableLastUpdatedTime = tableInfo.getLastUpdatedTime();
     table.tableUniqueName = tableInfo.getTableUniqueName();
-    table.metaDataFilepath = tableInfo.getMetaDataFilepath();
-    table.absoluteTableIdentifier = tableInfo.getOrCreateAbsoluteTableIdentifier();
-
     table.fillDimensionsAndMeasuresForTables(tableInfo.getFactTable());
     table.fillCreateOrderColumn(tableInfo.getFactTable().getTableName());
     if (tableInfo.getFactTable().getBucketingInfo() != null) {
@@ -343,14 +331,14 @@ public class CarbonTable implements Serializable {
    * @return the databaseName
    */
   public String getDatabaseName() {
-    return absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName();
+    return tableInfo.getDatabaseName();
   }
 
   /**
    * @return the tabelName
    */
   public String getTableName() {
-    return absoluteTableIdentifier.getCarbonTableIdentifier().getTableName();
+    return tableInfo.getFactTable().getTableName();
   }
 
   /**
@@ -375,14 +363,14 @@ public class CarbonTable implements Serializable {
    * @return the metaDataFilepath
    */
   public String getMetaDataFilepath() {
-    return metaDataFilepath;
+    return CarbonTablePath.getMetadataPath(getTablePath());
   }
 
   /**
    * @return storepath
    */
   public String getTablePath() {
-    return absoluteTableIdentifier.getTablePath();
+    return tableInfo.getOrCreateAbsoluteTableIdentifier().getTablePath();
   }
 
   /**
@@ -433,16 +421,6 @@ public class CarbonTable implements Serializable {
   }
 
   /**
-   * to get the all dimension of a table
-   *
-   * @param tableName
-   * @return all dimension of a table
-   */
-  public List<CarbonDimension> getImplicitDimensionByTableName(String tableName) {
-    return tableImplicitDimensionsMap.get(tableName);
-  }
-
-  /**
    * This will give user created order column
    *
    * @return
@@ -589,14 +567,14 @@ public class CarbonTable implements Serializable {
    * @return absolute table identifier
    */
   public AbsoluteTableIdentifier getAbsoluteTableIdentifier() {
-    return absoluteTableIdentifier;
+    return tableInfo.getOrCreateAbsoluteTableIdentifier();
   }
 
   /**
    * @return carbon table identifier
    */
   public CarbonTableIdentifier getCarbonTableIdentifier() {
-    return absoluteTableIdentifier.getCarbonTableIdentifier();
+    return tableInfo.getOrCreateAbsoluteTableIdentifier().getCarbonTableIdentifier();
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
index 40ce92d..4deafd4 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/schema/table/TableInfo.java
@@ -69,16 +69,11 @@ public class TableInfo implements Serializable, Writable {
   private long lastUpdatedTime;
 
   /**
-   * metadata file path (check if it is really required )
-   */
-  private String metaDataFilepath;
-
-  /**
-   * store location
+   * store location of the table, it will be set in identifier.tablePath also
    */
   private String tablePath;
 
-  // this idenifier is a lazy field which will be created when it is used first time
+  // this identifier is a lazy field which will be created when it is used first time
   private AbsoluteTableIdentifier identifier;
 
   private List<DataMapSchema> dataMapSchemaList;
@@ -162,24 +157,6 @@ public class TableInfo implements Serializable, Writable {
     this.lastUpdatedTime = lastUpdatedTime;
   }
 
-  /**
-   * @return
-   */
-  public String getMetaDataFilepath() {
-    return metaDataFilepath;
-  }
-
-  /**
-   * @param metaDataFilepath
-   */
-  public void setMetaDataFilepath(String metaDataFilepath) {
-    this.metaDataFilepath = metaDataFilepath;
-  }
-
-  public String getTablePath() {
-    return tablePath;
-  }
-
   public void setTablePath(String tablePath) {
     this.tablePath = tablePath;
   }
@@ -258,8 +235,7 @@ public class TableInfo implements Serializable, Writable {
     out.writeUTF(tableUniqueName);
     factTable.write(out);
     out.writeLong(lastUpdatedTime);
-    out.writeUTF(metaDataFilepath);
-    out.writeUTF(tablePath);
+    out.writeUTF(getOrCreateAbsoluteTableIdentifier().getTablePath());
     boolean isChildSchemaExists =
         null != dataMapSchemaList && dataMapSchemaList.size() > 0;
     out.writeBoolean(isChildSchemaExists);
@@ -286,7 +262,6 @@ public class TableInfo implements Serializable, Writable {
     this.factTable = new TableSchema();
     this.factTable.readFields(in);
     this.lastUpdatedTime = in.readLong();
-    this.metaDataFilepath = in.readUTF();
     this.tablePath = in.readUTF();
     boolean isChildSchemaExists = in.readBoolean();
     this.dataMapSchemaList = new ArrayList<>();
@@ -303,11 +278,11 @@ public class TableInfo implements Serializable, Writable {
         dataMapSchemaList.add(dataMapSchema);
       }
     }
-    boolean isParentTableRelationIndentifierExists = in.readBoolean();
-    if (isParentTableRelationIndentifierExists) {
-      short parentTableIndentifiersListSize = in.readShort();
+    boolean isParentTableRelationIdentifierExists = in.readBoolean();
+    if (isParentTableRelationIdentifierExists) {
+      short parentTableIdentifiersListSize = in.readShort();
       this.parentRelationIdentifiers = new ArrayList<>();
-      for (int i = 0; i < parentTableIndentifiersListSize; i++) {
+      for (int i = 0; i < parentTableIdentifiersListSize; i++) {
         RelationIdentifier relationIdentifier = new RelationIdentifier(null, null, null);
         relationIdentifier.readFields(in);
         this.parentRelationIdentifiers.add(relationIdentifier);
@@ -319,7 +294,7 @@ public class TableInfo implements Serializable, Writable {
     if (identifier == null) {
       CarbonTableIdentifier carbontableIdentifier =
           new CarbonTableIdentifier(databaseName, factTable.getTableName(), factTable.getTableId());
-      identifier = new AbsoluteTableIdentifier(tablePath, carbontableIdentifier);
+      identifier = AbsoluteTableIdentifier.from(tablePath, carbontableIdentifier);
     }
     return identifier;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
index de0711b..b5c56dc 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/QueryUtil.java
@@ -47,6 +47,7 @@ import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.complextypes.ArrayQueryType;
@@ -408,19 +409,17 @@ public class QueryUtil {
 
   public static AbsoluteTableIdentifier getTableIdentifierForColumn(CarbonDimension carbonDimension,
       AbsoluteTableIdentifier absoluteTableIdentifier) {
-    String parentTableName =
-        carbonDimension.getColumnSchema().getParentColumnTableRelations().get(0)
-            .getRelationIdentifier().getTableName();
-    String parentDatabaseName =
-        carbonDimension.getColumnSchema().getParentColumnTableRelations().get(0)
-            .getRelationIdentifier().getDatabaseName();
-    String parentTableId = carbonDimension.getColumnSchema().getParentColumnTableRelations().get(0)
-        .getRelationIdentifier().getTableId();
-    CarbonTableIdentifier carbonTableIdentifier =
-        new CarbonTableIdentifier(parentDatabaseName, parentTableName, parentTableId);
+    RelationIdentifier relation = carbonDimension.getColumnSchema()
+        .getParentColumnTableRelations()
+        .get(0)
+        .getRelationIdentifier();
+    String parentTableName = relation.getTableName();
+    String parentDatabaseName = relation.getDatabaseName();
+    String parentTableId = relation.getTableId();
     CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
-    String newTablePath = CarbonUtil.getNewTablePath(carbonTablePath, carbonTableIdentifier);
-    return new AbsoluteTableIdentifier(newTablePath, carbonTableIdentifier);
+    String newTablePath = CarbonUtil.getNewTablePath(carbonTablePath, parentTableName);
+    return AbsoluteTableIdentifier.from(newTablePath, parentDatabaseName, parentTableName,
+        parentTableId);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/main/java/org/apache/carbondata/core/service/ColumnUniqueIdService.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/service/ColumnUniqueIdService.java b/core/src/main/java/org/apache/carbondata/core/service/ColumnUniqueIdService.java
index 22b0884..fbaca58 100644
--- a/core/src/main/java/org/apache/carbondata/core/service/ColumnUniqueIdService.java
+++ b/core/src/main/java/org/apache/carbondata/core/service/ColumnUniqueIdService.java
@@ -24,9 +24,8 @@ import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
 public interface ColumnUniqueIdService {
 
   /**
-   * @param databaseName
    * @param columnSchema
    * @return generate unique id
    */
-  String generateUniqueId(String databaseName, ColumnSchema columnSchema);
+  String generateUniqueId(ColumnSchema columnSchema);
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/main/java/org/apache/carbondata/core/service/impl/ColumnUniqueIdGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/service/impl/ColumnUniqueIdGenerator.java b/core/src/main/java/org/apache/carbondata/core/service/impl/ColumnUniqueIdGenerator.java
index b9821d7..2816e85 100644
--- a/core/src/main/java/org/apache/carbondata/core/service/impl/ColumnUniqueIdGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/service/impl/ColumnUniqueIdGenerator.java
@@ -28,7 +28,7 @@ public class ColumnUniqueIdGenerator implements ColumnUniqueIdService {
 
   private static ColumnUniqueIdService columnUniqueIdService = new ColumnUniqueIdGenerator();
 
-  @Override public String generateUniqueId(String databaseName, ColumnSchema columnSchema) {
+  @Override public String generateUniqueId(ColumnSchema columnSchema) {
     return UUID.randomUUID().toString();
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 1175cf0..1e8bc1f 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -60,7 +60,6 @@ import org.apache.carbondata.core.indexstore.BlockletDetailInfo;
 import org.apache.carbondata.core.keygenerator.mdkey.NumberCompressor;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.ValueEncoderMeta;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
@@ -2152,14 +2151,14 @@ public final class CarbonUtil {
    * tableName to the parent
    *
    * @param carbonTablePath       Old tablePath
-   * @param carbonTableIdentifier new carbonTableIdentifier
+   * @param newTableName          new table name
    * @return the new table path
    */
-  public static String getNewTablePath(Path carbonTablePath,
-      CarbonTableIdentifier carbonTableIdentifier) {
+  public static String getNewTablePath(
+      Path carbonTablePath,
+      String newTableName) {
     Path parentPath = carbonTablePath.getParent();
-    return parentPath.toString() + CarbonCommonConstants.FILE_SEPARATOR + carbonTableIdentifier
-        .getTableName();
+    return parentPath.toString() + CarbonCommonConstants.FILE_SEPARATOR + newTableName;
   }
 
   /*
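
A minimal sketch of the simplified rename helper above: only the new table name is needed now, and the resulting path keeps the parent directory.

import org.apache.hadoop.fs.Path
import org.apache.carbondata.core.util.CarbonUtil

// "/tmp/store/db/old_table" becomes "/tmp/store/db/new_table".
val newPath = CarbonUtil.getNewTablePath(new Path("/tmp/store/db/old_table"), "new_table")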

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 376a71f..172b71d 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -39,16 +39,12 @@ public class CarbonTablePath extends Path {
   private static final String SORT_INDEX_EXT = ".sortindex";
   private static final String SCHEMA_FILE = "schema";
   private static final String TABLE_STATUS_FILE = "tablestatus";
-  private static final String TABLE_UPDATE_STATUS_FILE = "tableupdatestatus";
   private static final String FACT_DIR = "Fact";
   private static final String SEGMENT_PREFIX = "Segment_";
   private static final String PARTITION_PREFIX = "Part";
   private static final String CARBON_DATA_EXT = ".carbondata";
-  private static final String CARBON_DELTE_DELTA_EXT = ".deletedelta";
-  private static final String CARBON_UPDATE_DELTA_EXT = ".updatedelta";
   private static final String DATA_PART_PREFIX = "part-";
   private static final String BATCH_PREFIX = "_batchno";
-  private static final String DELETE_DELTA_FILE_EXT = ".deletedelta";
 
   public static final String INDEX_FILE_EXT = ".carbonindex";
   public static final String MERGE_INDEX_FILE_EXT = ".carbonindexmerge";
@@ -112,20 +108,6 @@ public class CarbonTablePath extends Path {
     }
     return false;
   }
-  /**
-   * check if it is carbon data file matching extension
-   *
-   * @param fileNameWithPath
-   * @return boolean
-   */
-  public static boolean isCarbonDataFileOrUpdateFile(String fileNameWithPath) {
-    int pos = fileNameWithPath.lastIndexOf('.');
-    if (pos != -1) {
-      return fileNameWithPath.substring(pos).startsWith(CARBON_DATA_EXT) || fileNameWithPath
-          .substring(pos).startsWith(CARBON_UPDATE_DELTA_EXT);
-    }
-    return false;
-  }
 
   /**
    * check if it is carbon index file matching extension
@@ -157,14 +139,6 @@ public class CarbonTablePath extends Path {
   }
 
   /**
-   * @return it return relative directory
-   */
-  public String getRelativeDictionaryDirectory() {
-    return carbonTableIdentifier.getDatabaseName() + File.separator + carbonTableIdentifier
-        .getTableName();
-  }
-
-  /**
    * This method will return the metadata directory location for a table
    *
    * @return
@@ -174,6 +148,13 @@ public class CarbonTablePath extends Path {
   }
 
   /**
+   * Return metadata path based on `tablePath`
+   */
+  public static String getMetadataPath(String tablePath) {
+    return tablePath + File.separator + METADATA_DIR;
+  }
+
+  /**
    * @param columnId unique column identifier
    * @return absolute path of dictionary meta file
    */
@@ -223,13 +204,6 @@ public class CarbonTablePath extends Path {
   }
 
   /**
-   * @return absolute path of table update status file
-   */
-  public String getTableUpdateStatusFilePath() {
-    return getMetaDataDir() + File.separator + TABLE_UPDATE_STATUS_FILE;
-  }
-
-  /**
    * Gets absolute path of data file
    *
    * @param partitionId         unique partition identifier
@@ -322,64 +296,6 @@ public class CarbonTablePath extends Path {
       String factUpdatedtimeStamp) {
     return taskNo + "-" + bucketNumber + "-" + factUpdatedtimeStamp + INDEX_FILE_EXT;
   }
-  /**
-   * Below method will be used to get the index file present in the segment folder
-   * based on task id
-   *
-   * @param taskId      task id of the file
-   * @param partitionId partition number
-   * @param segmentId   segment number
-   * @return full qualified carbon index path
-   */
-  public String getCarbonUpdatedIndexFilePath(final String taskId, final String partitionId,
-      final String segmentId) {
-    String segmentDir = getSegmentDir(partitionId, segmentId);
-    CarbonFile carbonFile =
-        FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir));
-
-    CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile file) {
-        return file.getName().startsWith(taskId) && file.getName().endsWith(INDEX_FILE_EXT);
-      }
-    });
-    if (files.length > 0) {
-      return files[0].getAbsolutePath();
-    } else {
-      throw new RuntimeException(
-          "Missing Carbon Updated index file for partition[" + partitionId
-              + "] Segment[" + segmentId + "], taskId[" + taskId + "]");
-    }
-  }
-
-  /**
-   * Below method will be used to get the index file present in the segment folder
-   * based on task id
-   *
-   * @param taskId      task id of the file
-   * @param partitionId partition number
-   * @param segmentId   segment number
-   * @return full qualified carbon index path
-   */
-  public String getCarbonDeleteDeltaFilePath(final String taskId, final String partitionId,
-      final String segmentId) {
-    String segmentDir = getSegmentDir(partitionId, segmentId);
-    CarbonFile carbonFile =
-        FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir));
-
-    CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() {
-      @Override public boolean accept(CarbonFile file) {
-        return file.getName().startsWith(taskId) && file.getName().endsWith(DELETE_DELTA_FILE_EXT);
-      }
-    });
-    if (files.length > 0) {
-      return files[0].getAbsolutePath();
-    } else {
-      throw new RuntimeException(
-          "Missing Carbon delete delta file index file for partition["
-              + partitionId + "] Segment[" + segmentId + "], taskId[" + taskId
-              + "]");
-    }
-  }
 
   /**
    * Gets absolute path of data file
@@ -427,18 +343,6 @@ public class CarbonTablePath extends Path {
     return segmentDir + File.separator + getCarbonStreamIndexFileName();
   }
 
-  /**
-   * Below method will be used to get the carbon index filename
-   *
-   * @param taskNo               task number
-   * @param factUpdatedTimeStamp time stamp
-   * @return filename
-   */
-  public String getCarbonIndexFileName(int taskNo, String factUpdatedTimeStamp,
-      String indexFileExtension) {
-    return taskNo + "-" + factUpdatedTimeStamp + indexFileExtension;
-  }
-
   public String getSegmentDir(String partitionId, String segmentId) {
     return getPartitionDir(partitionId) + File.separator + SEGMENT_PREFIX + segmentId;
   }
@@ -583,21 +487,6 @@ public class CarbonTablePath extends Path {
         return carbonDataFileName;
       }
     }
-
-    /**
-     * Gets the file name of the delta files.
-     *
-     * @param filePartNo
-     * @param taskNo
-     * @param factUpdateTimeStamp
-     * @param Extension
-     * @return
-     */
-    public static String getCarbonDeltaFileName(String filePartNo, String taskNo,
-        String factUpdateTimeStamp, String Extension) {
-      return DATA_PART_PREFIX + filePartNo + "-" + taskNo + "-" + factUpdateTimeStamp
-          + Extension;
-    }
   }
 
   /**

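The net effect of the CarbonTablePath hunks above is to trim the class down to plain path arithmetic: the update/delta lookup helpers move out, and a static getMetadataPath is added so callers can derive the metadata directory from a bare table path without building a CarbonTablePath instance. A minimal sketch of the new call, assuming only the signature shown in this diff:

    // Sketch only: derive <tablePath>/Metadata via the static helper added above.
    import org.apache.carbondata.core.util.path.CarbonTablePath

    val tablePath = "/tmp/store/default/t1"               // illustrative path
    val metadataPath: String = CarbonTablePath.getMetadataPath(tablePath)
    // metadataPath is tablePath + File.separator + "Metadata"
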
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryCacheLoaderImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryCacheLoaderImplTest.java b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryCacheLoaderImplTest.java
index cea9ad7..7b4a076 100644
--- a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryCacheLoaderImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryCacheLoaderImplTest.java
@@ -49,7 +49,7 @@ public class DictionaryCacheLoaderImplTest {
 
   @BeforeClass public static void setUp() {
     CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("db", "table1", "1");
-    AbsoluteTableIdentifier absoluteTableIdentifier = new AbsoluteTableIdentifier("/tmp",
+    AbsoluteTableIdentifier absoluteTableIdentifier = AbsoluteTableIdentifier.from("/tmp",
         carbonTableIdentifier);
     Map<String, String> columnProperties = new HashMap<>();
     columnProperties.put("prop1", "value1");

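This test change, and the ones that follow, replace direct construction of AbsoluteTableIdentifier with the AbsoluteTableIdentifier.from factory; the table path is now passed in explicitly rather than derived from a store path. A sketch of the pattern, using only calls visible in these hunks (values are illustrative):

    import java.util.UUID
    import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}

    // The first argument is the table path itself, e.g. <store>/<db>/<table>.
    val carbonTableIdentifier =
      new CarbonTableIdentifier("db", "table1", UUID.randomUUID().toString)
    val identifier: AbsoluteTableIdentifier =
      AbsoluteTableIdentifier.from("/tmp/db/table1", carbonTableIdentifier)
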
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryColumnUniqueIdentifierTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryColumnUniqueIdentifierTest.java b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryColumnUniqueIdentifierTest.java
index 1fb5a18..d81a60f 100644
--- a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryColumnUniqueIdentifierTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/DictionaryColumnUniqueIdentifierTest.java
@@ -22,7 +22,6 @@ import java.util.Map;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
-import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 
 import mockit.Mock;
@@ -45,9 +44,9 @@ public class DictionaryColumnUniqueIdentifierTest {
         new CarbonTableIdentifier("testDatabase", "testTable", "1");
     CarbonTableIdentifier carbonTableIdentifier2 =
         new CarbonTableIdentifier("testDatabase", "testTable", "2");
-    AbsoluteTableIdentifier absoluteTableIdentifier1 = new AbsoluteTableIdentifier("storepath",
+    AbsoluteTableIdentifier absoluteTableIdentifier1 = AbsoluteTableIdentifier.from("storepath",
         carbonTableIdentifier1);
-    AbsoluteTableIdentifier absoluteTableIdentifier2 = new AbsoluteTableIdentifier("storepath",
+    AbsoluteTableIdentifier absoluteTableIdentifier2 = AbsoluteTableIdentifier.from("storepath",
         carbonTableIdentifier2);
     Map<String, String> properties = new HashMap<>();
     ColumnIdentifier columnIdentifier = new ColumnIdentifier("2", properties, DataTypes.STRING);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java
index d6041b0..c0b822b 100644
--- a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ForwardDictionaryCacheTest.java
@@ -28,14 +28,15 @@ import java.util.UUID;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter;
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl;
+
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -57,7 +58,7 @@ public class ForwardDictionaryCacheTest extends AbstractDictionaryCacheTest {
     carbonTableIdentifier =
         new CarbonTableIdentifier(databaseName, tableName, UUID.randomUUID().toString());
     absoluteTableIdentifier =
-        new AbsoluteTableIdentifier(carbonStorePath + "/" + databaseName + "/" + tableName,
+        AbsoluteTableIdentifier.from(carbonStorePath + "/" + databaseName + "/" + tableName,
             carbonTableIdentifier);
     columnIdentifiers = new String[] { "name", "place" };
     deleteStorePath();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCacheTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCacheTest.java b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCacheTest.java
index 9c5b956..37ae9de 100644
--- a/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCacheTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/cache/dictionary/ReverseDictionaryCacheTest.java
@@ -59,9 +59,8 @@ public class ReverseDictionaryCacheTest extends AbstractDictionaryCacheTest {
     this.carbonStorePath = props.getProperty("storePath", "carbonStore");
     carbonTableIdentifier =
         new CarbonTableIdentifier(databaseName, tableName, UUID.randomUUID().toString());
-    absoluteTableIdentifier =
-        new AbsoluteTableIdentifier(carbonStorePath + "/" + databaseName + "/" + tableName,
-            carbonTableIdentifier);
+    absoluteTableIdentifier = AbsoluteTableIdentifier.from(
+        carbonStorePath + "/" + databaseName + "/" + tableName, carbonTableIdentifier);
     columnIdentifiers = new String[] { "name", "place" };
     deleteStorePath();
     prepareDataSet();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/test/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifierTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifierTest.java b/core/src/test/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifierTest.java
index 6d3ae0e..12d69f5 100644
--- a/core/src/test/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifierTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/carbon/AbsoluteTableIdentifierTest.java
@@ -33,14 +33,14 @@ public class AbsoluteTableIdentifierTest {
   static AbsoluteTableIdentifier absoluteTableIdentifier4;
 
   @BeforeClass public static void setup() {
-    absoluteTableIdentifier = new AbsoluteTableIdentifier("storePath/databaseName/tableName",
+    absoluteTableIdentifier = AbsoluteTableIdentifier.from("storePath/databaseName/tableName",
         new CarbonTableIdentifier("databaseName", "tableName", "tableId"));
-    absoluteTableIdentifier1 = new AbsoluteTableIdentifier("dummy", null);
-    absoluteTableIdentifier2 = new AbsoluteTableIdentifier("dumgfhmy", null);
+    absoluteTableIdentifier1 = AbsoluteTableIdentifier.from("dummy", null);
+    absoluteTableIdentifier2 = AbsoluteTableIdentifier.from("dumgfhmy", null);
     absoluteTableIdentifier3 =
-        new AbsoluteTableIdentifier("duhgmmy/dumy/dmy/",
+        AbsoluteTableIdentifier.from("duhgmmy/dumy/dmy/",
             new CarbonTableIdentifier("dummy", "dumy", "dmy"));
-    absoluteTableIdentifier4 = new AbsoluteTableIdentifier("storePath/databaseName/tableName",
+    absoluteTableIdentifier4 = AbsoluteTableIdentifier.from("storePath/databaseName/tableName",
         new CarbonTableIdentifier("databaseName", "tableName", "tableId"));
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java b/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java
index cec70dd..19e91da 100644
--- a/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStoreTest.java
@@ -16,36 +16,27 @@
  */
 package org.apache.carbondata.core.datastore;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.datastore.block.AbstractIndex;
-import org.apache.carbondata.core.datastore.block.SegmentTaskIndex;
 import org.apache.carbondata.core.datastore.block.SegmentTaskIndexWrapper;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.blocklet.BlockletInfo;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.blocklet.SegmentInfo;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.core.util.CarbonUtil;
 
-import mockit.Mock;
-import mockit.MockUp;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import static junit.framework.TestCase.assertEquals;
-import static junit.framework.TestCase.assertTrue;
 import static org.junit.Assert.assertNull;
 
 public class SegmentTaskIndexStoreTest {
@@ -63,7 +54,7 @@ public class SegmentTaskIndexStoreTest {
             createCache(CacheType.DRIVER_BTREE);
     tableBlockInfo = new TableBlockInfo("file", 0L, "SG100", locations, 10L,
         ColumnarFormatVersion.valueOf(version), null);
-    absoluteTableIdentifier = new AbsoluteTableIdentifier("/tmp",
+    absoluteTableIdentifier = AbsoluteTableIdentifier.from("/tmp",
         new CarbonTableIdentifier("testdatabase", "testtable", "TB100"));
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/test/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImplTest.java b/core/src/test/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImplTest.java
index 4e3bddb..7450382 100644
--- a/core/src/test/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/reader/CarbonDictionaryReaderImplTest.java
@@ -25,11 +25,11 @@ import java.util.UUID;
 
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.service.impl.PathFactory;
-import org.apache.carbondata.core.service.CarbonCommonFactory;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
+import org.apache.carbondata.core.service.CarbonCommonFactory;
 import org.apache.carbondata.core.service.PathService;
+import org.apache.carbondata.core.service.impl.PathFactory;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 
 import mockit.Mock;
@@ -49,7 +49,7 @@ public class CarbonDictionaryReaderImplTest {
   @BeforeClass public static void setUp() throws Exception {
     columnIdentifier = new ColumnIdentifier("1", null, null);
     absoluteTableIdentifier =
-        new AbsoluteTableIdentifier("storePath",
+        AbsoluteTableIdentifier.from("tablePath",
             new CarbonTableIdentifier("dbName", "tableName", UUID.randomUUID().toString()));
     DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
         new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, columnIdentifier,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/test/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImplTest.java b/core/src/test/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImplTest.java
index 2953c33..b4c791c 100644
--- a/core/src/test/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/reader/sortindex/CarbonDictionarySortIndexReaderImplTest.java
@@ -23,17 +23,18 @@ import java.util.List;
 import java.util.UUID;
 
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.writer.CarbonDictionaryWriter;
 import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl;
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriter;
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl;
+
 import org.apache.commons.lang.ArrayUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -65,7 +66,7 @@ public class CarbonDictionarySortIndexReaderImplTest {
     CarbonTableIdentifier carbonTableIdentifier = new CarbonTableIdentifier("testSchema", "carbon",
     		UUID.randomUUID().toString());
     AbsoluteTableIdentifier absoluteTableIdentifier =
-        new AbsoluteTableIdentifier(storePath+"/testSchema/carbon", carbonTableIdentifier);
+        AbsoluteTableIdentifier.from(storePath + "/testSchema/carbon", carbonTableIdentifier);
     ColumnIdentifier columnIdentifier = new ColumnIdentifier("Name", null, null);
     DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
         new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, columnIdentifier,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java b/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java
index f1f05f0..89b3122 100644
--- a/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterUtilTest.java
@@ -21,20 +21,17 @@ import java.util.List;
 import java.util.UUID;
 
 import org.apache.carbondata.core.cache.dictionary.AbstractDictionaryCacheTest;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.IndexKey;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.keygenerator.mdkey.MultiDimKeyVarLengthGenerator;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.metadata.datatype.DecimalType;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.keygenerator.mdkey.MultiDimKeyVarLengthGenerator;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
 import org.apache.carbondata.core.scan.expression.LiteralExpression;
@@ -368,8 +365,6 @@ public class FilterUtilTest extends AbstractDictionaryCacheTest {
 
   @Test public void testGetNoDictionaryValKeyMemberForFilter() throws FilterUnsupportedException {
     boolean isIncludeFilter = true;
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        new AbsoluteTableIdentifier(this.carbonStorePath, carbonTableIdentifier);
     ColumnExpression expression = new ColumnExpression("test", DataTypes.STRING);
     List<String> evaluateResultListFinal = new ArrayList<>();
     evaluateResultListFinal.add("test1");

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
index 5f7e971..936f87f 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java
@@ -41,7 +41,7 @@ public class CarbonFormatDirectoryStructureTest {
     CarbonTableIdentifier tableIdentifier = new CarbonTableIdentifier("d1", "t1", UUID.randomUUID().toString());
     CarbonStorePath carbonStorePath = new CarbonStorePath(CARBON_STORE);
     AbsoluteTableIdentifier absoluteTableIdentifier =
-        new AbsoluteTableIdentifier(CARBON_STORE + "/d1/t1", tableIdentifier);
+        AbsoluteTableIdentifier.from(CARBON_STORE + "/d1/t1", tableIdentifier);
     CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
     assertTrue(carbonTablePath.getPath().replace("\\", "/").equals(CARBON_STORE + "/d1/t1"));
     assertTrue(carbonTablePath.getSchemaFilePath().replace("\\", "/").equals(CARBON_STORE + "/d1/t1/Metadata/schema"));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java b/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
index 86ba9c8..4fca00e 100644
--- a/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/writer/CarbonDictionaryWriterImplTest.java
@@ -33,23 +33,23 @@ import java.util.Properties;
 import java.util.UUID;
 
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.ColumnIdentifier;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.ColumnIdentifier;
 import org.apache.carbondata.core.reader.CarbonDictionaryColumnMetaChunk;
 import org.apache.carbondata.core.reader.CarbonDictionaryMetadataReaderImpl;
 import org.apache.carbondata.core.reader.CarbonDictionaryReaderImpl;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.format.ColumnDictionaryChunkMeta;
 
 import mockit.Mock;
 import mockit.MockUp;
-import org.apache.carbondata.format.ColumnDictionaryChunkMeta;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -103,7 +103,7 @@ public class CarbonDictionaryWriterImplTest {
     this.carbonStorePath = props.getProperty("storePath", "carbonStore");
     this.columnIdentifier = new ColumnIdentifier("Name", null, null);
     carbonTableIdentifier = new CarbonTableIdentifier(databaseName, tableName, UUID.randomUUID().toString());
-    absoluteTableIdentifier = new AbsoluteTableIdentifier(carbonStorePath, carbonTableIdentifier);
+    absoluteTableIdentifier = AbsoluteTableIdentifier.from(carbonStorePath, carbonTableIdentifier);
     this.dictionaryColumnUniqueIdentifier =
         new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, columnIdentifier,
             columnIdentifier.getDataType(),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java b/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
index e64726a..2b5cc85 100644
--- a/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/writer/sortindex/CarbonDictionarySortIndexWriterImplTest.java
@@ -59,7 +59,7 @@ public class CarbonDictionarySortIndexWriterImplTest {
     String tablePath =
         storePath + "/" + carbonTableIdentifier.getDatabaseName() + "/" + carbonTableIdentifier
             .getTableName();
-    absoluteTableIdentifier = new AbsoluteTableIdentifier(tablePath, carbonTableIdentifier);
+    absoluteTableIdentifier = AbsoluteTableIdentifier.from(tablePath, carbonTableIdentifier);
     columnIdentifier = new ColumnIdentifier("Name", null, null);
     DictionaryColumnUniqueIdentifier dictionaryColumnUniqueIdentifier =
         new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, columnIdentifier,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
index 2e6abad..47afc9c 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/util/SchemaReader.java
@@ -59,11 +59,11 @@ public class SchemaReader {
       thriftReader.close();
 
       SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-      TableInfo wrapperTableInfo = schemaConverter
-          .fromExternalToWrapperTableInfo(tableInfo,
-              identifier.getCarbonTableIdentifier().getDatabaseName(), tableName,
-              identifier.getTablePath());
-      wrapperTableInfo.setMetaDataFilepath(CarbonTablePath.getFolderContainingFile(schemaFilePath));
+      TableInfo wrapperTableInfo = schemaConverter.fromExternalToWrapperTableInfo(
+          tableInfo,
+          identifier.getCarbonTableIdentifier().getDatabaseName(),
+          tableName,
+          identifier.getTablePath());
       CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);
       return CarbonMetadata.getInstance().getCarbonTable(
           identifier.getCarbonTableIdentifier().getTableUniqueName());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
index 097ff3e..d7f9ac2 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamInputFormatTest.java
@@ -51,14 +51,15 @@ public class CarbonStreamInputFormatTest extends TestCase {
   private TaskAttemptContext taskAttemptContext;
   private Configuration hadoopConf;
   private AbsoluteTableIdentifier identifier;
-  private String storePath;
+  private String tablePath;
 
 
   @Override protected void setUp() throws Exception {
-    storePath = new File("target/stream_input").getCanonicalPath();
+    tablePath = new File("target/stream_input").getCanonicalPath();
     String dbName = "default";
     String tableName = "stream_table_input";
-    identifier = new AbsoluteTableIdentifier(storePath,
+    identifier = AbsoluteTableIdentifier.from(
+        tablePath,
         new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
 
     JobID jobId = CarbonInputFormatUtil.getJobId(new Date(), 0);
@@ -91,8 +92,8 @@ public class CarbonStreamInputFormatTest extends TestCase {
 
   @Override protected void tearDown() throws Exception {
     super.tearDown();
-    if (storePath != null) {
-      FileFactory.deleteAllFilesOfDir(new File(storePath));
+    if (tablePath != null) {
+      FileFactory.deleteAllFilesOfDir(new File(tablePath));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java
index 53fc071..e871c7e 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/streaming/CarbonStreamOutputFormatTest.java
@@ -47,7 +47,7 @@ public class CarbonStreamOutputFormatTest extends TestCase {
   private Configuration hadoopConf;
   private TaskAttemptID taskAttemptId;
   private CarbonLoadModel carbonLoadModel;
-  private String storePath;
+  private String tablePath;
 
   @Override protected void setUp() throws Exception {
     super.setUp();
@@ -62,11 +62,13 @@ public class CarbonStreamOutputFormatTest extends TestCase {
     hadoopConf.setBoolean("mapred.task.is.map", true);
     hadoopConf.setInt("mapred.task.partition", 0);
 
-    storePath = new File("target/stream_output").getCanonicalPath();
+    tablePath = new File("target/stream_output").getCanonicalPath();
     String dbName = "default";
     String tableName = "stream_table_output";
-    AbsoluteTableIdentifier identifier = new AbsoluteTableIdentifier(storePath,
-        new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+    AbsoluteTableIdentifier identifier =
+        AbsoluteTableIdentifier.from(
+            tablePath,
+            new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
 
     CarbonTable table = StoreCreator.createTable(identifier);
 
@@ -112,8 +114,8 @@ public class CarbonStreamOutputFormatTest extends TestCase {
 
   @Override protected void tearDown() throws Exception {
     super.tearDown();
-    if (storePath != null) {
-      FileFactory.deleteAllFilesOfDir(new File(storePath));
+    if (tablePath != null) {
+      FileFactory.deleteAllFilesOfDir(new File(tablePath));
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
index ab8790d..bea1d5e 100644
--- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
+++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java
@@ -107,7 +107,9 @@ public class StoreCreator {
       String dbName = "testdb";
       String tableName = "testtable";
       absoluteTableIdentifier =
-          new AbsoluteTableIdentifier(storePath +"/testdb/testtable", new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
+          AbsoluteTableIdentifier.from(
+              storePath +"/testdb/testtable",
+              new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
     } catch (IOException ex) {
 
     }
@@ -276,16 +278,16 @@ public class StoreCreator {
     );
     tableInfo.setLastUpdatedTime(System.currentTimeMillis());
     tableInfo.setFactTable(tableSchema);
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier);
+    CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier);
     String schemaFilePath = carbonTablePath.getSchemaFilePath();
     String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath);
-    tableInfo.setMetaDataFilepath(schemaMetadataPath);
     CarbonMetadata.getInstance().loadTableMetadata(tableInfo);
 
     SchemaConverter schemaConverter = new ThriftWrapperSchemaConverterImpl();
-    org.apache.carbondata.format.TableInfo thriftTableInfo = schemaConverter
-        .fromWrapperToExternalTableInfo(tableInfo, tableInfo.getDatabaseName(),
+    org.apache.carbondata.format.TableInfo thriftTableInfo =
+        schemaConverter.fromWrapperToExternalTableInfo(
+            tableInfo,
+            tableInfo.getDatabaseName(),
             tableInfo.getFactTable().getTableName());
     org.apache.carbondata.format.SchemaEvolutionEntry schemaEvolutionEntry =
         new org.apache.carbondata.format.SchemaEvolutionEntry(tableInfo.getLastUpdatedTime());

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
----------------------------------------------------------------------
diff --git a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
index ce159d6..7fe55fb 100755
--- a/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
+++ b/integration/presto/src/main/java/org/apache/carbondata/presto/impl/CarbonTableReader.java
@@ -307,7 +307,7 @@ public class CarbonTableReader {
       // get the store path of the table.
 
       AbsoluteTableIdentifier absoluteTableIdentifier =
-          new AbsoluteTableIdentifier(tablePath, cache.carbonTableIdentifier);
+          AbsoluteTableIdentifier.from(tablePath, cache.carbonTableIdentifier);
       cache.carbonTablePath =
           PathFactory.getInstance().getCarbonTablePath(absoluteTableIdentifier, null);
       // cache the table
@@ -336,8 +336,6 @@ public class CarbonTableReader {
       TableInfo wrapperTableInfo = schemaConverter
           .fromExternalToWrapperTableInfo(tableInfo, table.getSchemaName(), table.getTableName(),
               tablePath);
-      wrapperTableInfo.setMetaDataFilepath(
-              CarbonTablePath.getFolderContainingFile(cache.carbonTablePath.getSchemaFilePath()));
 
       // Step 4: Load metadata info into CarbonMetadata
       CarbonMetadata.getInstance().loadTableMetadata(wrapperTableInfo);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 87f5fa0..eda19c4 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -70,7 +70,7 @@ object CarbonDataStoreCreator {
       logger.info("Creating The Carbon Store")
       val dbName: String = "testdb"
       val tableName: String = "testtable"
-      val absoluteTableIdentifier = new AbsoluteTableIdentifier(
+      val absoluteTableIdentifier = AbsoluteTableIdentifier.from(
         storePath + "/"+ dbName + "/" + tableName,
         new CarbonTableIdentifier(dbName,
           tableName,
@@ -297,7 +297,6 @@ object CarbonDataStoreCreator {
     val schemaFilePath: String = carbonTablePath.getSchemaFilePath
     val schemaMetadataPath: String =
       CarbonTablePath.getFolderContainingFile(schemaFilePath)
-    tableInfo.setMetaDataFilepath(schemaMetadataPath)
     CarbonMetadata.getInstance.loadTableMetadata(tableInfo)
     val schemaConverter: SchemaConverter =
       new ThriftWrapperSchemaConverterImpl()

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
index e53df6c..ea95945 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
@@ -20,6 +20,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.Row
 import org.scalatest.BeforeAndAfterAll
+
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
@@ -73,8 +74,8 @@ class DataCompactionCardinalityBoundryTest extends QueryTest with BeforeAndAfter
     var noOfRetries = 0
     while (status && noOfRetries < 10) {
 
-      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
-          AbsoluteTableIdentifier(
+      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
+          AbsoluteTableIdentifier.from(
             CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
             new CarbonTableIdentifier("default", "cardinalityTest", "1")
           )

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
index 508ca6c..20a779d 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
@@ -21,6 +21,7 @@ package org.apache.carbondata.spark.testsuite.datacompaction
 import scala.collection.JavaConverters._
 
 import org.scalatest.BeforeAndAfterAll
+
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -35,8 +36,8 @@ import org.apache.spark.sql.test.util.QueryTest
   */
 class DataCompactionLockTest extends QueryTest with BeforeAndAfterAll {
 
-  val absoluteTableIdentifier: AbsoluteTableIdentifier = new
-      AbsoluteTableIdentifier(
+  val absoluteTableIdentifier: AbsoluteTableIdentifier =
+      AbsoluteTableIdentifier.from(
         CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
         new CarbonTableIdentifier(
           CarbonCommonConstants.DATABASE_DEFAULT_NAME, "compactionlocktesttable", "1")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
index 3399740..57c5204 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala
@@ -17,11 +17,12 @@
 
 package org.apache.carbondata.spark.testsuite.dataload
 
+import java.io.File
 import java.math.BigDecimal
 
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, Row, SaveMode}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
 import org.scalatest.BeforeAndAfterAll
 
 class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll {
@@ -222,6 +223,26 @@ test("test the boolean data type"){
     )
   }
 
+  test("test datasource table with specified table path") {
+    val path = "./source"
+    df2.write
+      .format("carbondata")
+      .option("tableName", "carbon10")
+      .option("tablePath", path)
+      .mode(SaveMode.Overwrite)
+      .save()
+    assert(new File(path).exists())
+    checkAnswer(
+      sql("select count(*) from carbon10 where c3 > 500"), Row(500)
+    )
+    sql("drop table carbon10")
+    assert(! new File(path).exists())
+    assert(intercept[AnalysisException](
+      sql("select count(*) from carbon10 where c3 > 500"))
+      .message
+      .contains("not found"))
+  }
+
   override def afterAll {
     dropTable
   }

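The new test above exercises the user-facing half of this change on the DataFrame API: a tablePath option on the writer places the table's files at an explicit location, and dropping the table removes that directory. A hedged sketch of the same usage outside the test harness, assuming an existing SparkSession named spark with CarbonData on the classpath:

    import org.apache.spark.sql.SaveMode

    // Illustrative frame; the test uses its own df2 with columns c1, c2, c3.
    val df = spark.range(1000).selectExpr("cast(id as string) AS c1", "'b' AS c2", "id AS c3")
    df.write
      .format("carbondata")
      .option("tableName", "carbon10")
      .option("tablePath", "/tmp/carbon10")   // new in this patch: explicit table path
      .mode(SaveMode.Overwrite)
      .save()
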
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxDefaultFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxDefaultFormat.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxDefaultFormat.scala
index c29e517..1d5b33b 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxDefaultFormat.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithHiveSyntaxDefaultFormat.scala
@@ -19,8 +19,9 @@ package org.apache.carbondata.spark.testsuite.dataload
 
 import java.io.File
 
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{AnalysisException, Row}
 import org.scalatest.BeforeAndAfterAll
+
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
@@ -687,6 +688,47 @@ class TestLoadDataWithHiveSyntaxDefaultFormat extends QueryTest with BeforeAndAf
     checkAnswer(sql("select salary from double_test limit 1"),Row(7.756787654567891E23))
   }
 
+  test("test table with specified table path") {
+    val path = "./source"
+    sql("drop table if exists table_path_test")
+    sql(
+      "CREATE table table_path_test (empno string, salary double) STORED BY 'carbondata' " +
+      s"LOCATION '$path'"
+    )
+    sql(
+      s"load data local inpath '$resourcesPath/double.csv' into table table_path_test options" +
+      "('FILEHEADER'='empno,salary')")
+    assert(new File(path).exists())
+    checkAnswer(sql("select salary from table_path_test limit 1"),Row(7.756787654567891E23))
+    sql("drop table table_path_test")
+    assert(! new File(path).exists())
+    assert(intercept[AnalysisException](
+      sql("select salary from table_path_test limit 1"))
+      .message
+      .contains("not found"))
+  }
+
+  test("test table with specified database and table path") {
+    val path = "./source"
+    sql("drop database if exists test cascade")
+    sql("create database if not exists test")
+    sql("CREATE table test.table_path_test (empno string, salary double) " +
+        "STORED BY 'carbondata'" +
+        s"LOCATION '$path'")
+    sql(
+      s"load data local inpath '$resourcesPath/double.csv' into table test.table_path_test options" +
+      "('FILEHEADER'='empno,salary')")
+    assert(new File(path).exists())
+    checkAnswer(sql("select salary from test.table_path_test limit 1"),Row(7.756787654567891E23))
+    sql("drop table test.table_path_test")
+    assert(! new File(path).exists())
+    assert(intercept[AnalysisException](
+      sql("select salary from test.table_path_test limit 1"))
+      .message
+      .contains("not found"))
+    sql("drop database if exists test cascade")
+  }
+
   override def afterAll {
     sql("drop table if exists escapechar1")
     sql("drop table if exists escapechar2")

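The two tests above pin down the SQL surface: LOCATION on CREATE TABLE ... STORED BY 'carbondata' now sets the carbon table path, with or without an explicit database, and DROP TABLE removes the directory. The statement shape, with an illustrative path:

    sql(
      s"""CREATE TABLE test.table_path_demo (empno string, salary double)
         |STORED BY 'carbondata'
         |LOCATION '/tmp/table_path_demo'""".stripMargin)
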
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index f7e93af..efbe807 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -105,7 +105,7 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   test("Records more than one pagesize after delete operation ") {
-    sql("DROP TABLE IF EXISTS default.carbon2")
+    sql("DROP TABLE IF EXISTS carbon2")
     import sqlContext.implicits._
     val df = sqlContext.sparkContext.parallelize(1 to 2000000)
       .map(x => (x+"a", "b", x))
@@ -118,15 +118,15 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
       .mode(SaveMode.Overwrite)
       .save()
 
-    checkAnswer(sql("select count(*) from default.carbon2"), Seq(Row(2000000)))
+    checkAnswer(sql("select count(*) from carbon2"), Seq(Row(2000000)))
 
-    sql("delete from default.carbon2 where c1 = '99999a'").show()
+    sql("delete from carbon2 where c1 = '99999a'").show()
 
-    checkAnswer(sql("select count(*) from default.carbon2"), Seq(Row(1999999)))
+    checkAnswer(sql("select count(*) from carbon2"), Seq(Row(1999999)))
 
-    checkAnswer(sql("select * from default.carbon2 where c1 = '99999a'"), Seq())
+    checkAnswer(sql("select * from carbon2 where c1 = '99999a'"), Seq())
 
-    sql("DROP TABLE IF EXISTS default.carbon2")
+    sql("DROP TABLE IF EXISTS carbon2")
   }
 
   test("test if delete is unsupported for pre-aggregate tables") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 005dc01..d381a43 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -424,7 +424,7 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   test("More records after update operation ") {
-    sql("DROP TABLE IF EXISTS default.carbon1")
+    sql("DROP TABLE IF EXISTS carbon1")
     import sqlContext.implicits._
     val df = sqlContext.sparkContext.parallelize(1 to 36000)
       .map(x => (x+"a", "b", x))
@@ -437,15 +437,15 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
       .mode(SaveMode.Overwrite)
       .save()
 
-    checkAnswer(sql("select count(*) from default.carbon1"), Seq(Row(36000)))
+    checkAnswer(sql("select count(*) from carbon1"), Seq(Row(36000)))
 
-    sql("update default.carbon1 set (c1)=('test123') where c1='9999a'").show()
+    sql("update carbon1 set (c1)=('test123') where c1='9999a'").show()
 
-    checkAnswer(sql("select count(*) from default.carbon1"), Seq(Row(36000)))
+    checkAnswer(sql("select count(*) from carbon1"), Seq(Row(36000)))
 
-    checkAnswer(sql("select * from default.carbon1 where c1 = 'test123'"), Row("test123","b",9999))
+    checkAnswer(sql("select * from carbon1 where c1 = 'test123'"), Row("test123","b",9999))
 
-    sql("DROP TABLE IF EXISTS default.carbon1")
+    sql("DROP TABLE IF EXISTS carbon1")
   }
 
   test("""CARBONDATA-1445 carbon.update.persist.enable=false it will fail to update data""") {
@@ -455,8 +455,8 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     val df = sqlContext.sparkContext.parallelize(0 to 50)
       .map(x => ("a", x.toString, (x % 2).toString, x, x.toLong, x * 2))
       .toDF("stringField1", "stringField2", "stringField3", "intField", "longField", "int2Field")
-    sql("DROP TABLE IF EXISTS default.study_carbondata ")
-    sql(s""" CREATE TABLE IF NOT EXISTS default.study_carbondata (
+    sql("DROP TABLE IF EXISTS study_carbondata ")
+    sql(s""" CREATE TABLE IF NOT EXISTS study_carbondata (
            |    stringField1          string,
            |    stringField2          string,
            |    stringField3          string,
@@ -473,14 +473,14 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
       .mode(SaveMode.Append)
       .save()
     sql("""
-      UPDATE default.study_carbondata a
+      UPDATE study_carbondata a
           SET (a.stringField1, a.stringField2) = (concat(a.stringField1 , "_test" ), concat(a.stringField2 , "_test" ))
       WHERE a.stringField2 = '1'
       """).show()
-    assert(sql("select stringField1 from default.study_carbondata where stringField2 = '1_test'").collect().length == 1)
+    assert(sql("select stringField1 from study_carbondata where stringField2 = '1_test'").collect().length == 1)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.isPersistEnabled, "true")
-    sql("DROP TABLE IF EXISTS default.study_carbondata ")
+    sql("DROP TABLE IF EXISTS study_carbondata ")
   }
 
   test("update table in carbondata with rand() ") {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 4a0d834..f4f569b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -43,8 +43,6 @@ object CarbonStore {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
 
   def showSegments(
-      dbName: String,
-      tableName: String,
       limit: Option[String],
       tableFolderPath: String): Seq[Row] = {
     val loadMetadataDetailsArray = SegmentStatusManager.readLoadMetadata(tableFolderPath)
@@ -149,6 +147,7 @@ object CarbonStore {
     }
   }
 
+  // TODO: move dbName and tableName to the caller; the caller should handle logging and errors
   def deleteLoadById(
       loadids: Seq[String],
       dbName: String,
@@ -175,6 +174,7 @@ object CarbonStore {
     Seq.empty
   }
 
+  // TODO: move dbName and tableName to the caller; the caller should handle logging and errors
   def deleteLoadByDate(
       timestamp: String,
       dbName: String,

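With dbName and tableName dropped from its parameter list, showSegments now works purely from the table folder path, and the TODOs mark the delete methods for the same treatment, leaving naming, logging and error handling to the caller. A sketch of the narrowed call, assuming the signature above and an illustrative path:

    import org.apache.carbondata.api.CarbonStore

    val segments = CarbonStore.showSegments(
      limit = Some("10"),
      tableFolderPath = "/tmp/store/default/t1/Metadata")  // hypothetical metadata folder
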
http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateTableEvents.scala
index 3b03e4d..44db43a 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/CreateTableEvents.scala
@@ -19,25 +19,25 @@ package org.apache.carbondata.events
 
 import org.apache.spark.sql._
 
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 
 /**
  * Class for handling operations before start of a load process.
  * Example usage: For validation purpose
  */
-case class CreateTablePreExecutionEvent(sparkSession: SparkSession,
-    carbonTableIdentifier: CarbonTableIdentifier,
-    storePath: String) extends Event with TableEventInfo
+case class CreateTablePreExecutionEvent(
+    sparkSession: SparkSession,
+    identifier: AbsoluteTableIdentifier) extends Event with TableEventInfo
 
 /**
  * Class for handling operations after data load completion and before final
  * commit of load operation. Example usage: For loading pre-aggregate tables
  */
 case class CreateTablePostExecutionEvent(sparkSession: SparkSession,
-    carbonTableIdentifier: CarbonTableIdentifier) extends Event with TableEventInfo
+    identifier: AbsoluteTableIdentifier) extends Event with TableEventInfo
 
 /**
  * Class for handling clean up in case of any failure and abort the operation.
  */
 case class CreateTableAbortExecutionEvent(sparkSession: SparkSession,
-    carbonTableIdentifier: CarbonTableIdentifier) extends Event with TableEventInfo
+    identifier: AbsoluteTableIdentifier) extends Event with TableEventInfo

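The create-table events now carry a single AbsoluteTableIdentifier instead of a CarbonTableIdentifier plus a separate store path, so listeners can read the table path straight off the event. A sketch of firing the pre-execution event, assuming the case class shape above and an in-scope sparkSession:

    val event = CreateTablePreExecutionEvent(
      sparkSession,
      AbsoluteTableIdentifier.from(
        "/tmp/db/t1",                                      // illustrative table path
        new CarbonTableIdentifier("db", "t1", "tid")))
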

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
index ab77fba..b305fa9 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/DropTableEvents.scala
@@ -27,7 +27,8 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable
  * @param ifExistsSet
  * @param sparkSession
  */
-case class DropTablePreEvent(carbonTable: Option[CarbonTable],
+case class DropTablePreEvent(
+    carbonTable: CarbonTable,
     ifExistsSet: Boolean,
     sparkSession: SparkSession)
   extends Event with DropTableEventInfo
@@ -39,7 +40,8 @@ case class DropTablePreEvent(carbonTable: Option[CarbonTable],
  * @param ifExistsSet
  * @param sparkSession
  */
-case class DropTablePostEvent(carbonTable: Option[CarbonTable],
+case class DropTablePostEvent(
+    carbonTable: CarbonTable,
     ifExistsSet: Boolean,
     sparkSession: SparkSession)
   extends Event with DropTableEventInfo
@@ -51,7 +53,8 @@ case class DropTablePostEvent(carbonTable: Option[CarbonTable],
  * @param ifExistsSet
  * @param sparkSession
  */
-case class DropTableAbortEvent(carbonTable: Option[CarbonTable],
+case class DropTableAbortEvent(
+    carbonTable: CarbonTable,
     ifExistsSet: Boolean,
     sparkSession: SparkSession)
   extends Event with DropTableEventInfo
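
With carbonTable now mandatory rather than an Option, listeners can use the table directly. A hedged sketch of constructing one of these events (the lookup helper is the one added to CarbonEnv in this commit; names are illustrative):

    // carbonTable is guaranteed non-null, so listeners no longer unwrap an Option
    val carbonTable = CarbonEnv.getCarbonTable(Some("db1"), "t1")(sparkSession)
    val dropPreEvent = DropTablePreEvent(carbonTable, ifExistsSet = true, sparkSession)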

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
index 9c542b6..6279fca 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/events/Events.scala
@@ -20,7 +20,7 @@ package org.apache.carbondata.events
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsModel, AlterTableDataTypeChangeModel, AlterTableDropColumnModel, AlterTableRenameModel}
 
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 
@@ -35,7 +35,7 @@ trait DatabaseEventInfo {
  * event for table related operations
  */
 trait TableEventInfo {
-  val carbonTableIdentifier: CarbonTableIdentifier
+  val identifier: AbsoluteTableIdentifier
 }
 
 /**
@@ -57,7 +57,7 @@ trait LookupRelationEventInfo {
  * event for drop table
  */
 trait DropTableEventInfo {
-  val carbonTable: Option[CarbonTable]
+  val carbonTable: CarbonTable
   val ifExistsSet: Boolean
 }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
index 4500221..594ea0e 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala
@@ -17,21 +17,17 @@
 
 package org.apache.carbondata.spark
 
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 
 /**
  * Contains all options for Spark data source
  */
 class CarbonOption(options: Map[String, String]) {
-  def tableIdentifier: String = options.getOrElse("tableName", s"$dbName.$tableName")
 
-  def dbName: String = options.getOrElse("dbName", CarbonCommonConstants.DATABASE_DEFAULT_NAME)
+  def dbName: Option[String] = options.get("dbName")
 
   def tableName: String = options.getOrElse("tableName", "default_table")
 
-  def tablePath: String = s"$dbName/$tableName"
-
-  def tableId: String = options.getOrElse("tableId", "default_table_id")
+  def tablePath: Option[String] = options.get("tablePath")
 
   def partitionCount: String = options.getOrElse("partitionCount", "1")
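
dbName and tablePath are now optional and no longer default to store-derived values; callers resolve them against the session instead. A minimal sketch (option values illustrative):

    val options = new CarbonOption(Map("tableName" -> "t1"))
    options.dbName                                            // None
    CarbonEnv.getDatabaseName(options.dbName)(sparkSession)   // falls back to the current database
    options.tablePath                                         // None unless "tablePath" was passed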
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 3f47a47..44fc7ad 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -505,7 +505,6 @@ class CarbonColumnDictPatition(id: Int, dimension: CarbonDimension)
  * @param sparkContext    spark context
  * @param table           carbon table identifier
  * @param dimensions      carbon dimensions having predefined dict
- * @param hdfsLocation    carbon base store path
  * @param dictFolderPath  path of dictionary folder
  */
 class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
@@ -513,7 +512,6 @@ class CarbonColumnDictGenerateRDD(carbonLoadModel: CarbonLoadModel,
     sparkContext: SparkContext,
     table: CarbonTableIdentifier,
     dimensions: Array[CarbonDimension],
-    hdfsLocation: String,
     dictFolderPath: String)
   extends CarbonRDD[(Int, ColumnDistinctValues)](sparkContext, Nil) {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index 50005c8..2f190b5 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -53,8 +53,8 @@ class CarbonIUDMergerRDD[K, V](
 
   override def getPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
-    val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
-      hdfsStoreLocation, new CarbonTableIdentifier(databaseName, factTableName, tableId)
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
+      tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
     )
     val jobConf: JobConf = new JobConf(new Configuration)
     val job: Job = new Job(jobConf)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 997838c..82b2a57 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -67,7 +67,7 @@ class CarbonMergerRDD[K, V](
   private val queryId = sparkContext.getConf.get("queryId", System.nanoTime() + "")
   var storeLocation: String = null
   var mergeResult: String = null
-  val hdfsStoreLocation = carbonMergerMapping.hdfsStoreLocation
+  val tablePath = carbonMergerMapping.hdfsStoreLocation
   val metadataFilePath = carbonMergerMapping.metadataFilePath
   val mergedLoadName = carbonMergerMapping.mergedLoadName
   val databaseName = carbonMergerMapping.databaseName
@@ -167,7 +167,7 @@ class CarbonMergerRDD[K, V](
         val dataFileMetadataSegMapping: java.util.Map[String, List[DataFileFooter]] =
           CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList)
 
-        carbonLoadModel.setTablePath(hdfsStoreLocation)
+        carbonLoadModel.setTablePath(tablePath)
         // check for restructured block
         // TODO: only in case of add and drop this variable should be true
         val restructuredBlockExists: Boolean = CarbonCompactionUtil
@@ -260,8 +260,8 @@ class CarbonMergerRDD[K, V](
 
   override def getPartitions: Array[Partition] = {
     val startTime = System.currentTimeMillis()
-    val absoluteTableIdentifier: AbsoluteTableIdentifier = new AbsoluteTableIdentifier(
-      hdfsStoreLocation, new CarbonTableIdentifier(databaseName, factTableName, tableId)
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
+      tablePath, new CarbonTableIdentifier(databaseName, factTableName, tableId)
     )
     val updateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
       absoluteTableIdentifier)
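
Both merger RDDs switch from the AbsoluteTableIdentifier constructor to its factory method, and the renamed tablePath field makes clear the value is a table path rather than a store location. A minimal sketch (ids and path illustrative):

    import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}

    val tableId = new CarbonTableIdentifier("db1", "t1", "uuid-1")
    val identifier = AbsoluteTableIdentifier.from("/store/db1/t1", tableId)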

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 11e5baf..7316574 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -330,9 +330,7 @@ class CarbonScanRDD(
   private def prepareInputFormatForDriver(conf: Configuration): CarbonTableInputFormat[Object] = {
     CarbonTableInputFormat.setTableInfo(conf, tableInfo)
     CarbonTableInputFormat.setDatabaseName(conf, tableInfo.getDatabaseName)
-    CarbonTableInputFormat
-      .setTableName(conf,
-        tableInfo.getOrCreateAbsoluteTableIdentifier().getCarbonTableIdentifier.getTableName)
+    CarbonTableInputFormat.setTableName(conf, tableInfo.getFactTable.getTableName)
     createInputFormat(conf)
   }
 
@@ -341,9 +339,7 @@ class CarbonScanRDD(
     val tableInfo1 = getTableInfo
     CarbonTableInputFormat.setTableInfo(conf, tableInfo1)
     CarbonTableInputFormat.setDatabaseName(conf, tableInfo1.getDatabaseName)
-    CarbonTableInputFormat
-      .setTableName(conf,
-        tableInfo1.getOrCreateAbsoluteTableIdentifier().getCarbonTableIdentifier.getTableName)
+    CarbonTableInputFormat.setTableName(conf, tableInfo1.getFactTable.getTableName)
     CarbonTableInputFormat.setDataTypeConverter(conf, new SparkDataTypeConverterImpl)
     createInputFormat(conf)
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
index e304d84..08d635b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/tasks/DictionaryWriterTask.scala
@@ -22,7 +22,7 @@ import scala.collection.mutable
 
 import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier, ColumnIdentifier}
+import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnIdentifier}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.service.CarbonCommonFactory
 import org.apache.carbondata.core.util.DataTypeUtil

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index af1d3f8..3595884 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -41,7 +41,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.filesystem.CarbonFile
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.memory.{UnsafeMemoryManager, UnsafeSortMemoryManager}
-import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier}
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
@@ -529,7 +529,7 @@ object CommonUtil {
     CarbonLoaderUtil.populateNewLoadMetaEntry(
       newLoadMetaEntry, status, model.getFactTimeStamp, false)
     val entryAdded: Boolean =
-      CarbonLoaderUtil.recordLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite)
+      CarbonLoaderUtil.recordNewLoadMetadata(newLoadMetaEntry, model, true, insertOverwrite)
     if (!entryAdded) {
       sys.error(s"Failed to add entry in table status for " +
                 s"${ model.getDatabaseName }.${model.getTableName}")
@@ -550,7 +550,7 @@ object CommonUtil {
     val loadMetaEntry = model.getLoadMetadataDetails.get(model.getLoadMetadataDetails.size - 1)
     CarbonLoaderUtil
       .populateNewLoadMetaEntry(loadMetaEntry, loadStatus, model.getFactTimeStamp, true)
-    val updationStatus = CarbonLoaderUtil.recordLoadMetadata(loadMetaEntry, model, false, false)
+    val updationStatus = CarbonLoaderUtil.recordNewLoadMetadata(loadMetaEntry, model, false, false)
     if (!updationStatus) {
       sys
         .error(s"Failed to update failure entry in table status for ${

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index f6170e8..ccbc9f5 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -292,14 +292,13 @@ object GlobalDictionaryUtil {
    * @param carbonLoadModel carbon load model
    * @param table           CarbonTableIdentifier
    * @param dimensions      column list
-   * @param hdfsLocation    store location in HDFS
-   * @param dictfolderPath  path of dictionary folder
+   * @param dictFolderPath  path of dictionary folder
    */
-  def createDictionaryLoadModel(carbonLoadModel: CarbonLoadModel,
+  def createDictionaryLoadModel(
+      carbonLoadModel: CarbonLoadModel,
       table: CarbonTableIdentifier,
       dimensions: Array[CarbonDimension],
-      hdfsLocation: String,
-      dictfolderPath: String,
+      dictFolderPath: String,
       forPreDefDict: Boolean): DictionaryLoadModel = {
     val primDimensionsBuffer = new ArrayBuffer[CarbonDimension]
     val isComplexes = new ArrayBuffer[Boolean]
@@ -312,7 +311,7 @@ object GlobalDictionaryUtil {
     }
     val primDimensions = primDimensionsBuffer.map { x => x }.toArray
     val dictDetail = CarbonSparkFactory.getDictionaryDetailService.
-      getDictionaryDetail(dictfolderPath, primDimensions, table, hdfsLocation)
+      getDictionaryDetail(dictFolderPath, primDimensions, table, carbonLoadModel.getTablePath)
     val dictFilePaths = dictDetail.dictFilePaths
     val dictFileExists = dictDetail.dictFileExists
     val columnIdentifier = dictDetail.columnIdentifiers
@@ -327,11 +326,12 @@ object GlobalDictionaryUtil {
     if (null == carbonLoadModel.getLoadMetadataDetails) {
       CommonUtil.readLoadMetadataDetails(carbonLoadModel)
     }
-    val absoluteTableIdentifier = new AbsoluteTableIdentifier(hdfsLocation, table)
-    DictionaryLoadModel(absoluteTableIdentifier,
+    val absoluteTableIdentifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath, table)
+    DictionaryLoadModel(
+      absoluteTableIdentifier,
       dimensions,
-      hdfsLocation,
-      dictfolderPath,
+      carbonLoadModel.getTablePath,
+      dictFolderPath,
       dictFilePaths,
       dictFileExists,
       isComplexes.toArray,
@@ -505,7 +505,6 @@ object GlobalDictionaryUtil {
    * @param dimensions      dimension column
    * @param carbonLoadModel carbon load model
    * @param sqlContext      spark sql context
-   * @param hdfsLocation    store location on hdfs
    * @param dictFolderPath  generated global dict file path
    */
   def generatePredefinedColDictionary(colDictFilePath: String,
@@ -513,15 +512,14 @@ object GlobalDictionaryUtil {
       dimensions: Array[CarbonDimension],
       carbonLoadModel: CarbonLoadModel,
       sqlContext: SQLContext,
-      hdfsLocation: String,
       dictFolderPath: String): Unit = {
     // set pre defined dictionary column
     setPredefinedColumnDictPath(carbonLoadModel, colDictFilePath, table, dimensions)
     val dictLoadModel = createDictionaryLoadModel(carbonLoadModel, table, dimensions,
-      hdfsLocation, dictFolderPath, forPreDefDict = true)
+      dictFolderPath, forPreDefDict = true)
     // new RDD to achieve distributed column dict generation
     val extInputRDD = new CarbonColumnDictGenerateRDD(carbonLoadModel, dictLoadModel,
-      sqlContext.sparkContext, table, dimensions, hdfsLocation, dictFolderPath)
+      sqlContext.sparkContext, table, dimensions, dictFolderPath)
       .partitionBy(new ColumnPartitioner(dictLoadModel.primDimensions.length))
     val statusList = new CarbonGlobalDictionaryGenerateRDD(extInputRDD, dictLoadModel).collect()
     // check result status
@@ -681,15 +679,16 @@ object GlobalDictionaryUtil {
    * @param sqlContext      sql context
    * @param carbonLoadModel carbon load model
    */
-  def generateGlobalDictionary(sqlContext: SQLContext,
+  def generateGlobalDictionary(
+      sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      tablePath: String,
       hadoopConf: Configuration,
       dataFrame: Option[DataFrame] = None): Unit = {
     try {
       val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
       val carbonTableIdentifier = carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier
       // create dictionary folder if not exists
+      val tablePath = carbonLoadModel.getTablePath
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier)
       val dictfolderPath = carbonTablePath.getMetadataDirectoryPath
       // columns which need to generate global dictionary file
@@ -709,7 +708,7 @@ object GlobalDictionaryUtil {
         if (colDictFilePath != null) {
           // generate predefined dictionary
           generatePredefinedColDictionary(colDictFilePath, carbonTableIdentifier,
-            dimensions, carbonLoadModel, sqlContext, tablePath, dictfolderPath)
+            dimensions, carbonLoadModel, sqlContext, dictfolderPath)
         }
         if (headers.length > df.columns.length) {
           val msg = "The number of columns in the file header do not match the " +
@@ -725,7 +724,7 @@ object GlobalDictionaryUtil {
           // select column to push down pruning
           df = df.select(requireColumnNames.head, requireColumnNames.tail: _*)
           val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
-            requireDimension, tablePath, dictfolderPath, false)
+            requireDimension, dictfolderPath, false)
           // combine distinct value in a block and partition by column
           val inputRDD = new CarbonBlockDistinctValuesCombineRDD(df.rdd, model)
             .partitionBy(new ColumnPartitioner(model.primDimensions.length))
@@ -737,9 +736,9 @@ object GlobalDictionaryUtil {
           LOGGER.info("No column found for generating global dictionary in source data files")
         }
       } else {
-        generateDictionaryFromDictionaryFiles(sqlContext,
+        generateDictionaryFromDictionaryFiles(
+          sqlContext,
           carbonLoadModel,
-          tablePath,
           carbonTableIdentifier,
           dictfolderPath,
           dimensions,
@@ -764,11 +763,11 @@ object GlobalDictionaryUtil {
     }
   }
 
-  def generateDictionaryFromDictionaryFiles(sqlContext: SQLContext,
+  def generateDictionaryFromDictionaryFiles(
+      sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
-      storePath: String,
       carbonTableIdentifier: CarbonTableIdentifier,
-      dictfolderPath: String,
+      dictFolderPath: String,
       dimensions: Array[CarbonDimension],
       allDictionaryPath: String): Unit = {
     LOGGER.info("Generate global dictionary from dictionary files!")
@@ -781,7 +780,7 @@ object GlobalDictionaryUtil {
       val (requireDimension, requireColumnNames) = pruneDimensions(dimensions, headers, headers)
       if (requireDimension.nonEmpty) {
         val model = createDictionaryLoadModel(carbonLoadModel, carbonTableIdentifier,
-          requireDimension, storePath, dictfolderPath, false)
+          requireDimension, dictFolderPath, false)
         // check if dictionary files contains bad record
         val accumulator = sqlContext.sparkContext.accumulator(0)
         // read local dictionary file, and group by key
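
generateGlobalDictionary and createDictionaryLoadModel no longer take a path argument; both read it from the load model, so the table path must be set there before the call. A hedged sketch under that assumption:

    // assuming the load model is populated and carries the table path
    carbonLoadModel.setTablePath(carbonTable.getTablePath)
    GlobalDictionaryUtil.generateGlobalDictionary(sqlContext, carbonLoadModel, hadoopConf)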

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 8048b66..4ad939c 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -245,7 +245,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
       tableName: String,
       fields: Seq[Field],
       partitionCols: Seq[PartitionerField],
-      tableProperties: mutable.Map[String, String],
+      tableProperties: Map[String, String],
       bucketFields: Option[BucketFields],
       isAlterFlow: Boolean = false,
       tableComment: Option[String] = None): TableModel = {
@@ -276,10 +276,9 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
 
     TableModel(
       ifNotExistPresent,
-      dbName.getOrElse(CarbonCommonConstants.DATABASE_DEFAULT_NAME),
       dbName,
       tableName,
-      tableProperties,
+      tableProperties.toMap,
       reorderDimensions(dims.map(f => normalizeType(f)).map(f => addParent(f))),
       msrs.map(f => normalizeType(f)),
       Option(sortKeyDims),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
index 844f6f7..44f577d 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala
@@ -21,7 +21,6 @@ import java.util
 import java.util.UUID
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.Map
 
 import org.apache.spark.SparkContext
 import org.apache.spark.sql.SQLContext
@@ -30,7 +29,7 @@ import org.apache.spark.sql.util.CarbonException
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType}
 import org.apache.carbondata.core.metadata.encoder.Encoding
 import org.apache.carbondata.core.metadata.schema._
@@ -48,7 +47,6 @@ import org.apache.carbondata.spark.util.DataTypeConverterUtil
 
 case class TableModel(
     ifNotExistsSet: Boolean,
-    var databaseName: String,
     databaseNameOp: Option[String],
     tableName: String,
     tableProperties: Map[String, String],
@@ -58,8 +56,7 @@ case class TableModel(
     highcardinalitydims: Option[Seq[String]],
     noInvertedIdxCols: Option[Seq[String]],
     columnGroups: Seq[String],
-    colProps: Option[util.Map[String,
-    util.List[ColumnProperty]]] = None,
+    colProps: Option[util.Map[String, util.List[ColumnProperty]]] = None,
     bucketFields: Option[BucketFields],
     partitionInfo: Option[PartitionInfo],
     tableComment: Option[String] = None,
@@ -322,8 +319,14 @@ class AlterTableColumnSchemaGenerator(
 
 // TODO: move this to carbon store API
 object TableNewProcessor {
-  def apply(cm: TableModel): TableInfo = {
-    new TableNewProcessor(cm).process
+  def apply(
+      cm: TableModel,
+      identifier: AbsoluteTableIdentifier): TableInfo = {
+    new TableNewProcessor(
+      cm,
+      identifier.getDatabaseName,
+      identifier.getTableName,
+      identifier.getTablePath).process
   }
 
   def createColumnSchema(
@@ -356,7 +359,7 @@ object TableNewProcessor {
     }
     columnSchema.setEncodingList(encoders)
     val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
-    val columnUniqueId = colUniqueIdGenerator.generateUniqueId(databaseName, columnSchema)
+    val columnUniqueId = colUniqueIdGenerator.generateUniqueId(columnSchema)
     columnSchema.setColumnUniqueId(columnUniqueId)
     columnSchema.setColumnReferenceId(columnUniqueId)
     columnSchema.setColumnar(true)
@@ -370,7 +373,7 @@ object TableNewProcessor {
   }
 }
 
-class TableNewProcessor(cm: TableModel) {
+class TableNewProcessor(cm: TableModel, dbName: String, tableName: String, tablePath: String) {
 
   def getAllChildren(fieldChildren: Option[List[Field]]): Seq[ColumnSchema] = {
     var allColumns: Seq[ColumnSchema] = Seq[ColumnSchema]()
@@ -420,8 +423,7 @@ class TableNewProcessor(cm: TableModel) {
     }
     columnSchema.setEncodingList(encoders)
     val colUniqueIdGenerator = CarbonCommonFactory.getColumnUniqueIdGenerator
-    val columnUniqueId = colUniqueIdGenerator.generateUniqueId(cm.databaseName,
-      columnSchema)
+    val columnUniqueId = colUniqueIdGenerator.generateUniqueId(columnSchema)
     columnSchema.setColumnUniqueId(columnUniqueId)
     columnSchema.setColumnReferenceId(columnUniqueId)
     columnSchema.setDimensionColumn(isDimensionCol)
@@ -529,7 +531,6 @@ class TableNewProcessor(cm: TableModel) {
         LOGGER.error(s"Duplicate column found with name: $name")
         LOGGER.audit(
           s"Validation failed for Create/Alter Table Operation " +
-          s"for ${ cm.databaseName }.${ cm.tableName }" +
           s"Duplicate column found with name: $name")
         CarbonException.analysisException(s"Duplicate dimensions found with name: $name")
       }
@@ -621,11 +622,12 @@ class TableNewProcessor(cm: TableModel) {
       partitionInfo.setColumnSchemaList(partitionCols)
       tableSchema.setPartitionInfo(partitionInfo)
     }
-    tableSchema.setTableName(cm.tableName)
+    tableSchema.setTableName(tableName)
     tableSchema.setListOfColumns(allColumns.asJava)
     tableSchema.setSchemaEvalution(schemaEvol)
-    tableInfo.setDatabaseName(cm.databaseName)
-    tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(cm.databaseName, cm.tableName))
+    tableInfo.setTablePath(tablePath)
+    tableInfo.setDatabaseName(dbName)
+    tableInfo.setTableUniqueName(CarbonTable.buildUniqueName(dbName, tableName))
     tableInfo.setLastUpdatedTime(System.currentTimeMillis())
     tableInfo.setFactTable(tableSchema)
     tableInfo
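
TableNewProcessor now takes the database name, table name and table path from an AbsoluteTableIdentifier rather than from TableModel. A minimal sketch (path illustrative; tableModel stands for a prepared TableModel):

    val identifier = AbsoluteTableIdentifier.from("/store/db1/t1", "db1", "t1")
    val tableInfo: TableInfo = TableNewProcessor(tableModel, identifier)
    // the produced TableInfo now carries the table path (getter assumed to
    // pair with the setTablePath call added above)
    tableInfo.getTablePath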

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
index 76b9e30..6864495 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/CarbonReflectionUtils.scala
@@ -54,7 +54,8 @@ object CarbonReflectionUtils {
       .map(l => im.reflectField(l.asTerm).get).getOrElse(null)
   }
 
-  def getUnresolvedRelation(tableIdentifier: TableIdentifier,
+  def getUnresolvedRelation(
+      tableIdentifier: TableIdentifier,
       version: String,
       tableAlias: Option[String] = None): UnresolvedRelation = {
     val className = "org.apache.spark.sql.catalyst.analysis.UnresolvedRelation"

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 145068f..f1b9ecd 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -286,21 +286,17 @@ object CarbonDataRDDFactory {
       hadoopConf: Configuration,
       dataFrame: Option[DataFrame] = None,
       updateModel: Option[UpdateTableModel] = None): Unit = {
-    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-    val operationContext = new OperationContext
-    // for handling of the segment Merging.
-
     LOGGER.audit(s"Data load request has been received for table" +
                  s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
     // Check if any load need to be deleted before loading new data
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     DataManagementFunc.deleteLoadsAndUpdateMetadata(isForceDeletion = false, carbonTable)
     var status: Array[(String, (LoadMetadataDetails, ExecutionErrors))] = null
     var res: Array[List[(String, (LoadMetadataDetails, ExecutionErrors))]] = null
 
     // create new segment folder  in carbon store
     if (updateModel.isEmpty) {
-      CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
-        carbonLoadModel.getSegmentId, carbonTable)
+      CarbonLoaderUtil.checkAndCreateCarbonDataLocation(carbonLoadModel.getSegmentId, carbonTable)
     }
     var loadStatus = SegmentStatus.SUCCESS
     var errorMessage: String = "DataLoad failure"
@@ -769,7 +765,7 @@ object CarbonDataRDDFactory {
       true)
     CarbonUtil
       .addDataIndexSizeIntoMetaEntry(metadataDetails, carbonLoadModel.getSegmentId, carbonTable)
-    val done = CarbonLoaderUtil.recordLoadMetadata(metadataDetails, carbonLoadModel, false,
+    val done = CarbonLoaderUtil.recordNewLoadMetadata(metadataDetails, carbonLoadModel, false,
       overwriteTable)
     if (!done) {
       val errorMessage = "Dataload failed due to failure in table status updation."

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
index 4f2cf14..d433470 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCatalystOperators.scala
@@ -70,48 +70,6 @@ case class IncludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attr
 
 case class ExcludeProfile(attributes: Seq[Attribute]) extends CarbonProfile(attributes)
 
-object GetDB {
-
-  def getDatabaseName(dbName: Option[String], sparkSession: SparkSession): String = {
-    dbName.getOrElse(
-      sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog].getCurrentDatabase)
-  }
-
-  /**
-   * The method returns the database location
-   * if carbon.storeLocation does  point to spark.sql.warehouse.dir then returns
-   * the database locationUri as database location else follows the old behaviour
-   * making database location from carbon fixed store and database name.
-   *
-   * @param dbName
-   * @param sparkSession
-   * @param fixedStorePath
-   * @return
-   */
-  def getDatabaseLocation(dbName: String, sparkSession: SparkSession,
-      fixedStorePath: String): String = {
-    var databaseLocation =
-      sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog].getDatabaseMetadata(dbName)
-        .locationUri.toString
-    // for default database and db ends with .db
-    // check whether the carbon store and hive store is same or different.
-    if (dbName.equals("default") || databaseLocation.endsWith(".db")) {
-      val properties = CarbonProperties.getInstance()
-      val carbonStorePath = FileFactory
-        .getUpdatedFilePath(properties.getProperty(CarbonCommonConstants.STORE_LOCATION))
-      val hiveStorePath = FileFactory
-        .getUpdatedFilePath(sparkSession.conf.get("spark.sql.warehouse.dir"))
-      // if carbon.store does not point to spark.sql.warehouse.dir then follow the old table path
-      // format
-      if (!hiveStorePath.equals(carbonStorePath)) {
-        databaseLocation = fixedStorePath + CarbonCommonConstants.FILE_SEPARATOR + dbName
-      }
-    }
-
-    return FileFactory.getUpdatedFilePath(databaseLocation)
-  }
-}
-
 case class ProjectForUpdate(
     table: UnresolvedRelation,
     columns: List[String],

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
index 71590dd..ca371e1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala
@@ -63,9 +63,13 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
     val storePath = CarbonProperties.getStorePath
     val tempCSVFolder = new StringBuilder(storePath).append(CarbonCommonConstants.FILE_SEPARATOR)
       .append("tempCSV")
-      .append(CarbonCommonConstants.UNDERSCORE).append(options.dbName)
-      .append(CarbonCommonConstants.UNDERSCORE).append(options.tableName)
-      .append(CarbonCommonConstants.UNDERSCORE).append(System.nanoTime()).toString
+      .append(CarbonCommonConstants.UNDERSCORE)
+      .append(CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession))
+      .append(CarbonCommonConstants.UNDERSCORE)
+      .append(options.tableName)
+      .append(CarbonCommonConstants.UNDERSCORE)
+      .append(System.nanoTime())
+      .toString
     writeToTempCSVFile(tempCSVFolder, options)
 
     val tempCSVPath = new Path(tempCSVFolder)
@@ -133,7 +137,7 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
   private def loadDataFrame(options: CarbonOption): Unit = {
     val header = dataFrame.columns.mkString(",")
     CarbonLoadDataCommand(
-      Some(options.dbName),
+      Some(CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)),
       options.tableName,
       null,
       Seq(),
@@ -168,18 +172,21 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) {
       "DICTIONARY_EXCLUDE" -> options.dictionaryExclude,
       "TABLE_BLOCKSIZE" -> options.tableBlockSize
     ).filter(_._2.isDefined).map(p => s"'${p._1}' = '${p._2.get}'").mkString(",")
+    val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
     s"""
-       | CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName}
+       | CREATE TABLE IF NOT EXISTS $dbName.${options.tableName}
        | (${ carbonSchema.mkString(", ") })
        | STORED BY 'carbondata'
        | ${ if (property.nonEmpty) "TBLPROPERTIES (" + property + ")" else "" }
+       | ${ if (options.tablePath.nonEmpty) s"LOCATION '${options.tablePath.get}'" else ""}
      """.stripMargin
   }
 
   private def makeLoadString(csvFolder: String, options: CarbonOption): String = {
+    val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
     s"""
        | LOAD DATA INPATH '$csvFolder'
-       | INTO TABLE ${options.dbName}.${options.tableName}
+       | INTO TABLE $dbName.${options.tableName}
        | OPTIONS ('FILEHEADER' = '${dataFrame.columns.mkString(",")}',
        | 'SINGLE_PASS' = '${options.singlePass}')
      """.stripMargin

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 31ff323..57233cf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -46,9 +46,10 @@ case class CarbonDatasourceHadoopRelation(
   extends BaseRelation with InsertableRelation {
 
   var caseInsensitiveMap = parameters.map(f => (f._1.toLowerCase, f._2))
-  lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(paths.head,
-    caseInsensitiveMap.getOrElse("dbname", GetDB.getDatabaseName(None, sparkSession)),
-    caseInsensitiveMap.get("tablename").get)
+  lazy val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
+    paths.head,
+    CarbonEnv.getDatabaseName(caseInsensitiveMap.get("dbname"))(sparkSession),
+    caseInsensitiveMap("tablename"))
   lazy val databaseName: String = carbonTable.getDatabaseName
   lazy val tableName: String = carbonTable.getTableName
   CarbonSession.updateSessionInfoToCurrentThread(sparkSession)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index 1a336c4..771a235 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -504,9 +504,9 @@ class CarbonDecoderRDD(
   }
 
   override def internalCompute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
-    val absoluteTableIdentifiers = relations.map { relation =>
-      val tableInfo = getTableInfo
-      (tableInfo.getFactTable.getTableName, tableInfo.getOrCreateAbsoluteTableIdentifier)
+    val tableInfo = getTableInfo
+    val absoluteTableIdentifiers = relations.map { _ =>
+      (tableInfo.getFactTable.getTableName, tableInfo.getOrCreateAbsoluteTableIdentifier())
     }.toMap
 
     val cacheProvider: CacheProvider = CacheProvider.getInstance

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
index c03b210..811442b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonEnv.scala
@@ -17,18 +17,18 @@
 
 package org.apache.spark.sql
 
-import java.util.Map
 import java.util.concurrent.ConcurrentHashMap
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.hive.{CarbonMetaStore, CarbonMetaStoreFactory, CarbonRelation, CarbonSessionCatalog, CarbonSQLConf}
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
+import org.apache.spark.sql.hive._
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.util._
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.events.{CarbonEnvInitPreEvent, OperationContext, OperationListenerBus}
 import org.apache.carbondata.spark.rdd.SparkReadSupport
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
@@ -99,8 +99,7 @@ class CarbonEnv {
 
 object CarbonEnv {
 
-  val carbonEnvMap: Map[SparkSession, CarbonEnv] =
-    new ConcurrentHashMap[SparkSession, CarbonEnv]
+  val carbonEnvMap = new ConcurrentHashMap[SparkSession, CarbonEnv]
 
   def getInstance(sparkSession: SparkSession): CarbonEnv = {
     if (sparkSession.isInstanceOf[CarbonSession]) {
@@ -117,18 +116,24 @@ object CarbonEnv {
   }
 
   /**
-   * Return carbon table instance by looking up table in `sparkSession`
+   * Return carbon table instance from cache or by looking up table in `sparkSession`
    */
   def getCarbonTable(
       databaseNameOp: Option[String],
       tableName: String)
     (sparkSession: SparkSession): CarbonTable = {
-    CarbonEnv
-      .getInstance(sparkSession)
-      .carbonMetastore
-      .lookupRelation(databaseNameOp, tableName)(sparkSession)
-      .asInstanceOf[CarbonRelation]
-      .carbonTable
+    val databaseName = getDatabaseName(databaseNameOp)(sparkSession)
+    val catalog = getInstance(sparkSession).carbonMetastore
+    // refresh cache
+    catalog.checkSchemasModifiedTimeAndReloadTables()
+
+    // try to get it from the cache, otherwise look it up in the catalog
+    catalog.getTableFromMetadataCache(databaseName, tableName)
+      .getOrElse(
+        catalog
+          .lookupRelation(databaseNameOp, tableName)(sparkSession)
+          .asInstanceOf[CarbonRelation]
+          .carbonTable)
   }
 
   /**
@@ -137,12 +142,7 @@ object CarbonEnv {
   def getCarbonTable(
       tableIdentifier: TableIdentifier)
     (sparkSession: SparkSession): CarbonTable = {
-    CarbonEnv
-      .getInstance(sparkSession)
-      .carbonMetastore
-      .lookupRelation(tableIdentifier)(sparkSession)
-      .asInstanceOf[CarbonRelation]
-      .carbonTable
+    getCarbonTable(tableIdentifier.database, tableIdentifier.table)(sparkSession)
   }
 
   /**
@@ -155,32 +155,62 @@ object CarbonEnv {
   }
 
   /**
-   * Return table path for specified table
+   * Returns the database location.
+   * If carbon.storeLocation points to spark.sql.warehouse.dir, this returns the
+   * database locationUri as the database location; otherwise it follows the old
+   * behaviour, building the database location from the carbon store path and
+   * database name.
+   * @return database location
+   */
+  def getDatabaseLocation(dbName: String, sparkSession: SparkSession): String = {
+    var databaseLocation =
+      sparkSession.sessionState.catalog.asInstanceOf[HiveSessionCatalog].getDatabaseMetadata(dbName)
+        .locationUri.toString
+    // for default database and db ends with .db
+    // check whether the carbon store and hive store is same or different.
+    if (dbName.equals("default") || databaseLocation.endsWith(".db")) {
+      val properties = CarbonProperties.getInstance()
+      val carbonStorePath =
+        FileFactory.getUpdatedFilePath(properties.getProperty(CarbonCommonConstants.STORE_LOCATION))
+      val hiveStorePath =
+        FileFactory.getUpdatedFilePath(sparkSession.conf.get("spark.sql.warehouse.dir"))
+      // if carbon.store does not point to spark.sql.warehouse.dir then follow the old table path
+      // format
+      if (!hiveStorePath.equals(carbonStorePath)) {
+        databaseLocation = CarbonProperties.getStorePath +
+                           CarbonCommonConstants.FILE_SEPARATOR +
+                           dbName
+      }
+    }
+
+    FileFactory.getUpdatedFilePath(databaseLocation)
+  }
+
+  /**
+   * Return table path from carbon table. If table does not exist, construct it using
+   * database location and table name
    */
   def getTablePath(
       databaseNameOp: Option[String],
       tableName: String
   )(sparkSession: SparkSession): String = {
-    val dbLocation = GetDB.getDatabaseLocation(
-      databaseNameOp.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase),
-      sparkSession,
-      CarbonProperties.getStorePath)
-    dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    try {
+      getCarbonTable(databaseNameOp, tableName)(sparkSession).getTablePath
+    } catch {
+      case _: NoSuchTableException =>
+        val dbName = getDatabaseName(databaseNameOp)(sparkSession)
+        val dbLocation = getDatabaseLocation(dbName, sparkSession)
+        dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    }
   }
 
-  /**
-   * Return metadata path for specified table
-   */
-  def getMetadataPath(
+  def getIdentifier(
       databaseNameOp: Option[String],
       tableName: String
-  )(sparkSession: SparkSession): String = {
-    val absoluteTableIdentifier = AbsoluteTableIdentifier.from(
+  )(sparkSession: SparkSession): AbsoluteTableIdentifier = {
+    AbsoluteTableIdentifier.from(
       getTablePath(databaseNameOp, tableName)(sparkSession),
       getDatabaseName(databaseNameOp)(sparkSession),
       tableName)
-    val carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier)
-    val schemaFilePath = carbonTablePath.getSchemaFilePath
-    CarbonTablePath.getFolderContainingFile(schemaFilePath)
   }
+
 }
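
The database-location logic moves from GetDB into CarbonEnv, next to the new getTablePath and getIdentifier helpers. A minimal sketch of resolving a table through them (names illustrative):

    val dbName = CarbonEnv.getDatabaseName(Some("db1"))(sparkSession)
    val dbLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
    val tablePath = CarbonEnv.getTablePath(Some("db1"), "t1")(sparkSession)   // table path if it exists, else constructed
    val identifier = CarbonEnv.getIdentifier(Some("db1"), "t1")(sparkSession)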

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
index b764132..7fb146c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonSource.scala
@@ -40,7 +40,6 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.schema.SchemaEvolutionEntry
 import org.apache.carbondata.core.metadata.schema.table.TableInfo
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.spark.CarbonOption
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.streaming.{CarbonStreamException, StreamSinkFactory}
@@ -67,9 +66,8 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
         None)
       case _ =>
         val options = new CarbonOption(parameters)
-        val storePath = CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.STORE_LOCATION)
-        val tablePath = storePath + "/" + options.dbName + "/" + options.tableName
+        val tablePath =
+          CarbonEnv.getTablePath(options.dbName, options.tableName)(sqlContext.sparkSession)
         CarbonDatasourceHadoopRelation(sqlContext.sparkSession, Array(tablePath), parameters, None)
     }
   }
@@ -88,16 +86,17 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
                                           "specified when creating CarbonContext")
 
     val options = new CarbonOption(parameters)
-    val storePath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION)
-    val tablePath = new Path(storePath + "/" + options.dbName + "/" + options.tableName)
+    val tablePath = new Path(
+      CarbonEnv.getTablePath(options.dbName, options.tableName)(sqlContext.sparkSession))
     val isExists = tablePath.getFileSystem(sqlContext.sparkContext.hadoopConfiguration)
       .exists(tablePath)
     val (doSave, doAppend) = (mode, isExists) match {
       case (SaveMode.ErrorIfExists, true) =>
-        CarbonException.analysisException(s"path $storePath already exists.")
+        CarbonException.analysisException(s"table path $tablePath already exists.")
       case (SaveMode.Overwrite, true) =>
+        val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
         sqlContext.sparkSession
-          .sql(s"DROP TABLE IF EXISTS ${ options.dbName }.${ options.tableName }")
+          .sql(s"DROP TABLE IF EXISTS $dbName.${options.tableName}")
         (true, false)
       case (SaveMode.Overwrite, false) | (SaveMode.ErrorIfExists, false) =>
         (true, false)
@@ -124,8 +123,8 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       dataSchema: StructType): BaseRelation = {
     CarbonEnv.getInstance(sqlContext.sparkSession)
     addLateDecodeOptimization(sqlContext.sparkSession)
-    val dbName: String = parameters.getOrElse("dbName",
-      CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
+    val dbName: String =
+      CarbonEnv.getDatabaseName(parameters.get("dbName"))(sqlContext.sparkSession)
     val tableOption: Option[String] = parameters.get("tableName")
     if (tableOption.isEmpty) {
       CarbonException.analysisException("Table creation failed. Table name is not specified")
@@ -154,27 +153,27 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
   }
 
 
-  private def createTableIfNotExists(sparkSession: SparkSession, parameters: Map[String, String],
-      dataSchema: StructType) = {
+  private def createTableIfNotExists(
+      sparkSession: SparkSession,
+      parameters: Map[String, String],
+      dataSchema: StructType): (String, Map[String, String]) = {
 
-    val dbName: String = parameters.getOrElse("dbName",
-      CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
+    val dbName: String = CarbonEnv.getDatabaseName(parameters.get("dbName"))(sparkSession)
     val tableName: String = parameters.getOrElse("tableName", "").toLowerCase
 
     try {
-      if (parameters.contains("carbonSchemaPartsNo")) {
-        getPathForTable(sparkSession, dbName, tableName, parameters)
-      } else {
-        CarbonEnv.getInstance(sparkSession).carbonMetastore
-          .lookupRelation(Option(dbName), tableName)(sparkSession)
-        (CarbonProperties.getStorePath + s"/$dbName/$tableName", parameters)
-      }
+      val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+      (carbonTable.getTablePath, parameters)
     } catch {
-      case ex: NoSuchTableException =>
+      case _: NoSuchTableException =>
         val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
-        val updatedParams =
-          CarbonSource.updateAndCreateTable(dataSchema, sparkSession, metaStore, parameters)
-        getPathForTable(sparkSession, dbName, tableName, updatedParams)
+        val identifier = AbsoluteTableIdentifier.from(
+          CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession),
+          dbName,
+          tableName)
+        val updatedParams = CarbonSource.updateAndCreateTable(
+          identifier, dataSchema, sparkSession, metaStore, parameters)
+        (CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession), updatedParams)
       case ex: Exception =>
         throw new Exception("do not have dbname and tablename for carbon table", ex)
     }
@@ -203,8 +202,7 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       } else if (!sparkSession.isInstanceOf[CarbonSession]) {
         (CarbonProperties.getStorePath + "/" + dbName + "/" + tableName, parameters)
       } else {
-        val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-        (carbonTable.getTablePath, parameters)
+        (CarbonEnv.getTablePath(Some(dbName), tableName)(sparkSession), parameters)
       }
     } catch {
       case ex: Exception =>
@@ -222,54 +220,44 @@ class CarbonSource extends CreatableRelationProvider with RelationProvider
       outputMode: OutputMode): Sink = {
 
     // check "tablePath" option
-    val tablePathOption = parameters.get("tablePath")
-    val dbName: String = parameters.getOrElse("dbName",
-      CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
-    val tableOption: Option[String] = parameters.get("tableName")
-    if (tableOption.isEmpty) {
-      throw new CarbonStreamException("Table creation failed. Table name is not specified")
-    }
-    val tableName = tableOption.get.toLowerCase()
+    val options = new CarbonOption(parameters)
+    val dbName = CarbonEnv.getDatabaseName(options.dbName)(sqlContext.sparkSession)
+    val tableName = options.tableName
     if (tableName.contains(" ")) {
       throw new CarbonStreamException("Table creation failed. Table name cannot contain blank " +
                                       "space")
     }
-    if (tablePathOption.isDefined) {
-      val sparkSession = sqlContext.sparkSession
-      val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
-
-      if (!carbonTable.isStreamingTable) {
-        throw new CarbonStreamException(s"Table ${carbonTable.getDatabaseName}." +
-                                        s"${carbonTable.getTableName} is not a streaming table")
-      }
-
-      // create sink
-      StreamSinkFactory.createStreamTableSink(
-        sqlContext.sparkSession,
-        sqlContext.sparkSession.sessionState.newHadoopConf(),
-        carbonTable,
-        parameters)
-    } else {
-      throw new CarbonStreamException("Require tablePath option for the write stream")
+    val sparkSession = sqlContext.sparkSession
+    val carbonTable = CarbonEnv.getCarbonTable(Some(dbName), tableName)(sparkSession)
+    if (!carbonTable.isStreamingTable) {
+      throw new CarbonStreamException(s"Table ${carbonTable.getDatabaseName}." +
+                                      s"${carbonTable.getTableName} is not a streaming table")
     }
+
+    // create sink
+    StreamSinkFactory.createStreamTableSink(
+      sqlContext.sparkSession,
+      sqlContext.sparkSession.sessionState.newHadoopConf(),
+      carbonTable,
+      parameters)
   }
 
 }
 
 object CarbonSource {
 
-  def createTableInfoFromParams(parameters: Map[String, String],
+  def createTableInfoFromParams(
+      parameters: Map[String, String],
       dataSchema: StructType,
-      dbName: String,
-      tableName: String): TableModel = {
+      identifier: AbsoluteTableIdentifier): TableModel = {
     val sqlParser = new CarbonSpark2SqlParser
     val fields = sqlParser.getFields(dataSchema)
     val map = scala.collection.mutable.Map[String, String]()
     parameters.foreach { case (key, value) => map.put(key, value.toLowerCase()) }
     val options = new CarbonOption(parameters)
     val bucketFields = sqlParser.getBucketFields(map, fields, options)
-    sqlParser.prepareTableModel(ifNotExistPresent = false, Option(dbName),
-      tableName, fields, Nil, map, bucketFields)
+    sqlParser.prepareTableModel(ifNotExistPresent = false, Option(identifier.getDatabaseName),
+      identifier.getTableName, fields, Nil, map, bucketFields)
   }
 
   /**
@@ -278,13 +266,23 @@ object CarbonSource {
    * @param sparkSession
    * @return
    */
-  def updateCatalogTableWithCarbonSchema(tableDesc: CatalogTable,
+  def updateCatalogTableWithCarbonSchema(
+      tableDesc: CatalogTable,
       sparkSession: SparkSession): CatalogTable = {
     val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetastore
     val storageFormat = tableDesc.storage
     val properties = storageFormat.properties
     if (!properties.contains("carbonSchemaPartsNo")) {
-      val map = updateAndCreateTable(tableDesc.schema, sparkSession, metaStore, properties)
+      val tablePath = CarbonEnv.getTablePath(
+        tableDesc.identifier.database, tableDesc.identifier.table)(sparkSession)
+      val dbName = CarbonEnv.getDatabaseName(tableDesc.identifier.database)(sparkSession)
+      val identifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableDesc.identifier.table)
+      val map = updateAndCreateTable(
+        identifier,
+        tableDesc.schema,
+        sparkSession,
+        metaStore,
+        properties)
       // updating params
       val updatedFormat = storageFormat.copy(properties = map)
       tableDesc.copy(storage = updatedFormat)
@@ -292,7 +290,7 @@ object CarbonSource {
       val tableInfo = CarbonUtil.convertGsonToTableInfo(properties.asJava)
       if (!metaStore.isReadFromHiveMetaStore) {
         // save to disk
-        metaStore.saveToDisk(tableInfo, properties.get("tablePath").get)
+        metaStore.saveToDisk(tableInfo, properties("tablePath"))
         // remove schema string from map as we don't store carbon schema to hive metastore
         val map = CarbonUtil.removeSchemaFromMap(properties.asJava)
         val updatedFormat = storageFormat.copy(properties = map.asScala.toMap)
@@ -303,36 +301,26 @@ object CarbonSource {
     }
   }
 
-  def updateAndCreateTable(dataSchema: StructType,
+  def updateAndCreateTable(
+      identifier: AbsoluteTableIdentifier,
+      dataSchema: StructType,
       sparkSession: SparkSession,
       metaStore: CarbonMetaStore,
       properties: Map[String, String]): Map[String, String] = {
-    val dbName: String = properties.getOrElse("dbName",
-      CarbonCommonConstants.DATABASE_DEFAULT_NAME).toLowerCase
-    val tableName: String = properties.getOrElse("tableName", "").toLowerCase
-    val model = createTableInfoFromParams(properties, dataSchema, dbName, tableName)
-    val tableInfo: TableInfo = TableNewProcessor(model)
-    val dbLocation = GetDB.getDatabaseLocation(dbName, sparkSession, CarbonProperties.getStorePath)
-    val tablePath = dbLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    val model = createTableInfoFromParams(properties, dataSchema, identifier)
+    val tableInfo: TableInfo = TableNewProcessor(model, identifier)
     val schemaEvolutionEntry = new SchemaEvolutionEntry
     schemaEvolutionEntry.setTimeStamp(tableInfo.getLastUpdatedTime)
-    tableInfo.getFactTable.getSchemaEvalution.
-      getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
+    tableInfo.getFactTable.getSchemaEvalution.getSchemaEvolutionEntryList.add(schemaEvolutionEntry)
     val map = if (metaStore.isReadFromHiveMetaStore) {
-      val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
-      val carbonTablePath = CarbonStorePath.getCarbonTablePath(tableIdentifier)
-      val schemaMetadataPath =
-        CarbonTablePath.getFolderContainingFile(carbonTablePath.getSchemaFilePath)
-      tableInfo.setMetaDataFilepath(schemaMetadataPath)
-      tableInfo.setTablePath(tableIdentifier.getTablePath)
       CarbonUtil.convertToMultiStringMap(tableInfo)
     } else {
-      metaStore.saveToDisk(tableInfo, tablePath)
+      metaStore.saveToDisk(tableInfo, identifier.getTablePath)
       new java.util.HashMap[String, String]()
     }
     properties.foreach(e => map.put(e._1, e._2))
-    map.put("tablePath", tablePath)
-    map.put("dbname", dbName)
+    map.put("tablepath", identifier.getTablePath)
+    map.put("dbname", identifier.getDatabaseName)
     map.asScala.toMap
   }
 }
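
Net effect of the createSink() change above: the sink is now resolved from the
session catalog via the "dbName"/"tableName" options, and the previously
mandatory "tablePath" option is gone. A minimal caller-side sketch; the format
short name "carbondata", the rate source, and the checkpoint path are
assumptions for illustration, not part of this patch:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("carbon-stream-sink").getOrCreate()
    // any streaming source; its schema must match the target streaming table
    val df = spark.readStream.format("rate").load()
    df.writeStream
      .format("carbondata")
      .option("dbName", "default")          // optional, falls back to the current database
      .option("tableName", "stream_table")  // must name an existing streaming table
      .option("checkpointLocation", "/tmp/stream_table_cp")
      .start()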

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index 622bf0a..f90abb8 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -60,7 +60,7 @@ case class CarbonCreateDataMapCommand(
     } else {
       val dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
       dataMapSchema.setProperties(new java.util.HashMap[String, String](dmproperties.asJava))
-      val dbName = GetDB.getDatabaseName(tableIdentifier.database, sparkSession)
+      val dbName = CarbonEnv.getDatabaseName(tableIdentifier.database)(sparkSession)
       // updating the parent table with the data map schema
       PreAggregateUtil.updateMainTable(dbName, tableIdentifier.table, dataMapSchema, sparkSession)
     }
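
The GetDB helpers are replaced throughout this patch by curried lookups on
CarbonEnv. The CarbonEnv.scala diff is not included in this part of the mail,
so the following only sketches the signatures the call sites above appear to
assume; the names and placeholder bodies are inferred, not copied:

    import org.apache.spark.sql.SparkSession
    import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier

    object CarbonEnvSketch {
      // databaseNameOp if given, otherwise the session's current database
      def getDatabaseName(databaseNameOp: Option[String])
          (sparkSession: SparkSession): String = ???
      // physical table location, replacing manual dbLocation + "/" + tableName
      def getTablePath(databaseNameOp: Option[String], tableName: String)
          (sparkSession: SparkSession): String = ???
      // AbsoluteTableIdentifier built from the resolved path, database and table
      def getIdentifier(databaseNameOp: Option[String], tableName: String)
          (sparkSession: SparkSession): AbsoluteTableIdentifier = ???
    }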

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
index 46c803d..9a71523 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDropDataMapCommand.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.command.datamap
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.execution.command.AtomicRunnableCommand
@@ -54,14 +54,12 @@ case class CarbonDropDataMapCommand(
 
   override def processMetadata(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
-    val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
+    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val identifier = TableIdentifier(tableName, Option(dbName))
     val locksToBeAcquired = List(LockUsage.METADATA_LOCK)
     val carbonEnv = CarbonEnv.getInstance(sparkSession)
     val catalog = carbonEnv.carbonMetastore
-    val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonProperties.getStorePath)
-    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
+    val tablePath = CarbonEnv.getTablePath(databaseNameOp, tableName)(sparkSession)
     val tableIdentifier =
       AbsoluteTableIdentifier.from(tablePath, dbName.toLowerCase, tableName.toLowerCase)
     catalog.checkSchemasModifiedTimeAndReloadTables()
@@ -136,11 +134,7 @@ case class CarbonDropDataMapCommand(
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     // delete the table folder
-    val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
-    val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonProperties.getStorePath)
-    val tablePath = databaseLocation + CarbonCommonConstants.FILE_SEPARATOR + tableName.toLowerCase
-    val tableIdentifier = AbsoluteTableIdentifier.from(tablePath, dbName, tableName)
+    val tableIdentifier = CarbonEnv.getIdentifier(databaseNameOp, tableName)(sparkSession)
     DataMapStoreManager.getInstance().clearDataMap(tableIdentifier, dataMapName)
     Seq.empty
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala
index 45f99fd..d37ca0a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/DataMapListeners.scala
@@ -32,9 +32,9 @@ object DataMapDropTablePostListener extends OperationEventListener {
     val dropPostEvent = event.asInstanceOf[DropTablePostEvent]
     val carbonTable = dropPostEvent.carbonTable
     val sparkSession = dropPostEvent.sparkSession
-    if (carbonTable.isDefined && carbonTable.get.hasDataMapSchema) {
+    if (carbonTable.hasDataMapSchema) {
       // drop all child tables
-      val childSchemas = carbonTable.get.getTableInfo.getDataMapSchemaList
+      val childSchemas = carbonTable.getTableInfo.getDataMapSchemaList
       childSchemas.asScala
         .filter(_.getRelationIdentifier != null)
         .foreach { childSchema =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
index 0011395..eacfded 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala
@@ -17,14 +17,12 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
-import org.apache.spark.sql.execution.command.{Checker, DataCommand, DataProcessOperation, RunnableCommand}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus}
-import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Clean data in table
@@ -65,9 +63,8 @@ case class CarbonCleanFilesCommand(
 
   private def deleteAllData(sparkSession: SparkSession,
       databaseNameOp: Option[String], tableName: String): Unit = {
-    val dbName = GetDB.getDatabaseName(databaseNameOp, sparkSession)
-    val databaseLocation = GetDB.getDatabaseLocation(dbName, sparkSession,
-      CarbonProperties.getStorePath)
+    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+    val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, sparkSession)
     CarbonStore.cleanFiles(
       dbName,
       tableName,
@@ -85,16 +82,14 @@ case class CarbonCleanFilesCommand(
     OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent)
 
     CarbonStore.cleanFiles(
-      GetDB.getDatabaseName(databaseNameOp, sparkSession),
+      CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
       tableName,
       CarbonProperties.getStorePath,
       carbonTable,
       forceTableClean)
 
-    val cleanFilesPostEvent: CleanFilesPostEvent =
-      CleanFilesPostEvent(carbonTable,
-        sparkSession)
-    OperationListenerBus.getInstance.fireEvent(cleanFilesPreEvent)
+    val cleanFilesPostEvent: CleanFilesPostEvent = CleanFilesPostEvent(carbonTable, sparkSession)
+    OperationListenerBus.getInstance.fireEvent(cleanFilesPostEvent)
   }
 
   private def cleanGarbageDataInAllTables(sparkSession: SparkSession): Unit = {
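
Note that the processData hunk above also fixes a copy-paste bug: the
post-event block previously fired cleanFilesPreEvent a second time. The
intended pre/post bracketing, sketched under the assumption that
CleanFilesPreEvent takes the same (carbonTable, sparkSession) arguments as the
post event:

    // fire the pre event, do the work, then fire the post event -- not the pre event again
    val preEvent = CleanFilesPreEvent(carbonTable, sparkSession)
    OperationListenerBus.getInstance.fireEvent(preEvent)

    // ... actual clean-files work (CarbonStore.cleanFiles) goes here ...

    val postEvent = CleanFilesPostEvent(carbonTable, sparkSession)
    OperationListenerBus.getInstance.fireEvent(postEvent)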

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
index 06eb657..a2819cc 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
@@ -42,7 +42,7 @@ case class CarbonDeleteLoadByIdCommand(
 
     CarbonStore.deleteLoadById(
       loadIds,
-      GetDB.getDatabaseName(databaseNameOp, sparkSession),
+      CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
       tableName,
       carbonTable
     )

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
index dbfb030..490bb58 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 
 import org.apache.carbondata.api.CarbonStore
@@ -42,7 +42,7 @@ case class CarbonDeleteLoadByLoadDateCommand(
 
     CarbonStore.deleteLoadByDate(
       loadDate,
-      GetDB.getDatabaseName(databaseNameOp, sparkSession),
+      CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession),
       tableName,
       carbonTable)
     val deleteSegmentPostEvent: DeleteSegmentByDatePostEvent =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 9d21468..ff13299 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, UnresolvedAttribute}
-import org.apache.spark.sql.execution.command.{DataLoadTableFileMapping, DataProcessOperation, RunnableCommand, UpdateTableModel}
+import org.apache.spark.sql.execution.command.{DataCommand, DataLoadTableFileMapping, DataProcessOperation, RunnableCommand, UpdateTableModel}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.util.{CausedBy, FileUtils}
 
@@ -59,11 +59,7 @@ case class CarbonLoadDataCommand(
     dataFrame: Option[DataFrame] = None,
     updateModel: Option[UpdateTableModel] = None,
     var tableInfoOp: Option[TableInfo] = None)
-  extends RunnableCommand with DataProcessOperation {
-
-  override def run(sparkSession: SparkSession): Seq[Row] = {
-    processData(sparkSession)
-  }
+  extends DataCommand {
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
     val LOGGER: LogService = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -75,8 +71,6 @@ case class CarbonLoadDataCommand(
       }
     }
 
-    val dbName = databaseNameOp.getOrElse(sparkSession.catalog.currentDatabase)
-
     val carbonProperty: CarbonProperties = CarbonProperties.getInstance()
     carbonProperty.addProperty("zookeeper.enable.lock", "false")
 
@@ -99,6 +93,7 @@ case class CarbonLoadDataCommand(
     // update the property with new value
     carbonProperty.addProperty(CarbonCommonConstants.NUM_CORES_LOADING, numCoresLoading)
 
+    val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
     val hadoopConf = sparkSession.sessionState.newHadoopConf()
     try {
       val table = if (tableInfoOp.isDefined) {
@@ -218,8 +213,7 @@ case class CarbonLoadDataCommand(
                                   table.getTableName + "/"
           val fileType = FileFactory.getFileType(partitionLocation)
           if (FileFactory.isFileExist(partitionLocation, fileType)) {
-            val file = FileFactory
-              .getCarbonFile(partitionLocation, fileType)
+            val file = FileFactory.getCarbonFile(partitionLocation, fileType)
             CarbonUtil.deleteFoldersAndFiles(file)
           }
         } catch {
@@ -267,7 +261,6 @@ case class CarbonLoadDataCommand(
         dimensions,
         carbonLoadModel,
         sparkSession.sqlContext,
-        carbonLoadModel.getTablePath,
         dictFolderPath)
     }
     if (!StringUtils.isEmpty(carbonLoadModel.getAllDictPath)) {
@@ -275,7 +268,6 @@ case class CarbonLoadDataCommand(
       GlobalDictionaryUtil
         .generateDictionaryFromDictionaryFiles(sparkSession.sqlContext,
           carbonLoadModel,
-          carbonLoadModel.getTablePath,
           carbonTableIdentifier,
           dictFolderPath,
           dimensions,
@@ -334,10 +326,11 @@ case class CarbonLoadDataCommand(
       val getSegIdUDF = udf((tupleId: String) =>
         CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID))
       // getting all fields except tupleId field as it is not required in the value
-      var otherFields = fields.toSeq
-        .filter(field => !field.name
-          .equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID))
-        .map(field => new Column(field.name))
+      var otherFields = fields.toSeq.filter { field =>
+        !field.name.equalsIgnoreCase(CarbonCommonConstants.CARBON_IMPLICIT_COLUMN_TUPLEID)
+      }.map { field =>
+        new Column(field.name)
+      }
 
       // extract tupleId field which will be used as a key
       val segIdColumn = getSegIdUDF(new Column(UnresolvedAttribute
@@ -356,10 +349,10 @@ case class CarbonLoadDataCommand(
     GlobalDictionaryUtil.generateGlobalDictionary(
       sparkSession.sqlContext,
       carbonLoadModel,
-      carbonLoadModel.getTablePath,
       hadoopConf,
       dictionaryDataFrame)
-    CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
+    CarbonDataRDDFactory.loadCarbonData(
+      sparkSession.sqlContext,
       carbonLoadModel,
       carbonLoadModel.getTablePath,
       columnar,
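
CarbonLoadDataCommand now extends DataCommand instead of overriding run() by
hand. Judging from the boilerplate this hunk deletes, DataCommand is presumably
shaped like the sketch below (inferred from the removed code, not copied from
the actual trait):

    import org.apache.spark.sql.{Row, SparkSession}
    import org.apache.spark.sql.execution.command.{DataProcessOperation, RunnableCommand}

    // run() simply delegates to processData(), so each data command only
    // needs to implement processData
    trait DataCommandSketch extends RunnableCommand with DataProcessOperation {
      override def run(sparkSession: SparkSession): Seq[Row] = processData(sparkSession)
    }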

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
index c6898b2..e0311cb 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonShowLoadsCommand.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.command.management
 
-import org.apache.spark.sql.{CarbonEnv, GetDB, Row, SparkSession}
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.types.{StringType, TimestampType}
@@ -43,8 +43,6 @@ case class CarbonShowLoadsCommand(
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
     val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
     CarbonStore.showSegments(
-      GetDB.getDatabaseName(databaseNameOp, sparkSession),
-      tableName,
       limit,
       carbonTable.getMetaDataFilepath
     )

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index 3d65862..ecc48cf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.command.mutation
 import org.apache.spark.sql.{CarbonEnv, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.hive.CarbonRelation
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage}
@@ -34,7 +33,8 @@ import org.apache.carbondata.processing.loading.FailureCauses
  */
 private[sql] case class CarbonProjectForDeleteCommand(
     plan: LogicalPlan,
-    identifier: Seq[String],
+    databaseNameOp: Option[String],
+    tableName: String,
     timestamp: String)
   extends DataCommand {
 
@@ -44,10 +44,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
     val dataFrame = Dataset.ofRows(sparkSession, plan)
     val dataRdd = dataFrame.rdd
 
-    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(DeleteExecution.getTableIdentifier(identifier))(sparkSession).
-      asInstanceOf[CarbonRelation]
-    val carbonTable = relation.carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
 
     // trigger event for Delete from table
     val operationContext = new OperationContext
@@ -62,7 +59,7 @@ private[sql] case class CarbonProjectForDeleteCommand(
     try {
       lockStatus = metadataLock.lockWithRetries()
       LOGGER.audit(s" Delete data request has been received " +
-                   s"for ${ relation.databaseName }.${ relation.tableName }.")
+                   s"for ${carbonTable.getDatabaseName}.${carbonTable.getTableName}.")
       if (lockStatus) {
         LOGGER.info("Successfully able to get the table metadata file lock")
       } else {
@@ -73,10 +70,16 @@ private[sql] case class CarbonProjectForDeleteCommand(
       // handle the clean up of IUD.
       CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
 
-      if (DeleteExecution.deleteDeltaExecution(identifier, sparkSession, dataRdd, timestamp,
-        isUpdateOperation = false, executorErrors)) {
+      if (DeleteExecution.deleteDeltaExecution(
+        databaseNameOp,
+        tableName,
+        sparkSession,
+        dataRdd,
+        timestamp,
+        isUpdateOperation = false,
+        executorErrors)) {
         // call IUD Compaction.
-        HorizontalCompaction.tryHorizontalCompaction(sparkSession, relation,
+        HorizontalCompaction.tryHorizontalCompaction(sparkSession, carbonTable,
           isUpdateOperation = false)
 
         // trigger post event for Delete from table
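
With the Seq[String] identifier and the getTableIdentifier helper gone, callers
now pass the parsed database option and table name straight through. A
hypothetical call site (plan and timestamp stand in for values the parser
produces):

    import org.apache.spark.sql.catalyst.TableIdentifier
    import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

    val plan: LogicalPlan = ???   // projection over the rows to delete
    val timestamp: String = ???   // mutation timestamp
    // TableIdentifier.database is already an Option[String], so a parsed
    // identifier maps one-to-one onto the new parameters
    val id = TableIdentifier("t1", Some("db1"))
    val command = CarbonProjectForDeleteCommand(plan, id.database, id.table, timestamp)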

http://git-wip-us.apache.org/repos/asf/carbondata/blob/2fe7758b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
index 3aadec3..75008ad 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala
@@ -22,7 +22,6 @@ import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.storage.StorageLevel
 
 import org.apache.carbondata.common.logging.LogServiceFactory
@@ -35,11 +34,12 @@ import org.apache.carbondata.processing.loading.FailureCauses
 
 private[sql] case class CarbonProjectForUpdateCommand(
     plan: LogicalPlan,
-    tableIdentifier: Seq[String])
+    databaseNameOp: Option[String],
+    tableName: String)
   extends DataCommand {
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
-    val LOGGER = LogServiceFactory.getLogService(CarbonProjectForUpdateCommand.getClass.getName)
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     IUDCommonUtil.checkIfSegmentListIsSet(sparkSession, plan)
     val res = plan find {
       case relation: LogicalRelation if relation.relation
@@ -51,10 +51,7 @@ private[sql] case class CarbonProjectForUpdateCommand(
     if (res.isEmpty) {
       return Seq.empty
     }
-    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
-      .lookupRelation(DeleteExecution.getTableIdentifier(tableIdentifier))(sparkSession).
-      asInstanceOf[CarbonRelation]
-    val carbonTable = relation.carbonTable
+    val carbonTable = CarbonEnv.getCarbonTable(databaseNameOp, tableName)(sparkSession)
 
     // trigger event for Update table
     val operationContext = new OperationContext
@@ -94,22 +91,35 @@ private[sql] case class CarbonProjectForUpdateCommand(
       CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, false)
 
       // do delete operation.
-      DeleteExecution.deleteDeltaExecution(tableIdentifier, sparkSession, dataSet.rdd,
-        currentTime + "", isUpdateOperation = true, executionErrors)
-
-      if(executionErrors.failureCauses != FailureCauses.NONE) {
+      DeleteExecution.deleteDeltaExecution(
+        databaseNameOp,
+        tableName,
+        sparkSession,
+        dataSet.rdd,
+        currentTime + "",
+        isUpdateOperation = true,
+        executionErrors)
+
+      if (executionErrors.failureCauses != FailureCauses.NONE) {
         throw new Exception(executionErrors.errorMsg)
       }
 
       // do update operation.
-      performUpdate(dataSet, tableIdentifier, plan, sparkSession, currentTime, executionErrors)
-
-      if(executionErrors.failureCauses != FailureCauses.NONE) {
+      performUpdate(dataSet,
+        databaseNameOp,
+        tableName,
+        plan,
+        sparkSession,
+        currentTime,
+        executionErrors)
+
+      if (executionErrors.failureCauses != FailureCauses.NONE) {
         throw new Exception(executionErrors.errorMsg)
       }
 
       // Do IUD Compaction.
-      HorizontalCompaction.tryHorizontalCompaction(sparkSession, relation, isUpdateOperation = true)
+      HorizontalCompaction.tryHorizontalCompaction(
+        sparkSession, carbonTable, isUpdateOperation = true)
 
       // trigger event for Update table
       val updateTablePostEvent: UpdateTablePostEvent =
@@ -150,21 +160,21 @@ private[sql] case class CarbonProjectForUpdateCommand(
 
   private def performUpdate(
       dataFrame: Dataset[Row],
-      tableIdentifier: Seq[String],
+      databaseNameOp: Option[String],
+      tableName: String,
       plan: LogicalPlan,
       sparkSession: SparkSession,
       currentTime: Long,
       executorErrors: ExecutionErrors): Unit = {
 
     def isDestinationRelation(relation: CarbonDatasourceHadoopRelation): Boolean = {
-
-      val tableName = relation.identifier.getCarbonTableIdentifier.getTableName
-      val dbName = relation.identifier.getCarbonTableIdentifier.getDatabaseName
-      (tableIdentifier.size > 1 &&
-       tableIdentifier(0) == dbName &&
-       tableIdentifier(1) == tableName) ||
-      (tableIdentifier(0) == tableName)
+      // the relation is the update target only when both the resolved database
+      // name and the table name match the relation's identifier
+      val dbName = CarbonEnv.getDatabaseName(databaseNameOp)(sparkSession)
+      dbName == relation.identifier.getCarbonTableIdentifier.getDatabaseName &&
+      tableName == relation.identifier.getCarbonTableIdentifier.getTableName
     }
+
     def getHeader(relation: CarbonDatasourceHadoopRelation, plan: LogicalPlan): String = {
       var header = ""
       var found = false