You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2018/02/27 03:28:52 UTC

[32/49] carbondata git commit: [REBASE] resolve conflict after rebasing to master

[REBASE] resolve conflict after rebasing to master


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a6bf77ff
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a6bf77ff
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a6bf77ff

Branch: refs/heads/carbonstore-rebase4
Commit: a6bf77ff2a2bbf36e139581f075a020425754c5b
Parents: 43e34fc
Author: Jacky Li <ja...@qq.com>
Authored: Tue Feb 27 08:51:25 2018 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Tue Feb 27 08:51:25 2018 +0800

----------------------------------------------------------------------
 .../carbondata/core/datamap/TableDataMap.java   |  2 +-
 .../core/datamap/dev/AbstractDataMapWriter.java |  5 ++--
 .../core/datamap/dev/DataMapFactory.java        |  2 +-
 .../core/indexstore/BlockletDetailsFetcher.java |  2 +-
 .../indexstore/SegmentPropertiesFetcher.java    |  3 +-
 .../blockletindex/BlockletDataMap.java          |  2 +-
 .../blockletindex/BlockletDataMapFactory.java   | 21 ++++++-------
 .../core/metadata/SegmentFileStore.java         |  2 +-
 .../RowLevelRangeGrtThanFiterExecuterImpl.java  |  1 +
 ...elRangeGrtrThanEquaToFilterExecuterImpl.java |  1 +
 ...velRangeLessThanEqualFilterExecuterImpl.java |  1 +
 .../RowLevelRangeLessThanFiterExecuterImpl.java |  1 +
 .../SegmentUpdateStatusManager.java             | 26 ++++------------
 .../apache/carbondata/core/util/CarbonUtil.java | 16 ++++------
 .../testsuite/datamap/CGDataMapTestCase.scala   | 26 ++++++++--------
 .../testsuite/datamap/DataMapWriterSuite.scala  | 19 ++++++------
 .../testsuite/datamap/FGDataMapTestCase.scala   | 31 +++++++++-----------
 .../iud/DeleteCarbonTableTestCase.scala         |  2 +-
 .../TestInsertAndOtherCommandConcurrent.scala   | 14 +++++----
 .../StandardPartitionTableCleanTestCase.scala   | 12 ++++----
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  2 +-
 .../carbondata/spark/util/DataLoadingUtil.scala |  4 +--
 .../CreatePreAggregateTableCommand.scala        |  2 +-
 .../apache/spark/sql/hive/CarbonRelation.scala  |  3 +-
 .../datamap/DataMapWriterListener.java          |  2 +-
 .../loading/model/CarbonLoadModel.java          |  2 +-
 .../processing/merger/CarbonDataMergerUtil.java | 15 +++-------
 .../merger/CompactionResultSortProcessor.java   |  6 ++--
 .../merger/RowResultMergerProcessor.java        |  6 ++--
 .../partition/spliter/RowResultProcessor.java   |  3 +-
 .../util/CarbonDataProcessorUtil.java           |  4 +--
 .../processing/util/CarbonLoaderUtil.java       |  2 +-
 32 files changed, 104 insertions(+), 136 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index eed650e3..2a6ceaa 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -143,7 +143,7 @@ public final class TableDataMap extends OperationEventListener {
       blocklets.addAll(
           dataMap.prune(
               filterExp,
-              segmentPropertiesFetcher.getSegmentProperties(distributable.getSegmentId()),
+              segmentPropertiesFetcher.getSegmentProperties(distributable.getSegment()),
               partitions));
     }
     BlockletSerializer serializer = new BlockletSerializer();

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
index bcc9bad..de6dcb1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.datamap.dev;
 
 import java.io.IOException;
 
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -35,10 +36,10 @@ public abstract class AbstractDataMapWriter {
 
   protected String writeDirectoryPath;
 
-  public AbstractDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId,
+  public AbstractDataMapWriter(AbsoluteTableIdentifier identifier, Segment segment,
       String writeDirectoryPath) {
     this.identifier = identifier;
-    this.segmentId = segmentId;
+    this.segmentId = segment.getSegmentNo();
     this.writeDirectoryPath = writeDirectoryPath;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index df5670d..50ac279 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -39,7 +39,7 @@ public interface DataMapFactory<T extends DataMap> {
   /**
    * Return a new write for this datamap
    */
-  AbstractDataMapWriter createWriter(Segment segment);
+  AbstractDataMapWriter createWriter(Segment segment, String writeDirectoryPath);
 
   /**
    * Get the datamap for segmentid

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
index 5a5fc1e..dd592c0 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java
@@ -53,5 +53,5 @@ public interface BlockletDetailsFetcher {
    * @param segment
    * @return
    */
-  List<Blocklet> getAllBlocklets(Segment segment, List<String> partitions) throws IOException;
+  List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
index ec2ae93..6f94be5 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/SegmentPropertiesFetcher.java
@@ -19,6 +19,7 @@ package org.apache.carbondata.core.indexstore;
 
 import java.io.IOException;
 
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 
 /**
@@ -32,5 +33,5 @@ public interface SegmentPropertiesFetcher {
    * @return
    * @throws IOException
    */
-  SegmentProperties getSegmentProperties(String segmentId) throws IOException;
+  SegmentProperties getSegmentProperties(Segment segment) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
index ef1bd33..ce6193b 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java
@@ -660,7 +660,7 @@ public class BlockletDataMap extends AbstractCoarseGrainDataMap implements Cache
   }
 
   @Override
-  public List<Blocklet> prune(FilterResolverIntf filterExp, List<PartitionSpec> partitions) {
+  public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, List<PartitionSpec> partitions) {
     if (unsafeMemoryDMStore.getRowCount() == 0) {
       return new ArrayList<>();
     }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index 5ca3ac5..ee849bd 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -38,13 +38,11 @@ import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.indexstore.Blocklet;
 import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher;
 import org.apache.carbondata.core.indexstore.ExtendedBlocklet;
+import org.apache.carbondata.core.indexstore.PartitionSpec;
 import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher;
 import org.apache.carbondata.core.indexstore.TableBlockIndexUniqueIdentifier;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.SegmentFileStore;
-import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.util.DataFileFooterConverter;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.events.Event;
 
@@ -57,8 +55,7 @@ import org.apache.hadoop.fs.RemoteIterator;
  * Table map for blocklet
  */
 public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory
-    implements BlockletDetailsFetcher,
-    SegmentPropertiesFetcher {
+    implements BlockletDetailsFetcher, SegmentPropertiesFetcher {
 
   private AbsoluteTableIdentifier identifier;
 
@@ -75,12 +72,12 @@ public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory
   }
 
   @Override
-  public DataMapWriter createWriter(Segment segment) {
+  public AbstractDataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
     throw new UnsupportedOperationException("not implemented");
   }
 
   @Override
-  public List<DataMap> getDataMaps(Segment segment) throws IOException {
+  public List<AbstractCoarseGrainDataMap> getDataMaps(Segment segment) throws IOException {
     List<TableBlockIndexUniqueIdentifier> tableBlockIndexUniqueIdentifiers =
         getTableBlockIndexUniqueIdentifiers(segment);
     return cache.getAll(tableBlockIndexUniqueIdentifiers);
@@ -262,8 +259,8 @@ public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory
     return null;
   }
 
-  @Override public SegmentProperties getSegmentProperties(String segmentId) throws IOException {
-    List<AbstractCoarseGrainDataMap> dataMaps = getDataMaps(segmentId);
+  @Override public SegmentProperties getSegmentProperties(Segment segment) throws IOException {
+    List<AbstractCoarseGrainDataMap> dataMaps = getDataMaps(segment);
     assert (dataMaps.size() > 0);
     AbstractCoarseGrainDataMap coarseGrainDataMap = dataMaps.get(0);
     assert (coarseGrainDataMap instanceof BlockletDataMap);
@@ -271,12 +268,12 @@ public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory
     return dataMap.getSegmentProperties();
   }
 
-  @Override public List<Blocklet> getAllBlocklets(String segmentId, List<String> partitions)
+  @Override public List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions)
       throws IOException {
     List<Blocklet> blocklets = new ArrayList<>();
-    List<AbstractCoarseGrainDataMap> dataMaps = getDataMaps(segmentId);
+    List<AbstractCoarseGrainDataMap> dataMaps = getDataMaps(segment);
     for (AbstractCoarseGrainDataMap dataMap : dataMaps) {
-      blocklets.addAll(dataMap.prune(null, getSegmentProperties(segmentId), partitions));
+      blocklets.addAll(dataMap.prune(null, getSegmentProperties(segment), partitions));
     }
     return blocklets;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index b5f5a25..f48cc6d 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -432,7 +432,7 @@ public class SegmentFileStore {
       boolean forceDelete) throws IOException {
 
     LoadMetadataDetails[] details =
-        SegmentStatusManager.readLoadMetadata(table.getMetaDataFilepath());
+        SegmentStatusManager.readLoadMetadata(table.getMetadataPath());
     // scan through each segment.
     for (LoadMetadataDetails segment : details) {
       // if this segment is valid then only we will go for deletion of related

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
index 1f63a81..e35bb8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtThanFiterExecuterImpl.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
index 9140a11..d48abf9 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeGrtrThanEquaToFilterExecuterImpl.java
@@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index 120671f..d89a488 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index 547ecaa..a00c7db 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -35,6 +35,7 @@ import org.apache.carbondata.core.metadata.encoder.Encoding;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.exception.FilterUnsupportedException;
 import org.apache.carbondata.core.scan.filter.FilterUtil;
 import org.apache.carbondata.core.scan.filter.intf.RowIntf;
 import org.apache.carbondata.core.scan.filter.resolver.resolverinfo.DimColumnResolvedFilterInfo;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 25ce0c8..6ec6fa2 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -233,10 +233,7 @@ public class SegmentUpdateStatusManager {
    * @throws Exception
    */
   public String[] getDeleteDeltaFilePath(String blockFilePath, String segmentId) throws Exception {
-//    int tableFactPathLength = CarbonTablePath.getFactDir(identifier.getTablePath()).length() + 1;
-//    String blockId = blockFilePath.substring(tableFactPathLength);
-
-    String blockId = CarbonUtil.getBlockId(absoluteTableIdentifier, blockFilePath, segmentId);
+    String blockId = CarbonUtil.getBlockId(identifier, blockFilePath, segmentId);
     String tupleId;
     if (isPartitionTable) {
       tupleId = CarbonTablePath.getShortBlockIdForPartitionTable(blockId);
@@ -249,13 +246,8 @@ public class SegmentUpdateStatusManager {
 
   /**
    * Returns all delta file paths of specified block
-   *
-   * @param tupleId
-   * @param extension
-   * @return
-   * @throws Exception
    */
-  public List<String> getDeltaFiles(String tupleId, String extension) throws Exception {
+  private List<String> getDeltaFiles(String tupleId, String extension) throws Exception {
     try {
       String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID);
       String completeBlockName = CarbonTablePath.addDataPartPrefix(
@@ -263,11 +255,11 @@ public class SegmentUpdateStatusManager {
               + CarbonCommonConstants.FACT_FILE_EXT);
       String blockPath;
       if (isPartitionTable) {
-        blockPath = absoluteTableIdentifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR
+        blockPath = identifier.getTablePath() + CarbonCommonConstants.FILE_SEPARATOR
             + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.PART_ID)
             .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
       } else {
-        String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment);
+        String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment);
         blockPath =
             carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName;
       }
@@ -391,16 +383,10 @@ public class SegmentUpdateStatusManager {
    * @return
    */
   public CarbonFile[] getDeleteDeltaFilesList(final Segment segmentId, final String blockName) {
-
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getTablePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier());
-
-    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId.getSegmentNo());
-
+    String segmentPath =
+        CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId.getSegmentNo());
     CarbonFile segDir =
         FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
-
     for (SegmentUpdateDetails block : updateDetails) {
       if ((block.getBlockName().equalsIgnoreCase(blockName)) &&
           (block.getSegmentName().equalsIgnoreCase(segmentId.getSegmentNo()))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 5ec0158..c9b4337 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2361,21 +2361,19 @@ public final class CarbonUtil {
   }
 
   // Get the total size of carbon data and the total size of carbon index
-  public static HashMap<String, Long> getDataSizeAndIndexSize(CarbonTablePath carbonTablePath,
+  public static HashMap<String, Long> getDataSizeAndIndexSize(String tablePath,
       Segment segment) throws IOException {
     if (segment.getSegmentFileName() != null) {
-      SegmentFileStore fileStore =
-          new SegmentFileStore(carbonTablePath.getPath(), segment.getSegmentFileName());
+      SegmentFileStore fileStore = new SegmentFileStore(tablePath, segment.getSegmentFileName());
       return getDataSizeAndIndexSize(fileStore);
     } else {
-      return getDataSizeAndIndexSize(carbonTablePath, segment.getSegmentNo());
+      return getDataSizeAndIndexSize(tablePath, segment.getSegmentNo());
     }
   }
 
   // Get the total size of segment.
-  public static long getSizeOfSegment(CarbonTablePath carbonTablePath,
-      Segment segment) throws IOException {
-    HashMap<String, Long> dataSizeAndIndexSize = getDataSizeAndIndexSize(carbonTablePath, segment);
+  public static long getSizeOfSegment(String tablePath, Segment segment) throws IOException {
+    HashMap<String, Long> dataSizeAndIndexSize = getDataSizeAndIndexSize(tablePath, segment);
     long size = 0;
     for (Long eachSize: dataSizeAndIndexSize.values()) {
       size += eachSize;
@@ -2585,9 +2583,7 @@ public final class CarbonUtil {
     String blockName = filePath.substring(filePath.lastIndexOf("/") + 1, filePath.length());
     String tablePath = identifier.getTablePath();
     if (filePath.startsWith(tablePath)) {
-      String factDir =
-          CarbonStorePath.getCarbonTablePath(tablePath, identifier.getCarbonTableIdentifier())
-              .getFactDir();
+      String factDir = CarbonTablePath.getFactDir(tablePath);
       if (filePath.startsWith(factDir)) {
         blockId = "Part0" + CarbonCommonConstants.FILE_SEPARATOR + "Segment_" + segmentId
             + CarbonCommonConstants.FILE_SEPARATOR + blockName;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 4b6f231..1cbbcb4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -27,14 +27,14 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
 import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datastore.FileReader
 import org.apache.carbondata.core.datastore.block.SegmentProperties
 import org.apache.carbondata.core.datastore.compression.SnappyCompressor
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.Blocklet
+import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.scan.expression.Expression
@@ -62,16 +62,16 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
   /**
    * Return a new write for this datamap
    */
-  override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
-    new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+  override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = {
+    new CGDataMapWriter(identifier, segment, dataWritePath, dataMapName)
   }
 
   /**
    * Get the datamap for segmentid
    */
-  override def getDataMaps(segmentId: String): java.util.List[AbstractCoarseGrainDataMap] = {
+  override def getDataMaps(segment: Segment): java.util.List[AbstractCoarseGrainDataMap] = {
     val file = FileFactory.getCarbonFile(
-      CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+      CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
 
     val files = file.listFiles(new CarbonFileFilter {
       override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -108,9 +108,9 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
    *
    * @return
    */
-  override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = {
+  override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
     val file = FileFactory.getCarbonFile(
-      CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+      CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
 
     val files = file.listFiles(new CarbonFileFilter {
       override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -125,7 +125,7 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
   /**
    * Clears datamap of the segment
    */
-  override def clear(segmentId: String): Unit = {
+  override def clear(segment: Segment): Unit = {
 
   }
 
@@ -175,7 +175,7 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
   override def prune(
       filterExp: FilterResolverIntf,
       segmentProperties: SegmentProperties,
-      partitions: java.util.List[String]): java.util.List[Blocklet] = {
+      partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = {
     val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
     val expression = filterExp.getFilterExpression
     getEqualToExpression(expression, buffer)
@@ -184,7 +184,7 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
     }
     val meta = findMeta(value(0).getBytes)
     meta.map { f=>
-      new Blocklet(f._1, f._2+"")
+      new Blocklet(f._1, f._2 + "")
     }.asJava
   }
 
@@ -219,10 +219,10 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
 }
 
 class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
-    segmentId: String,
+    segment: Segment,
     dataWritePath: String,
     dataMapName: String)
-  extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
+  extends AbstractDataMapWriter(identifier, segment, dataWritePath) {
 
   var currentBlockId: String = null
   val cgwritepath = dataWritePath + "/" +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 2f8a1d1..7e93959 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -20,21 +20,19 @@ package org.apache.carbondata.spark.testsuite.datamap
 import java.util
 
 import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
-import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter
+import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap}
 import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
-import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
 
@@ -49,15 +47,16 @@ class C2DataMapFactory() extends AbstractCoarseGrainDataMapFactory {
 
   override def fireEvent(event: Event): Unit = ???
 
-  override def clear(segmentId: Segment): Unit = {}
+  override def clear(segment: Segment): Unit = {}
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ???
+  override def getDataMaps(distributable: DataMapDistributable): util.List[AbstractCoarseGrainDataMap] = ???
 
-  override def getDataMaps(segmentId: Segment): util.List[DataMap] = ???
+  override def getDataMaps(segment: Segment): util.List[AbstractCoarseGrainDataMap] = ???
 
-  override def createWriter(segmentId: Segment): AbstractDataMapWriter = DataMapWriterSuite.dataMapWriterC2Mock
+  override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter =
+    DataMapWriterSuite.dataMapWriterC2Mock(identifier, segment, dataWritePath)
 
   override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava)
 
@@ -175,9 +174,9 @@ object DataMapWriterSuite {
 
   var callbackSeq: Seq[String] = Seq[String]()
 
-  def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segmentId: String,
+  def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segment: Segment,
       dataWritePath: String) =
-    new AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
+    new AbstractDataMapWriter(identifier, segment, dataWritePath) {
 
     override def onPageAdded(
         blockletId: Int,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index d1bb65f..9c8cc15 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -27,14 +27,14 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.datamap.dev.fgdatamap.{AbstractFineGrainDataMap, AbstractFineGrainDataMapFactory}
 import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datastore.FileReader
 import org.apache.carbondata.core.datastore.block.SegmentProperties
 import org.apache.carbondata.core.datastore.compression.SnappyCompressor
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.FineGrainBlocklet
+import org.apache.carbondata.core.indexstore.{Blocklet, FineGrainBlocklet, PartitionSpec}
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.scan.expression.Expression
@@ -62,16 +62,16 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
   /**
    * Return a new write for this datamap
    */
-  override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
-    new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+  override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = {
+    new FGDataMapWriter(identifier, segment, dataWritePath, dataMapName)
   }
 
   /**
    * Get the datamap for segmentid
    */
-  override def getDataMaps(segmentId: String): java.util.List[AbstractFineGrainDataMap] = {
+  override def getDataMaps(segment: Segment): java.util.List[AbstractFineGrainDataMap] = {
     val file = FileFactory
-      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
 
     val files = file.listFiles(new CarbonFileFilter {
       override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -99,9 +99,9 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
    *
    * @return
    */
-  override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = {
-    val file = FileFactory
-      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+  override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
+    val file = FileFactory.getCarbonFile(
+      CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
 
     val files = file.listFiles(new CarbonFileFilter {
       override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -112,7 +112,6 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
     }.toList.asJava
   }
 
-
   /**
    *
    * @param event
@@ -124,7 +123,7 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
   /**
    * Clears datamap of the segment
    */
-  override def clear(segmentId: String): Unit = {
+  override def clear(segment: Segment): Unit = {
   }
 
   /**
@@ -173,7 +172,7 @@ class FGDataMap extends AbstractFineGrainDataMap {
   override def prune(
       filterExp: FilterResolverIntf,
       segmentProperties: SegmentProperties,
-      partitions: java.util.List[String]): java.util.List[FineGrainBlocklet] = {
+      partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = {
     val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
     val expression = filterExp.getFilterExpression
     getEqualToExpression(expression, buffer)
@@ -187,7 +186,7 @@ class FGDataMap extends AbstractFineGrainDataMap {
   }
 
   private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int),
-      value: Array[Byte]): Option[FineGrainBlocklet] = {
+      value: Array[Byte]): Option[Blocklet] = {
     val bytes = FileReader.readByteArray(filePath, meta._4, meta._5)
     val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes))
     val obj = new ObjectInputStream(outputStream)
@@ -211,12 +210,10 @@ class FGDataMap extends AbstractFineGrainDataMap {
         pg.setRowId(f._2(p._2).toArray)
         pg
       }
-      pages
       Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava))
     } else {
       None
     }
-
   }
 
   private def findMeta(value: Array[Byte]) = {
@@ -249,8 +246,8 @@ class FGDataMap extends AbstractFineGrainDataMap {
 }
 
 class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
-    segmentId: String, dataWriterPath: String, dataMapName: String)
-  extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) {
+    segment: Segment, dataWriterPath: String, dataMapName: String)
+  extends AbstractDataMapWriter(identifier, segment, dataWriterPath) {
 
   var currentBlockId: String = null
   val fgwritepath = dataWriterPath + "/" + System.nanoTime() + ".datamap"

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index 22aa385..f92649a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -194,7 +194,7 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("delete from update_status_files where age=5").show()
     val carbonTable = CarbonEnv
       .getCarbonTable(Some("iud_db"), "update_status_files")(sqlContext.sparkSession)
-    val metaPath = carbonTable.getMetaDataFilepath
+    val metaPath = carbonTable.getMetadataPath
     val files = FileFactory.getCarbonFile(metaPath)
     assert(files.listFiles().length == 2)
     sql("drop table update_status_files")

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 5550358..b39c44c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -269,7 +269,11 @@ object Global {
 
 class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory {
 
-  override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { }
+  private var identifier: AbsoluteTableIdentifier = _
+
+  override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = {
+    this.identifier = identifier
+  }
 
   override def fireEvent(event: Event): Unit = ???
 
@@ -277,12 +281,12 @@ class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory {
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ???
+  override def getDataMaps(distributable: DataMapDistributable): util.List[AbstractCoarseGrainDataMap] = ???
 
-  override def getDataMaps(segmentId: Segment): util.List[DataMap] = ???
+  override def getDataMaps(segment: Segment): util.List[AbstractCoarseGrainDataMap] = ???
 
-  override def createWriter(segmentId: Segment): AbstractDataMapWriter = {
-    new AbstractDataMapWriter {
+  override def createWriter(segment: Segment, writeDirectoryPath: String): AbstractDataMapWriter = {
+    new AbstractDataMapWriter(identifier, segment, writeDirectoryPath) {
       override def onPageAdded(blockletId: Int, pageId: Int, pages: Array[ColumnPage]): Unit = { }
 
       override def onBlockletEnd(blockletId: Int): Unit = { }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
index f238d2b..cfc6983 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
@@ -52,14 +52,12 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA
 
   def validateDataFiles(tableUniqueName: String, segmentId: String, partition: Int, indexes: Int): Unit = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-    val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
-      carbonTable.getTablePath)
-    val partitions = CarbonFilters
-      .getPartitions(Seq.empty,
-        sqlContext.sparkSession,
-        TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName)))
+    val partitions = CarbonFilters.getPartitions(
+      Seq.empty,
+      sqlContext.sparkSession,
+      TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName)))
     assert(partitions.get.length == partition)
-    val details = SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath)
+    val details = SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(carbonTable.getTablePath))
     val segLoad = details.find(_.getLoadName.equals(segmentId)).get
     val seg = new SegmentFileStore(carbonTable.getTablePath, segLoad.getSegmentFile)
     assert(seg.getIndexFiles.size == indexes)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index 8ba2767..97e3061 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -291,7 +291,7 @@ class NewCarbonDataLoadRDD[K, V](
         val fileList: java.util.List[String] = new java.util.ArrayList[String](
             CarbonCommonConstants.CONSTANT_SIZE_TEN)
         CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, fileList, ",")
-        model = carbonLoadModel.getCopyWithPartition(
+        model = carbonLoadModel.getCopyWithPartition("0",
           carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter)
         StandardLogService.setThreadName(StandardLogService
           .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index cf35c12..49e4420 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -441,8 +441,8 @@ object DataLoadingUtil {
 
   private def isUpdationRequired(isForceDeletion: Boolean,
       carbonTable: CarbonTable,
-      absoluteTableIdentifier: AbsoluteTableIdentifier) = {
-    val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath)
+      absoluteTableIdentifier: AbsoluteTableIdentifier): (Array[LoadMetadataDetails], Boolean) = {
+    val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
     // Delete marked loads
     val isUpdationRequired =
       DeleteLoadFolders.deleteLoadFoldersFromFileSystem(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
index 59c43aa..4d0a4c5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/CreatePreAggregateTableCommand.scala
@@ -184,7 +184,7 @@ case class CreatePreAggregateTableCommand(
       CarbonFilters.getCurrentPartitions(sparkSession,
       TableIdentifier(parentTable.getTableName,
         Some(parentTable.getDatabaseName))).map(_.asJava).orNull)
-    val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetaDataFilepath)
+    val loadAvailable = SegmentStatusManager.readLoadMetadata(parentTable.getMetadataPath)
     if (loadAvailable.exists(load => load.getSegmentStatus == SegmentStatus.INSERT_IN_PROGRESS ||
       load.getSegmentStatus == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS)) {
       throw new UnsupportedOperationException(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
index c9833d0..5771503 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonRelation.scala
@@ -222,9 +222,8 @@ case class CarbonRelation(
           // for each segment calculate the size
           segments.foreach {validSeg =>
             if (validSeg.getSegmentFileName != null) {
-              val fileStore = new SegmentFileStore(tablePath, validSeg.getSegmentFileName)
               size = size + CarbonUtil.getSizeOfSegment(
-                carbonTablePath, new Segment(validSeg.getSegmentNo, validSeg.getSegmentFileName))
+                tablePath, new Segment(validSeg.getSegmentNo, validSeg.getSegmentFileName))
             } else {
               size = size + FileFactory.getDirectorySize(
                 CarbonTablePath.getSegmentPath(tablePath, validSeg.getSegmentNo))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 5083ab5..1104229 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -74,7 +74,7 @@ public class DataMapWriterListener {
     }
     List<String> columns = factory.getMeta().getIndexedColumns();
     List<AbstractDataMapWriter> writers = registry.get(columns);
-    AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null));
+    AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null), dataWritePath);
     if (writers != null) {
       writers.add(writer);
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index a17178a..638ad39 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -478,7 +478,7 @@ public class CarbonLoadModel implements Serializable {
    * @param delimiter
    * @return
    */
-  public CarbonLoadModel getCopyWithPartition(String uniqueId, List<String> filesForPartition,
+  public CarbonLoadModel getCopyWithPartition(String uniqueId,
       String header, String delimiter) {
     CarbonLoadModel copyObj = new CarbonLoadModel();
     copyObj.tableName = tableName;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 89326a3..d2faef5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -44,7 +44,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -612,10 +611,10 @@ public final class CarbonDataMergerUtil {
       // variable to store one  segment size across partition.
       long sizeOfOneSegmentAcrossPartition;
       if (segment.getSegmentFile() != null) {
-        sizeOfOneSegmentAcrossPartition = CarbonUtil
-            .getSizeOfSegment(carbonTablePath, new Segment(segId, segment.getSegmentFile()));
+        sizeOfOneSegmentAcrossPartition = CarbonUtil.getSizeOfSegment(
+            tablePath, new Segment(segId, segment.getSegmentFile()));
       } else {
-        sizeOfOneSegmentAcrossPartition = getSizeOfSegment(tablePath, tableIdentifier, segId);
+        sizeOfOneSegmentAcrossPartition = getSizeOfSegment(tablePath, segId);
       }
 
       // if size of a segment is greater than the Major compaction size. then ignore it.
@@ -1006,14 +1005,8 @@ public final class CarbonDataMergerUtil {
   /**
    * This method traverses Update Delta Files inside the seg and return true
    * if UpdateDelta Files are more than IUD Compaction threshold.
-   *
-   * @param seg
-   * @param identifier
-   * @param segmentUpdateStatusManager
-   * @param numberDeltaFilesThreshold
-   * @return
    */
-  public static Boolean checkUpdateDeltaFilesInSeg(Segment seg,
+  private static Boolean checkUpdateDeltaFilesInSeg(Segment seg,
       AbsoluteTableIdentifier absoluteTableIdentifier,
       SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
index b71612a..ea11e22 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -406,14 +406,12 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor {
               + carbonLoadModel.getFactTimeStamp() + ".tmp";
     } else {
       carbonStoreLocation = CarbonDataProcessorUtil
-          .createCarbonStoreLocation(carbonTable.getTablePath(), carbonLoadModel.getDatabaseName(),
-              tableName, carbonLoadModel.getPartitionId(), carbonLoadModel.getSegmentId());
+          .createCarbonStoreLocation(carbonLoadModel.getDatabaseName(), tableName, carbonLoadModel.getSegmentId());
     }
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
         .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName,
             tempStoreLocation, carbonStoreLocation);
-    setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonTable,
-        carbonFactDataHandlerModel);
+    setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonFactDataHandlerModel);
     dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel,
         CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
     try {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
index b41829f..278d5bb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -77,14 +77,12 @@ public class RowResultMergerProcessor extends AbstractResultProcessor {
               .getFactTimeStamp() + ".tmp";
     } else {
       carbonStoreLocation = CarbonDataProcessorUtil
-          .createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
-              tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
+          .createCarbonStoreLocation(loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
     }
     CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
         .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName,
             tempStoreLocation, carbonStoreLocation);
-    setDataFileAttributesInModel(loadModel, compactionType, carbonTable,
-        carbonFactDataHandlerModel);
+    setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel);
     carbonFactDataHandlerModel.setCompactionFlow(true);
     dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
index ff6ca93..df2e2a2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -48,8 +48,7 @@ public class RowResultProcessor {
     this.segmentProperties = segProp;
     String tableName = carbonTable.getTableName();
     String carbonStoreLocation = CarbonDataProcessorUtil
-        .createCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(),
-            tableName, loadModel.getPartitionId(), loadModel.getSegmentId());
+        .createCarbonStoreLocation(loadModel.getDatabaseName(), tableName, loadModel.getSegmentId());
     CarbonFactDataHandlerModel carbonFactDataHandlerModel =
         CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
             segProp, tableName, tempStoreLocation, carbonStoreLocation);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
index 2c08c18..dc8ffd7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java
@@ -34,7 +34,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.ColumnType;
 import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.datatype.DataTypes;
 import org.apache.carbondata.core.metadata.encoder.Encoding;
@@ -371,8 +370,7 @@ public final class CarbonDataProcessorUtil {
    *
    * @return data directory path
    */
-  public static String createCarbonStoreLocation(String factStoreLocation,
-      String databaseName, String tableName, String segmentId) {
+  public static String createCarbonStoreLocation(String databaseName, String tableName, String segmentId) {
     CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName);
     return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId);
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a6bf77ff/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 1eea61d..a3e889a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -1066,7 +1066,7 @@ public final class CarbonLoaderUtil {
    */
   public static Long addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails,
       String segmentId, CarbonTable carbonTable) throws IOException {
-    Map<String, Long> dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(carbonTable.getAbsoluteTableIdentifier(),
+    Map<String, Long> dataIndexSize = CarbonUtil.getDataSizeAndIndexSize(carbonTable.getTablePath(),
         new Segment(segmentId, loadMetadataDetails.getSegmentFile()));
     Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE);
     loadMetadataDetails.setDataSize(String.valueOf(dataSize));