You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ma...@apache.org on 2018/10/30 08:45:36 UTC

carbondata git commit: [CARBONDATA-3042] Column Schema objects are present in Driver and Executor even after dropping table

Repository: carbondata
Updated Branches:
  refs/heads/master e2c517e3f -> 10b393808


[CARBONDATA-3042] Column Schema objects are present in Driver and Executor even after dropping table

Problem:
ColumnSchema objects remain present in the Driver and the Executor even after the table has been dropped.

Solution:
In Driver: After dropping a table, remove its tableInfo entry from the CarbonMetadata singleton instance.
In Executor: Stop using the CarbonMetadata singleton instance and pass the CarbonTable object itself instead.

This closes #2852


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/10b39380
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/10b39380
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/10b39380

Branch: refs/heads/master
Commit: 10b393808e91344b017ba3e946b28217c2dd9757
Parents: e2c517e
Author: Indhumathi27 <in...@gmail.com>
Authored: Thu Oct 25 13:37:08 2018 +0530
Committer: manishgupta88 <to...@gmail.com>
Committed: Tue Oct 30 14:19:39 2018 +0530

----------------------------------------------------------------------
 .../core/metadata/CarbonMetadata.java           |  5 +-
 .../statusmanager/SegmentStatusManager.java     |  6 +-
 .../carbondata/core/util/DeleteLoadFolders.java | 31 +++++------
 .../spark/rdd/AlterTableLoadPartitionRDD.scala  |  4 +-
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  3 +-
 .../carbondata/spark/rdd/StreamHandoffRDD.scala |  1 -
 .../spark/sql/CarbonDictionaryDecoder.scala     |  5 --
 .../spark/sql/hive/CarbonFileMetastore.scala    |  3 +-
 .../spark/sql/hive/CarbonHiveMetaStore.scala    |  2 +-
 .../loading/DataLoadProcessBuilder.java         |  2 -
 .../sort/impl/ParallelReadMergeSorterImpl.java  |  9 ++-
 ...allelReadMergeSorterWithColumnRangeImpl.java |  8 +--
 .../UnsafeBatchParallelReadMergeSorterImpl.java |  6 +-
 ...allelReadMergeSorterWithColumnRangeImpl.java | 11 ++--
 .../CarbonRowDataWriterProcessorStepImpl.java   | 13 ++---
 .../steps/DataConverterProcessorStepImpl.java   |  6 +-
 .../steps/DataWriterBatchProcessorStepImpl.java | 11 ++--
 .../steps/DataWriterProcessorStepImpl.java      | 18 +++---
 .../merger/CompactionResultSortProcessor.java   |  4 +-
 .../merger/RowResultMergerProcessor.java        |  5 +-
 .../partition/spliter/RowResultProcessor.java   |  5 +-
 .../sort/sortdata/SortParameters.java           | 44 +++++++++------
 .../store/CarbonFactDataHandlerModel.java       |  4 +-
 .../util/CarbonDataProcessorUtil.java           | 58 +++++---------------
 24 files changed, 110 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
index 850f477..e44092e 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/CarbonMetadata.java
@@ -69,9 +69,8 @@ public final class CarbonMetadata {
 
   /**
    * Below method will be used to set the carbon table
-   * This method will be used in executor side as driver will always have
-   * updated table so from driver during query execution and data loading
-   * we just need to add the table
+   * Note: Use this method only in driver as clean up in Executor is not handled
+   *       if this table is added to executor
    *
    * @param carbonTable
    */

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 9196367..fbb765b 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -1001,9 +1001,9 @@ public class SegmentStatusManager {
             CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
           }
           if (updationCompletionStatus) {
-            DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion(
-                identifier, carbonTable.getMetadataPath(),
-                newAddedLoadHistoryList, isForceDeletion, partitionSpecs);
+            DeleteLoadFolders
+                .physicalFactAndMeasureMetadataDeletion(carbonTable, newAddedLoadHistoryList,
+                    isForceDeletion, partitionSpecs);
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
index f1cc57f..b614f55 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
@@ -66,22 +66,19 @@ public final class DeleteLoadFolders {
     return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId);
   }
 
-  public static void physicalFactAndMeasureMetadataDeletion(
-      AbsoluteTableIdentifier absoluteTableIdentifier,
-      String metadataPath,
+  public static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable,
       LoadMetadataDetails[] newAddedLoadHistoryList,
       boolean isForceDelete,
       List<PartitionSpec> specs) {
-    LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath);
-    physicalFactAndMeasureMetadataDeletion(
-        absoluteTableIdentifier,
+    LoadMetadataDetails[] currentDetails =
+        SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
+    physicalFactAndMeasureMetadataDeletion(carbonTable,
         currentDetails,
         isForceDelete,
         specs,
         currentDetails);
     if (newAddedLoadHistoryList != null && newAddedLoadHistoryList.length > 0) {
-      physicalFactAndMeasureMetadataDeletion(
-          absoluteTableIdentifier,
+      physicalFactAndMeasureMetadataDeletion(carbonTable,
           newAddedLoadHistoryList,
           isForceDelete,
           specs,
@@ -91,17 +88,15 @@ public final class DeleteLoadFolders {
 
   /**
    * Delete the invalid data physically from table.
-   * @param absoluteTableIdentifier table identifier
+   * @param carbonTable table
    * @param loadDetails Load details which need clean up
    * @param isForceDelete is Force delete requested by user
    * @param specs Partition specs
    * @param currLoadDetails Current table status load details which are required for update manager.
    */
-  private static void physicalFactAndMeasureMetadataDeletion(
-      AbsoluteTableIdentifier absoluteTableIdentifier, LoadMetadataDetails[] loadDetails,
-      boolean isForceDelete, List<PartitionSpec> specs, LoadMetadataDetails[] currLoadDetails) {
-    CarbonTable carbonTable = DataMapStoreManager.getInstance().getCarbonTable(
-        absoluteTableIdentifier);
+  private static void physicalFactAndMeasureMetadataDeletion(CarbonTable carbonTable,
+      LoadMetadataDetails[] loadDetails, boolean isForceDelete, List<PartitionSpec> specs,
+      LoadMetadataDetails[] currLoadDetails) {
     List<TableDataMap> indexDataMaps = new ArrayList<>();
     try {
       for (TableDataMap dataMap : DataMapStoreManager.getInstance().getAllDataMap(carbonTable)) {
@@ -112,7 +107,8 @@ public final class DeleteLoadFolders {
     } catch (IOException e) {
       LOGGER.warn(String.format(
           "Failed to get datamaps for %s.%s, therefore the datamap files could not be cleaned.",
-          absoluteTableIdentifier.getDatabaseName(), absoluteTableIdentifier.getTableName()));
+          carbonTable.getAbsoluteTableIdentifier().getDatabaseName(),
+          carbonTable.getAbsoluteTableIdentifier().getTableName()));
     }
     SegmentUpdateStatusManager updateStatusManager =
         new SegmentUpdateStatusManager(carbonTable, currLoadDetails);
@@ -120,12 +116,11 @@ public final class DeleteLoadFolders {
       if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) {
         try {
           if (oneLoad.getSegmentFile() != null) {
-            SegmentFileStore.deleteSegment(
-                absoluteTableIdentifier.getTablePath(),
+            SegmentFileStore.deleteSegment(carbonTable.getAbsoluteTableIdentifier().getTablePath(),
                 new Segment(oneLoad.getLoadName(), oneLoad.getSegmentFile()),
                 specs, updateStatusManager);
           } else {
-            String path = getSegmentPath(absoluteTableIdentifier, oneLoad);
+            String path = getSegmentPath(carbonTable.getAbsoluteTableIdentifier(), oneLoad);
             boolean status = false;
             if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) {
               CarbonFile file = FileFactory.getCarbonFile(path, FileFactory.getFileType(path));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
index a03447d..4322359 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala
@@ -62,13 +62,11 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel,
       val partitionId: Int = partitionInfo.getPartitionId(split.index)
       carbonLoadModel.setTaskNo(String.valueOf(partitionId))
       carbonLoadModel.setSegmentId(segmentId)
-      CarbonMetadata.getInstance().addCarbonTable(
-        carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable)
 
       CommonUtil.setTempStoreLocation(split.index, carbonLoadModel,
         isCompactionFlow = false, isAltPartitionFlow = true)
       val tempStoreLoc: Array[String] = CarbonDataProcessorUtil.getLocalDataFolderLocation(
-        databaseName, factTableName, carbonLoadModel.getTaskNo, segmentId, false, true)
+        carbonTable, carbonLoadModel.getTaskNo, segmentId, false, true)
 
       val loadStatus: Boolean = if (rows.isEmpty) {
         LOGGER.info("After repartition this split, NO target rows to write back.")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 07f6964..1fbcc51 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -86,7 +86,6 @@ class CarbonMergerRDD[K, V](
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val iter = new Iterator[(K, V)] {
       val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
-      CarbonMetadata.getInstance().addCarbonTable(carbonTable)
       val carbonSparkPartition = theSplit.asInstanceOf[CarbonSparkPartition]
       if (carbonTable.isPartitionTable) {
         carbonLoadModel.setTaskNo(String.valueOf(carbonSparkPartition.partitionId))
@@ -198,7 +197,7 @@ class CarbonMergerRDD[K, V](
         }
 
         val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(
-          databaseName, factTableName, carbonLoadModel.getTaskNo, mergeNumber, true, false)
+          carbonTable, carbonLoadModel.getTaskNo, mergeNumber, true, false)
 
         if (restructuredBlockExists) {
           LOGGER.info("CompactionResultSortProcessor flow is selected")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
index 3f0eb71..13172c7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/StreamHandoffRDD.scala
@@ -117,7 +117,6 @@ class StreamHandoffRDD[K, V](
     carbonLoadModel.setTaskNo("" + split.index)
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     DataTypeUtil.setDataTypeConverter(new SparkDataTypeConverterImpl)
-    CarbonMetadata.getInstance().addCarbonTable(carbonTable)
     // the input iterator is using raw row
     val iteratorList = prepareInputIterator(split, carbonTable)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
index ff7ac60..f3d5bf0 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDictionaryDecoder.scala
@@ -279,11 +279,6 @@ case class CarbonDictionaryDecoder(
               if (null != carbonDimension.getColumnSchema.getParentColumnTableRelations &&
                   !carbonDimension
                     .getColumnSchema.getParentColumnTableRelations.isEmpty) {
-                val parentRelationIdentifier = carbonDimension.getColumnSchema
-                  .getParentColumnTableRelations.get(0).getRelationIdentifier
-                val parentTablePath = CarbonMetadata.getInstance()
-                  .getCarbonTable(parentRelationIdentifier.getDatabaseName,
-                    parentRelationIdentifier.getTableName).getTablePath
                 (QueryUtil
                   .getTableIdentifierForColumn(carbonDimension),
                   new ColumnIdentifier(carbonDimension.getColumnSchema

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index 982bbee..96b31c2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -491,7 +491,6 @@ class CarbonFileMetastore extends CarbonMetaStore {
       // in the other beeline need to update.
       checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
 
-      removeTableFromMetadata(dbName, tableName)
       CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
       updateSchemasUpdatedTime(touchSchemaFileSystemTime())
       // discard cached table info in cachedDataSourceTables
@@ -499,6 +498,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
       sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
       DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
       SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier)
+      removeTableFromMetadata(dbName, tableName)
     } else {
       if (!isTransactionalCarbonTable(absoluteTableIdentifier)) {
         removeTableFromMetadata(dbName, tableName)
@@ -508,6 +508,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
         sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
         DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
         SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier)
+        removeTableFromMetadata(dbName, tableName)
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
index 5ebe242..c8c7d31 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonHiveMetaStore.scala
@@ -79,13 +79,13 @@ class CarbonHiveMetaStore extends CarbonFileMetastore {
       ManageDictionaryAndBTree.clearBTreeAndDictionaryLRUCache(carbonTable)
     }
     checkSchemasModifiedTimeAndReloadTable(TableIdentifier(tableName, Some(dbName)))
-    removeTableFromMetadata(dbName, tableName)
     CarbonHiveMetadataUtil.invalidateAndDropTable(dbName, tableName, sparkSession)
     // discard cached table info in cachedDataSourceTables
     val tableIdentifier = TableIdentifier(tableName, Option(dbName))
     sparkSession.sessionState.catalog.refreshTable(tableIdentifier)
     DataMapStoreManager.getInstance().clearDataMaps(absoluteTableIdentifier)
     SegmentPropertiesAndSchemaHolder.getInstance().invalidate(absoluteTableIdentifier)
+    removeTableFromMetadata(dbName, tableName)
   }
 
   override def checkSchemasModifiedTimeAndReloadTable(tableIdentifier: TableIdentifier): Boolean = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index f89a4e7..bcca915 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -27,7 +27,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.constants.CarbonLoadOptionConstants;
 import org.apache.carbondata.core.datastore.TableSpec;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.schema.SortColumnRangeInfo;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -249,7 +248,6 @@ public final class DataLoadProcessBuilder {
     configuration.setDataLoadProperty(CarbonLoadOptionConstants.CARBON_OPTIONS_BAD_RECORD_PATH,
         loadModel.getBadRecordsLocation());
 
-    CarbonMetadata.getInstance().addCarbonTable(carbonTable);
     List<CarbonDimension> dimensions =
         carbonTable.getDimensionByTableName(carbonTable.getTableName());
     List<CarbonMeasure> measures =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
index f0920ee..55b336e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
@@ -71,11 +71,10 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
   public void initialize(SortParameters sortParameters) {
     this.sortParameters = sortParameters;
     intermediateFileMerger = new SortIntermediateFileMerger(sortParameters);
-    String[] storeLocations =
-        CarbonDataProcessorUtil.getLocalDataFolderLocation(
-            sortParameters.getDatabaseName(), sortParameters.getTableName(),
-            String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(),
-            false, false);
+    String[] storeLocations = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(sortParameters.getCarbonTable(),
+            String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(), false,
+            false);
     // Set the data file location
     String[] dataFolderLocations = CarbonDataProcessorUtil.arrayAppend(storeLocations,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
index 3b767aa..8b86c0c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithColumnRangeImpl.java
@@ -148,7 +148,7 @@ public class ParallelReadMergeSorterWithColumnRangeImpl extends AbstractMergeSor
 
   private SingleThreadFinalSortFilesMerger getFinalMerger(SortParameters sortParameters) {
     String[] storeLocation = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
+        .getLocalDataFolderLocation(sortParameters.getCarbonTable(),
             String.valueOf(sortParameters.getTaskNo()),
             sortParameters.getSegmentId() + "", false, false);
     // Set the data file location
@@ -188,9 +188,9 @@ public class ParallelReadMergeSorterWithColumnRangeImpl extends AbstractMergeSor
 
   private void setTempLocation(SortParameters parameters) {
     String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(parameters.getDatabaseName(),
-            parameters.getTableName(), parameters.getTaskNo(),
-            parameters.getSegmentId(), false, false);
+        .getLocalDataFolderLocation(parameters.getCarbonTable(), parameters.getTaskNo(),
+            parameters.getSegmentId(),
+            false, false);
     String[] tmpLocs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
         CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     parameters.setTempFileLocation(tmpLocs);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index 9cb67df..aa960b6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -232,9 +232,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
     }
 
     private void setTempLocation(SortParameters parameters) {
-      String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation(
-          parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(),
-          parameters.getSegmentId(), false, false);
+      String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
+          .getLocalDataFolderLocation(parameters.getCarbonTable(), parameters.getTaskNo(),
+              parameters.getSegmentId(), false, false);
       String[] tempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
           File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
       parameters.setTempFileLocation(tempDirs);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
index a8ec05c..f9631a5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithColumnRangeImpl.java
@@ -143,9 +143,9 @@ public class UnsafeParallelReadMergeSorterWithColumnRangeImpl extends AbstractMe
 
   private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger(SortParameters sortParameters) {
     String[] storeLocation = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
-            String.valueOf(sortParameters.getTaskNo()),
-            sortParameters.getSegmentId() + "", false, false);
+        .getLocalDataFolderLocation(sortParameters.getCarbonTable(),
+            String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId() + "", false,
+            false);
     // Set the data file location
     String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -182,9 +182,8 @@ public class UnsafeParallelReadMergeSorterWithColumnRangeImpl extends AbstractMe
 
   private void setTempLocation(SortParameters parameters) {
     String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(),
-            parameters.getTaskNo(), parameters.getSegmentId(),
-            false, false);
+        .getLocalDataFolderLocation(parameters.getCarbonTable(), parameters.getTaskNo(),
+            parameters.getSegmentId(), false, false);
     String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator,
         CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     LOGGER.warn("set temp location: " + StringUtils.join(tmpLoc, ", "));

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index ae42df7..ce79f24 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -100,13 +100,10 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     child.initialize();
   }
 
-  private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) {
-    String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation(
-        tableIdentifier.getDatabaseName(),
-        tableIdentifier.getTableName(),
-        String.valueOf(configuration.getTaskNo()), configuration.getSegmentId(),
-        false,
-        false);
+  private String[] getStoreLocation() {
+    String[] storeLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(this.configuration.getTableSpec().getCarbonTable(),
+            String.valueOf(configuration.getTaskNo()), configuration.getSegmentId(), false, false);
     CarbonDataProcessorUtil.createLocations(storeLocation);
     return storeLocation;
   }
@@ -161,7 +158,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
   }
 
   private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex) throws IOException {
-    String[] storeLocation = getStoreLocation(tableIdentifier);
+    String[] storeLocation = getStoreLocation();
     DataMapWriterListener listener = getDataMapWriterListener(0);
     CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(
         configuration, storeLocation, 0, iteratorIndex, listener);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
index ae9ec3d..0195877 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataConverterProcessorStepImpl.java
@@ -136,15 +136,13 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte
     Arrays.sort(convertedSortColumnRanges,
         new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(),
             sortColumnRangeInfo.getIsSortColumnNoDict(), CarbonDataProcessorUtil
-            .getNoDictDataTypes(configuration.getTableIdentifier().getDatabaseName(),
-                configuration.getTableIdentifier().getTableName())));
+            .getNoDictDataTypes(configuration.getTableSpec().getCarbonTable())));
 
     // range partitioner to dispatch rows by sort columns
     this.partitioner = new RangePartitionerImpl(convertedSortColumnRanges,
         new RawRowComparator(sortColumnRangeInfo.getSortColumnIndex(),
             sortColumnRangeInfo.getIsSortColumnNoDict(), CarbonDataProcessorUtil
-            .getNoDictDataTypes(configuration.getTableIdentifier().getDatabaseName(),
-                configuration.getTableIdentifier().getTableName())));
+            .getNoDictDataTypes(configuration.getTableSpec().getCarbonTable())));
   }
 
   // only convert sort column fields

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
index 694b345..7cb102b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
@@ -67,11 +67,10 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
     child.initialize();
   }
 
-  private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) {
-    return CarbonDataProcessorUtil.getLocalDataFolderLocation(
-        tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(),
-        String.valueOf(configuration.getTaskNo()),
-        configuration.getSegmentId(), false, false);
+  private String[] getStoreLocation() {
+    return CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(configuration.getTableSpec().getCarbonTable(),
+            String.valueOf(configuration.getTaskNo()), configuration.getSegmentId(), false, false);
   }
 
   @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
@@ -84,7 +83,7 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
           .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
               System.currentTimeMillis());
       int i = 0;
-      String[] storeLocation = getStoreLocation(tableIdentifier);
+      String[] storeLocation = getStoreLocation();
       CarbonDataProcessorUtil.createLocations(storeLocation);
       for (Iterator<CarbonRowBatch> iterator : iterators) {
         int k = 0;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
index 3d704c9..1595e1b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java
@@ -89,19 +89,16 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     this.carbonFactHandlers = new CopyOnWriteArrayList<>();
   }
 
-  private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) {
+  private String[] getStoreLocation() {
     String[] storeLocation = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
-            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()),
-            configuration.getSegmentId(), false, false);
+        .getLocalDataFolderLocation(configuration.getTableSpec().getCarbonTable(),
+            String.valueOf(configuration.getTaskNo()), configuration.getSegmentId(), false, false);
     CarbonDataProcessorUtil.createLocations(storeLocation);
     return storeLocation;
   }
 
   public CarbonFactDataHandlerModel getDataHandlerModel() {
-    CarbonTableIdentifier tableIdentifier =
-        configuration.getTableIdentifier().getCarbonTableIdentifier();
-    String[] storeLocation = getStoreLocation(tableIdentifier);
+    String[] storeLocation = getStoreLocation();
     listener = getDataMapWriterListener(0);
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
         .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, 0, listener);
@@ -170,14 +167,13 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
     @Override public Void call() throws Exception {
       LOGGER.info("Process writer forward for table " + tableIdentifier.getTableName()
           + ", range: " + rangeId);
-      processRange(insideRangeIterator, tableIdentifier, rangeId);
+      processRange(insideRangeIterator, rangeId);
       return null;
     }
   }
 
-  private void processRange(Iterator<CarbonRowBatch> insideRangeIterator,
-      CarbonTableIdentifier tableIdentifier, int rangeId) {
-    String[] storeLocation = getStoreLocation(tableIdentifier);
+  private void processRange(Iterator<CarbonRowBatch> insideRangeIterator, int rangeId) {
+    String[] storeLocation = getStoreLocation();
 
     listener = getDataMapWriterListener(rangeId);
     CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index 8d28d45..7c7b8ee 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -463,7 +463,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
    */
   private void initializeFinalThreadMergerForMergeSort() {
     boolean[] noDictionarySortColumnMapping = CarbonDataProcessorUtil
-        .getNoDictSortColMapping(carbonTable.getDatabaseName(), carbonTable.getTableName());
+        .getNoDictSortColMapping(carbonTable);
     sortParameters.setNoDictionarySortColumn(noDictionarySortColumnMapping);
     String[] sortTempFileLocation = CarbonDataProcessorUtil.arrayAppend(tempStoreLocation,
         CarbonCommonConstants.FILE_SEPARATOR, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -482,7 +482,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
               + carbonLoadModel.getFactTimeStamp() + ".tmp";
     } else {
       carbonStoreLocation = CarbonDataProcessorUtil
-          .createCarbonStoreLocation(carbonLoadModel.getDatabaseName(), tableName,
+          .createCarbonStoreLocation(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(),
               carbonLoadModel.getSegmentId());
     }
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index dcb7cb4..6475ba8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -81,8 +81,9 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
           partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR + loadModel
               .getFactTimeStamp() + ".tmp";
     } else {
-      carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation(
-          loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
+      carbonStoreLocation = CarbonDataProcessorUtil
+          .createCarbonStoreLocation(loadModel.getCarbonDataLoadSchema().getCarbonTable(),
+              loadModel.getSegmentId());
     }
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
         .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index ac70f27..977b9d3 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -50,8 +50,9 @@ public class RowResultProcessor {
     CarbonDataProcessorUtil.createLocations(tempStoreLocation);
     this.segmentProperties = segProp;
     String tableName = carbonTable.getTableName();
-    String carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation(
-        loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
+    String carbonStoreLocation = CarbonDataProcessorUtil
+        .createCarbonStoreLocation(loadModel.getCarbonDataLoadSchema().getCarbonTable(),
+            loadModel.getSegmentId());
     CarbonFactDataHandlerModel carbonFactDataHandlerModel =
         CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
             segProp, tableName, tempStoreLocation, carbonStoreLocation);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
index ecce232..7908f4f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java
@@ -139,6 +139,11 @@ public class SortParameters implements Serializable {
   private int batchSortSizeinMb;
   private int rangeId = 0;
 
+  /**
+   * CarbonTable Info
+   */
+  private CarbonTable carbonTable;
+
   public SortParameters getCopy() {
     SortParameters parameters = new SortParameters();
     parameters.tempFileLocation = tempFileLocation;
@@ -172,6 +177,7 @@ public class SortParameters implements Serializable {
     parameters.numberOfCores = numberOfCores;
     parameters.batchSortSizeinMb = batchSortSizeinMb;
     parameters.rangeId = rangeId;
+    parameters.carbonTable = carbonTable;
     return parameters;
   }
 
@@ -375,11 +381,21 @@ public class SortParameters implements Serializable {
     this.batchSortSizeinMb = batchSortSizeinMb;
   }
 
+  public void setCarbonTable(CarbonTable carbonTable) {
+    this.carbonTable = carbonTable;
+  }
+
+  public CarbonTable getCarbonTable() {
+    return carbonTable;
+  }
+
+
   public static SortParameters createSortParameters(CarbonDataLoadConfiguration configuration) {
     SortParameters parameters = new SortParameters();
     CarbonTableIdentifier tableIdentifier =
         configuration.getTableIdentifier().getCarbonTableIdentifier();
     CarbonProperties carbonProperties = CarbonProperties.getInstance();
+    parameters.setCarbonTable(configuration.getTableSpec().getCarbonTable());
     parameters.setDatabaseName(tableIdentifier.getDatabaseName());
     parameters.setTableName(tableIdentifier.getTableName());
     parameters.setPartitionID("0");
@@ -401,8 +417,7 @@ public class SortParameters implements Serializable {
     parameters.setNumberOfSortColumns(configuration.getNumberOfSortColumns());
     parameters.setNumberOfNoDictSortColumns(configuration.getNumberOfNoDictSortColumns());
     parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
-        .getNoDictSortColMapping(configuration.getTableIdentifier().getDatabaseName(),
-            configuration.getTableIdentifier().getTableName()));
+        .getNoDictSortColMapping(parameters.getCarbonTable()));
     parameters.setSortColumn(configuration.getSortColumnMapping());
     parameters.setObserver(new SortObserver());
     // get sort buffer size
@@ -418,10 +433,9 @@ public class SortParameters implements Serializable {
     LOGGER.info("Number of intermediate file to be merged: " + parameters
         .getNumberOfIntermediateFileToBeMerged());
 
-
-    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation(
-        tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(),
-        configuration.getTaskNo(), configuration.getSegmentId(), false, false);
+    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(parameters.getCarbonTable(),
+            configuration.getTaskNo(), configuration.getSegmentId(), false, false);
     String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
 
@@ -453,11 +467,9 @@ public class SortParameters implements Serializable {
     DataType[] measureDataType = configuration.getMeasureDataType();
     parameters.setMeasureDataType(measureDataType);
     parameters.setNoDictDataType(CarbonDataProcessorUtil
-        .getNoDictDataTypes(configuration.getTableIdentifier().getDatabaseName(),
-            configuration.getTableIdentifier().getTableName()));
+        .getNoDictDataTypes(configuration.getTableSpec().getCarbonTable()));
     Map<String, DataType[]> noDictSortAndNoSortDataTypes = CarbonDataProcessorUtil
-        .getNoDictSortAndNoSortDataTypes(configuration.getTableIdentifier().getDatabaseName(),
-            configuration.getTableIdentifier().getTableName());
+        .getNoDictSortAndNoSortDataTypes(configuration.getTableSpec().getCarbonTable());
     parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes"));
     parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
     return parameters;
@@ -477,6 +489,7 @@ public class SortParameters implements Serializable {
       boolean[] sortColumnMapping, boolean[] isVarcharDimensionColumn, boolean isCompactionFlow) {
     SortParameters parameters = new SortParameters();
     CarbonProperties carbonProperties = CarbonProperties.getInstance();
+    parameters.setCarbonTable(carbonTable);
     parameters.setDatabaseName(databaseName);
     parameters.setTableName(tableName);
     parameters.setPartitionID(CarbonTablePath.DEPRECATED_PATITION_ID);
@@ -506,7 +519,7 @@ public class SortParameters implements Serializable {
         .getNumberOfIntermediateFileToBeMerged());
 
     String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(databaseName, tableName, taskNo, segmentId,
+        .getLocalDataFolderLocation(carbonTable, taskNo, segmentId,
             isCompactionFlow, false);
     String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -532,17 +545,16 @@ public class SortParameters implements Serializable {
         CarbonCommonConstants.CARBON_PREFETCH_BUFFERSIZE_DEFAULT)));
 
     DataType[] type = CarbonDataProcessorUtil
-        .getMeasureDataType(parameters.getMeasureColCount(), parameters.getDatabaseName(),
-            parameters.getTableName());
+        .getMeasureDataType(parameters.getMeasureColCount(), parameters.getCarbonTable());
     parameters.setMeasureDataType(type);
     parameters.setNoDictDataType(CarbonDataProcessorUtil
-        .getNoDictDataTypes(parameters.getDatabaseName(), parameters.getTableName()));
+        .getNoDictDataTypes(carbonTable));
     Map<String, DataType[]> noDictSortAndNoSortDataTypes = CarbonDataProcessorUtil
-        .getNoDictSortAndNoSortDataTypes(parameters.getDatabaseName(), parameters.getTableName());
+        .getNoDictSortAndNoSortDataTypes(parameters.getCarbonTable());
     parameters.setNoDictSortDataType(noDictSortAndNoSortDataTypes.get("noDictSortDataTypes"));
     parameters.setNoDictNoSortDataType(noDictSortAndNoSortDataTypes.get("noDictNoSortDataTypes"));
     parameters.setNoDictionarySortColumn(CarbonDataProcessorUtil
-        .getNoDictSortColMapping(parameters.getDatabaseName(), parameters.getTableName()));
+        .getNoDictSortColMapping(parameters.getCarbonTable()));
     return parameters;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
index d086b6d..878ce6b 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java
@@ -30,7 +30,6 @@ import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
@@ -205,8 +204,7 @@ public class CarbonFactDataHandlerModel {
         }
       }
     }
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(
-        identifier.getDatabaseName(), identifier.getTableName());
+    CarbonTable carbonTable = configuration.getTableSpec().getCarbonTable();
 
     List<ColumnSchema> wrapperColumnSchema = CarbonUtil
         .getColumnSchemaList(carbonTable.getDimensionByTableName(identifier.getTableName()),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/10b39380/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 6f36ef8..044e4e8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -33,7 +33,6 @@ import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.constants.LoggerAction;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -138,25 +137,6 @@ public final class CarbonDataProcessorUtil {
   }
 
   /**
-   * This method will form the local data folder store location
-   *
-   * @param databaseName
-   * @param tableName
-   * @param taskId
-   * @param segmentId
-   * @param isCompactionFlow
-   * @param isAltPartitionFlow
-   * @return
-   */
-  public static String[] getLocalDataFolderLocation(String databaseName, String tableName,
-      String taskId, String segmentId, boolean isCompactionFlow,
-      boolean isAltPartitionFlow) {
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
-    return getLocalDataFolderLocation(carbonTable, taskId,
-        segmentId, isCompactionFlow, isAltPartitionFlow);
-  }
-
-  /**
    * This method will form the key for getting the temporary location set in carbon properties
    *
    * @param databaseName
@@ -413,14 +393,12 @@ public final class CarbonDataProcessorUtil {
     return columnNames;
   }
 
-  public static DataType[] getMeasureDataType(int measureCount, String databaseName,
-      String tableName) {
+  public static DataType[] getMeasureDataType(int measureCount, CarbonTable carbonTable) {
     DataType[] type = new DataType[measureCount];
     for (int i = 0; i < type.length; i++) {
       type[i] = DataTypes.DOUBLE;
     }
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
-    List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(tableName);
+    List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(carbonTable.getTableName());
     for (int i = 0; i < type.length; i++) {
       type[i] = measures.get(i).getDataType();
     }
@@ -430,13 +408,12 @@ public final class CarbonDataProcessorUtil {
   /**
    * Get the no dictionary data types on the table
    *
-   * @param databaseName
-   * @param tableName
+   * @param carbonTable
    * @return
    */
-  public static DataType[] getNoDictDataTypes(String databaseName, String tableName) {
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
-    List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
+  public static DataType[] getNoDictDataTypes(CarbonTable carbonTable) {
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
     List<DataType> type = new ArrayList<>();
     for (int i = 0; i < dimensions.size(); i++) {
       if (dimensions.get(i).isSortColumn() && !dimensions.get(i).hasEncoding(Encoding.DICTIONARY)) {
@@ -449,13 +426,12 @@ public final class CarbonDataProcessorUtil {
   /**
    * Get the no dictionary sort column mapping of the table
    *
-   * @param databaseName
-   * @param tableName
+   * @param carbonTable
    * @return
    */
-  public static boolean[] getNoDictSortColMapping(String databaseName, String tableName) {
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
-    List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
+  public static boolean[] getNoDictSortColMapping(CarbonTable carbonTable) {
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
     List<Boolean> noDicSortColMap = new ArrayList<>();
     for (int i = 0; i < dimensions.size(); i++) {
       if (dimensions.get(i).isSortColumn()) {
@@ -477,14 +453,12 @@ public final class CarbonDataProcessorUtil {
   /**
    * Get the data types of the no dictionary sort columns
    *
-   * @param databaseName
-   * @param tableName
+   * @param carbonTable
    * @return
    */
-  public static Map<String, DataType[]> getNoDictSortAndNoSortDataTypes(String databaseName,
-      String tableName) {
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
-    List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
+  public static Map<String, DataType[]> getNoDictSortAndNoSortDataTypes(CarbonTable carbonTable) {
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getTableName());
     List<DataType> noDictSortType = new ArrayList<>();
     List<DataType> noDictNoSortType = new ArrayList<>();
     for (int i = 0; i < dimensions.size(); i++) {
@@ -509,9 +483,7 @@ public final class CarbonDataProcessorUtil {
    *
    * @return data directory path
    */
-  public static String createCarbonStoreLocation(String databaseName, String tableName,
-      String segmentId) {
-    CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
+  public static String createCarbonStoreLocation(CarbonTable carbonTable, String segmentId) {
     return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId);
   }