Posted to commits@carbondata.apache.org by ra...@apache.org on 2018/03/08 16:55:23 UTC

[22/54] [abbrv] carbondata git commit: [CARBONDATA-2023][DataLoad] Add size-based block allocation in data loading

[CARBONDATA-2023][DataLoad] Add size-based block allocation in data loading

CarbonData assigns blocks to nodes at the beginning of data loading.
The previous block allocation strategy was based on block count, so it
suffers from data skew when the sizes of the input files differ
significantly.

We introduce a size-based block allocation strategy to optimize data
loading performance in skewed-data scenarios.
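
For illustration, the minimal Java sketch below (not the actual CarbonLoaderUtil code; the class and method names here are hypothetical) shows the idea behind size-based allocation: each block carries its size in bytes, and blocks are assigned greedily to whichever node currently holds the smallest total, so every node ends up with a similar amount of data even when individual input files are skewed. A purely count-based strategy would instead give each node the same number of blocks regardless of their sizes.

    // Illustrative sketch only; all names are hypothetical.
    import java.util.*;

    public class SizeBasedBlockAllocator {

      /** A block identified by its path and its size in bytes. */
      public static class Block {
        final String path;
        final long sizeInBytes;
        public Block(String path, long sizeInBytes) {
          this.path = path;
          this.sizeInBytes = sizeInBytes;
        }
      }

      /** Assign blocks so that the total assigned size per node stays balanced. */
      public static Map<String, List<Block>> assign(List<Block> blocks, List<String> nodes) {
        Map<String, List<Block>> assignment = new HashMap<>();
        Map<String, Long> assignedSize = new HashMap<>();
        for (String node : nodes) {
          assignment.put(node, new ArrayList<>());
          assignedSize.put(node, 0L);
        }
        // Handling the largest blocks first gives the greedy strategy a tighter balance.
        List<Block> sorted = new ArrayList<>(blocks);
        sorted.sort((a, b) -> Long.compare(b.sizeInBytes, a.sizeInBytes));
        for (Block block : sorted) {
          // Pick the node with the smallest accumulated size so far.
          String target = Collections.min(assignedSize.entrySet(),
              Map.Entry.comparingByValue()).getKey();
          assignment.get(target).add(block);
          assignedSize.put(target, assignedSize.get(target) + block.sizeInBytes);
        }
        return assignment;
      }
    }
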

This closes #1808


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8d8b589e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8d8b589e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8d8b589e

Branch: refs/heads/master
Commit: 8d8b589e78a9db1ddc101d20c1e3feb500acce19
Parents: 21704cf
Author: xuchuanyin <xu...@hust.edu.cn>
Authored: Thu Feb 8 14:42:39 2018 +0800
Committer: ravipesala <ra...@gmail.com>
Committed: Thu Mar 8 22:21:10 2018 +0530

----------------------------------------------------------------------
 .../core/datamap/dev/AbstractDataMapWriter.java |   5 +-
 .../core/datamap/dev/DataMapFactory.java        |   2 +-
 .../blockletindex/BlockletDataMapFactory.java   |   2 +-
 .../SegmentUpdateStatusManager.java             |   9 +-
 .../carbondata/core/util/NonDictionaryUtil.java |  67 ++-
 .../datamap/examples/MinMaxDataMapFactory.java  |   5 +-
 .../datamap/examples/MinMaxDataWriter.java      |   7 +-
 .../presto/util/CarbonDataStoreCreator.scala    |   1 +
 .../CarbonIndexFileMergeTestCase.scala          |   4 -
 .../testsuite/datamap/CGDataMapTestCase.scala   |  26 +-
 .../testsuite/datamap/DataMapWriterSuite.scala  |  19 +-
 .../testsuite/datamap/FGDataMapTestCase.scala   |  31 +-
 .../iud/DeleteCarbonTableTestCase.scala         |   2 +-
 .../TestInsertAndOtherCommandConcurrent.scala   |  14 +-
 .../StandardPartitionTableCleanTestCase.scala   |  12 +-
 .../StandardPartitionTableLoadingTestCase.scala |   2 +-
 .../load/DataLoadProcessorStepOnSpark.scala     |   6 +-
 .../carbondata/spark/util/DataLoadingUtil.scala |   2 +-
 .../datamap/DataMapWriterListener.java          |   2 +-
 .../loading/row/IntermediateSortTempRow.java    | 117 -----
 .../loading/sort/SortStepRowHandler.java        | 466 -------------------
 .../loading/sort/SortStepRowUtil.java           | 103 ++++
 .../sort/unsafe/UnsafeCarbonRowPage.java        | 331 +++++++++++--
 .../loading/sort/unsafe/UnsafeSortDataRows.java |  57 ++-
 .../unsafe/comparator/UnsafeRowComparator.java  |  95 ++--
 .../UnsafeRowComparatorForNormalDIms.java       |  59 +++
 .../UnsafeRowComparatorForNormalDims.java       |  59 ---
 .../sort/unsafe/holder/SortTempChunkHolder.java |   3 +-
 .../holder/UnsafeFinalMergePageHolder.java      |  19 +-
 .../unsafe/holder/UnsafeInmemoryHolder.java     |  21 +-
 .../holder/UnsafeSortTempFileChunkHolder.java   | 138 ++++--
 .../merger/UnsafeIntermediateFileMerger.java    | 118 ++++-
 .../UnsafeSingleThreadFinalSortFilesMerger.java |  27 +-
 .../processing/merger/CarbonDataMergerUtil.java |   8 +-
 .../merger/CompactionResultSortProcessor.java   |   5 +-
 .../merger/RowResultMergerProcessor.java        |   5 +-
 .../partition/spliter/RowResultProcessor.java   |   5 +-
 .../sort/sortdata/IntermediateFileMerger.java   |  95 +++-
 .../IntermediateSortTempRowComparator.java      |  73 ---
 .../sort/sortdata/NewRowComparator.java         |   5 +-
 .../sortdata/NewRowComparatorForNormalDims.java |   3 +-
 .../processing/sort/sortdata/RowComparator.java |  94 ++++
 .../sortdata/RowComparatorForNormalDims.java    |  62 +++
 .../SingleThreadFinalSortFilesMerger.java       |  25 +-
 .../processing/sort/sortdata/SortDataRows.java  |  85 +++-
 .../sort/sortdata/SortTempFileChunkHolder.java  | 174 +++++--
 .../sort/sortdata/TableFieldStat.java           | 176 -------
 .../util/CarbonDataProcessorUtil.java           |   4 +-
 .../processing/util/CarbonLoaderUtil.java       |   9 -
 49 files changed, 1368 insertions(+), 1291 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
index bcc9bad..de6dcb1 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/AbstractDataMapWriter.java
@@ -18,6 +18,7 @@ package org.apache.carbondata.core.datamap.dev;
 
 import java.io.IOException;
 
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
@@ -35,10 +36,10 @@ public abstract class AbstractDataMapWriter {
 
   protected String writeDirectoryPath;
 
-  public AbstractDataMapWriter(AbsoluteTableIdentifier identifier, String segmentId,
+  public AbstractDataMapWriter(AbsoluteTableIdentifier identifier, Segment segment,
       String writeDirectoryPath) {
     this.identifier = identifier;
-    this.segmentId = segmentId;
+    this.segmentId = segment.getSegmentNo();
     this.writeDirectoryPath = writeDirectoryPath;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
index df5670d..50ac279 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java
@@ -39,7 +39,7 @@ public interface DataMapFactory<T extends DataMap> {
   /**
    * Return a new write for this datamap
    */
-  AbstractDataMapWriter createWriter(Segment segment);
+  AbstractDataMapWriter createWriter(Segment segment, String writeDirectoryPath);
 
   /**
    * Get the datamap for segmentid

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
index efe2b71..ee849bd 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMapFactory.java
@@ -72,7 +72,7 @@ public class BlockletDataMapFactory extends AbstractCoarseGrainDataMapFactory
   }
 
   @Override
-  public AbstractDataMapWriter createWriter(Segment segment) {
+  public AbstractDataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
     throw new UnsupportedOperationException("not implemented");
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
index 94a4243..39eb262 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java
@@ -260,13 +260,8 @@ public class SegmentUpdateStatusManager {
 
   /**
    * Returns all delta file paths of specified block
-   *
-   * @param tupleId
-   * @param extension
-   * @return
-   * @throws Exception
    */
-  public List<String> getDeltaFiles(String tupleId, String extension) throws Exception {
+  private List<String> getDeltaFiles(String tupleId, String extension) throws Exception {
     try {
       String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID);
       String completeBlockName = CarbonTablePath.addDataPartPrefix(
@@ -405,10 +400,8 @@ public class SegmentUpdateStatusManager {
   public CarbonFile[] getDeleteDeltaFilesList(final Segment segmentId, final String blockName) {
     String segmentPath = CarbonTablePath.getSegmentPath(
         identifier.getTablePath(), segmentId.getSegmentNo());
-
     CarbonFile segDir =
         FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
-
     for (SegmentUpdateDetails block : updateDetails) {
       if ((block.getBlockName().equalsIgnoreCase(blockName)) &&
           (block.getSegmentName().equalsIgnoreCase(segmentId.getSegmentNo()))

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java b/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
index fca1244..d6ecfbc 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/NonDictionaryUtil.java
@@ -82,26 +82,18 @@ public class NonDictionaryUtil {
   }
 
   /**
-   * Method to get the required dictionary Dimension from obj []
+   * Method to get the required Dimension from obj []
    *
    * @param index
    * @param row
    * @return
    */
-  public static int getDictDimension(int index, Object[] row) {
-    int[] dimensions = (int[]) row[WriteStepRowUtil.DICTIONARY_DIMENSION];
+  public static Integer getDimension(int index, Object[] row) {
+
+    Integer[] dimensions = (Integer[]) row[WriteStepRowUtil.DICTIONARY_DIMENSION];
+
     return dimensions[index];
-  }
 
-  /**
-   * Method to get the required non-dictionary & complex from 3-parted row
-   * @param index
-   * @param row
-   * @return
-   */
-  public static byte[] getNoDictOrComplex(int index, Object[] row) {
-    byte[][] nonDictArray = (byte[][]) row[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
-    return nonDictArray[index];
   }
 
   /**
@@ -116,11 +108,60 @@ public class NonDictionaryUtil {
     return measures[index];
   }
 
+  public static byte[] getByteArrayForNoDictionaryCols(Object[] row) {
+
+    return (byte[]) row[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
+  }
+
   public static void prepareOutObj(Object[] out, int[] dimArray, byte[][] byteBufferArr,
       Object[] measureArray) {
+
     out[WriteStepRowUtil.DICTIONARY_DIMENSION] = dimArray;
     out[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX] = byteBufferArr;
     out[WriteStepRowUtil.MEASURE] = measureArray;
 
   }
+
+  /**
+   * This method will extract the single dimension from the complete high card dims byte[].
+   * The format of the byte[] will be: TotalLength, CompleteStartOffsets, Data
+   *
+   * @param highCardArr
+   * @param index
+   * @param highCardinalityCount
+   * @param outBuffer
+   */
+  public static void extractSingleHighCardDims(byte[] highCardArr, int index,
+      int highCardinalityCount, ByteBuffer outBuffer) {
+    ByteBuffer buff = null;
+    short secIndex = 0;
+    short firstIndex = 0;
+    int length;
+    // if the requested index is a last one then we need to calculate length
+    // based on byte[] length.
+    if (index == highCardinalityCount - 1) {
+      // need to read 2 bytes(1 short) to determine starting offset and
+      // length can be calculated by array length.
+      buff = ByteBuffer.wrap(highCardArr, (index * 2) + 2, 2);
+    } else {
+      // need to read 4 bytes(2 short) to determine starting offset and
+      // length.
+      buff = ByteBuffer.wrap(highCardArr, (index * 2) + 2, 4);
+    }
+
+    firstIndex = buff.getShort();
+    // if it is a last dimension in high card then this will be last
+    // offset.so calculate length from total length
+    if (index == highCardinalityCount - 1) {
+      secIndex = (short) highCardArr.length;
+    } else {
+      secIndex = buff.getShort();
+    }
+
+    length = secIndex - firstIndex;
+
+    outBuffer.position(firstIndex);
+    outBuffer.limit(outBuffer.position() + length);
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
index 266c107..4ef74a7 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataMapFactory.java
@@ -52,9 +52,8 @@ public class MinMaxDataMapFactory extends AbstractCoarseGrainDataMapFactory {
    * @param segment
    * @return
    */
-  @Override public AbstractDataMapWriter createWriter(Segment segment) {
-    return new MinMaxDataWriter(identifier, segment.getSegmentNo(),
-        CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()));
+  @Override public AbstractDataMapWriter createWriter(Segment segment, String writeDirectoryPath) {
+    return new MinMaxDataWriter(identifier, segment, writeDirectoryPath);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
----------------------------------------------------------------------
diff --git a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
index fe0bbcf..5046182 100644
--- a/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
+++ b/datamap/examples/src/minmaxdatamap/main/java/org/apache/carbondata/datamap/examples/MinMaxDataWriter.java
@@ -28,6 +28,7 @@ import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datamap.Segment;
 import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
 import org.apache.carbondata.core.datastore.page.ColumnPage;
@@ -52,11 +53,11 @@ public class MinMaxDataWriter extends AbstractDataMapWriter {
 
   private String dataWritePath;
 
-  public MinMaxDataWriter(AbsoluteTableIdentifier identifier, String segmentId,
+  public MinMaxDataWriter(AbsoluteTableIdentifier identifier, Segment segment,
       String dataWritePath) {
-    super(identifier, segmentId, dataWritePath);
+    super(identifier, segment, dataWritePath);
     this.identifier = identifier;
-    this.segmentId = segmentId;
+    this.segmentId = segment.getSegmentNo();
     this.dataWritePath = dataWritePath;
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
----------------------------------------------------------------------
diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
index 1bc9812..e768660 100644
--- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
+++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala
@@ -383,6 +383,7 @@ object CarbonDataStoreCreator {
       .getInstance.createCache(CacheType.REVERSE_DICTIONARY)
 
     for (i <- set.indices) {
+      //      val dim = getDimension(dims, i).get
       val columnIdentifier: ColumnIdentifier =
         new ColumnIdentifier(dims.get(i).getColumnId, null, null)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index 7608318..aace3ea 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -85,7 +85,6 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
     new CarbonIndexFileMergeWriter()
       .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
     new CarbonIndexFileMergeWriter()
@@ -111,7 +110,6 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
     new CarbonIndexFileMergeWriter()
       .mergeCarbonIndexFilesOfSegment("0", table.getTablePath, false)
     new CarbonIndexFileMergeWriter()
@@ -141,7 +139,6 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
     val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
     new CarbonIndexFileMergeWriter()
       .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
@@ -171,7 +168,6 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
     sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
     val table = CarbonMetadata.getInstance().getCarbonTable("default","nonindexmerge")
-    val carbonTablePath = new CarbonTablePath(table.getCarbonTableIdentifier, table.getTablePath)
     new CarbonIndexFileMergeWriter()
       .mergeCarbonIndexFilesOfSegment("0.1", table.getTablePath, false)
     assert(getIndexFileCount("default_nonindexmerge", "0") == 100)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
index 4b6f231..1cbbcb4 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/CGDataMapTestCase.scala
@@ -27,14 +27,14 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
 import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datastore.FileReader
 import org.apache.carbondata.core.datastore.block.SegmentProperties
 import org.apache.carbondata.core.datastore.compression.SnappyCompressor
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.Blocklet
+import org.apache.carbondata.core.indexstore.{Blocklet, PartitionSpec}
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.scan.expression.Expression
@@ -62,16 +62,16 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
   /**
    * Return a new write for this datamap
    */
-  override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
-    new CGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+  override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = {
+    new CGDataMapWriter(identifier, segment, dataWritePath, dataMapName)
   }
 
   /**
    * Get the datamap for segmentid
    */
-  override def getDataMaps(segmentId: String): java.util.List[AbstractCoarseGrainDataMap] = {
+  override def getDataMaps(segment: Segment): java.util.List[AbstractCoarseGrainDataMap] = {
     val file = FileFactory.getCarbonFile(
-      CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+      CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
 
     val files = file.listFiles(new CarbonFileFilter {
       override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -108,9 +108,9 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
    *
    * @return
    */
-  override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = {
+  override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
     val file = FileFactory.getCarbonFile(
-      CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+      CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
 
     val files = file.listFiles(new CarbonFileFilter {
       override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -125,7 +125,7 @@ class CGDataMapFactory extends AbstractCoarseGrainDataMapFactory {
   /**
    * Clears datamap of the segment
    */
-  override def clear(segmentId: String): Unit = {
+  override def clear(segment: Segment): Unit = {
 
   }
 
@@ -175,7 +175,7 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
   override def prune(
       filterExp: FilterResolverIntf,
       segmentProperties: SegmentProperties,
-      partitions: java.util.List[String]): java.util.List[Blocklet] = {
+      partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = {
     val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
     val expression = filterExp.getFilterExpression
     getEqualToExpression(expression, buffer)
@@ -184,7 +184,7 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
     }
     val meta = findMeta(value(0).getBytes)
     meta.map { f=>
-      new Blocklet(f._1, f._2+"")
+      new Blocklet(f._1, f._2 + "")
     }.asJava
   }
 
@@ -219,10 +219,10 @@ class CGDataMap extends AbstractCoarseGrainDataMap {
 }
 
 class CGDataMapWriter(identifier: AbsoluteTableIdentifier,
-    segmentId: String,
+    segment: Segment,
     dataWritePath: String,
     dataMapName: String)
-  extends AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
+  extends AbstractDataMapWriter(identifier, segment, dataWritePath) {
 
   var currentBlockId: String = null
   val cgwritepath = dataWritePath + "/" +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
index 2f8a1d1..7e93959 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/DataMapWriterSuite.scala
@@ -20,21 +20,19 @@ package org.apache.carbondata.spark.testsuite.datamap
 import java.util
 
 import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.test.util.QueryTest
 import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datamap.dev.{DataMap, DataMapFactory, DataMapWriter}
 import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
-import org.apache.carbondata.core.datamap.dev.AbstractDataMapWriter
+import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap}
 import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainDataMap, AbstractCoarseGrainDataMapFactory}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
 import org.apache.carbondata.core.datastore.page.ColumnPage
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.scan.filter.intf.ExpressionType
-import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.Event
 
@@ -49,15 +47,16 @@ class C2DataMapFactory() extends AbstractCoarseGrainDataMapFactory {
 
   override def fireEvent(event: Event): Unit = ???
 
-  override def clear(segmentId: Segment): Unit = {}
+  override def clear(segment: Segment): Unit = {}
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ???
+  override def getDataMaps(distributable: DataMapDistributable): util.List[AbstractCoarseGrainDataMap] = ???
 
-  override def getDataMaps(segmentId: Segment): util.List[DataMap] = ???
+  override def getDataMaps(segment: Segment): util.List[AbstractCoarseGrainDataMap] = ???
 
-  override def createWriter(segmentId: Segment): AbstractDataMapWriter = DataMapWriterSuite.dataMapWriterC2Mock
+  override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter =
+    DataMapWriterSuite.dataMapWriterC2Mock(identifier, segment, dataWritePath)
 
   override def getMeta: DataMapMeta = new DataMapMeta(List("c2").asJava, List(ExpressionType.EQUALS).asJava)
 
@@ -175,9 +174,9 @@ object DataMapWriterSuite {
 
   var callbackSeq: Seq[String] = Seq[String]()
 
-  def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segmentId: String,
+  def dataMapWriterC2Mock(identifier: AbsoluteTableIdentifier, segment: Segment,
       dataWritePath: String) =
-    new AbstractDataMapWriter(identifier, segmentId, dataWritePath) {
+    new AbstractDataMapWriter(identifier, segment, dataWritePath) {
 
     override def onPageAdded(
         blockletId: Int,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
index d1bb65f..9c8cc15 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/FGDataMapTestCase.scala
@@ -27,14 +27,14 @@ import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.core.datamap.dev.fgdatamap.{AbstractFineGrainDataMap, AbstractFineGrainDataMapFactory}
 import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMapModel}
-import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager}
+import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager, Segment}
 import org.apache.carbondata.core.datastore.FileReader
 import org.apache.carbondata.core.datastore.block.SegmentProperties
 import org.apache.carbondata.core.datastore.compression.SnappyCompressor
 import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.datastore.page.ColumnPage
-import org.apache.carbondata.core.indexstore.FineGrainBlocklet
+import org.apache.carbondata.core.indexstore.{Blocklet, FineGrainBlocklet, PartitionSpec}
 import org.apache.carbondata.core.indexstore.blockletindex.BlockletDataMapDistributable
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
 import org.apache.carbondata.core.scan.expression.Expression
@@ -62,16 +62,16 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
   /**
    * Return a new write for this datamap
    */
-  override def createWriter(segmentId: String, dataWritePath: String): AbstractDataMapWriter = {
-    new FGDataMapWriter(identifier, segmentId, dataWritePath, dataMapName)
+  override def createWriter(segment: Segment, dataWritePath: String): AbstractDataMapWriter = {
+    new FGDataMapWriter(identifier, segment, dataWritePath, dataMapName)
   }
 
   /**
    * Get the datamap for segmentid
    */
-  override def getDataMaps(segmentId: String): java.util.List[AbstractFineGrainDataMap] = {
+  override def getDataMaps(segment: Segment): java.util.List[AbstractFineGrainDataMap] = {
     val file = FileFactory
-      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
 
     val files = file.listFiles(new CarbonFileFilter {
       override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -99,9 +99,9 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
    *
    * @return
    */
-  override def toDistributable(segmentId: String): java.util.List[DataMapDistributable] = {
-    val file = FileFactory
-      .getCarbonFile(CarbonTablePath.getSegmentPath(identifier.getTablePath, segmentId))
+  override def toDistributable(segment: Segment): java.util.List[DataMapDistributable] = {
+    val file = FileFactory.getCarbonFile(
+      CarbonTablePath.getSegmentPath(identifier.getTablePath, segment.getSegmentNo))
 
     val files = file.listFiles(new CarbonFileFilter {
       override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".datamap")
@@ -112,7 +112,6 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
     }.toList.asJava
   }
 
-
   /**
    *
    * @param event
@@ -124,7 +123,7 @@ class FGDataMapFactory extends AbstractFineGrainDataMapFactory {
   /**
    * Clears datamap of the segment
    */
-  override def clear(segmentId: String): Unit = {
+  override def clear(segment: Segment): Unit = {
   }
 
   /**
@@ -173,7 +172,7 @@ class FGDataMap extends AbstractFineGrainDataMap {
   override def prune(
       filterExp: FilterResolverIntf,
       segmentProperties: SegmentProperties,
-      partitions: java.util.List[String]): java.util.List[FineGrainBlocklet] = {
+      partitions: java.util.List[PartitionSpec]): java.util.List[Blocklet] = {
     val buffer: ArrayBuffer[Expression] = new ArrayBuffer[Expression]()
     val expression = filterExp.getFilterExpression
     getEqualToExpression(expression, buffer)
@@ -187,7 +186,7 @@ class FGDataMap extends AbstractFineGrainDataMap {
   }
 
   private def readAndFindData(meta: (String, Int, (Array[Byte], Array[Byte]), Long, Int),
-      value: Array[Byte]): Option[FineGrainBlocklet] = {
+      value: Array[Byte]): Option[Blocklet] = {
     val bytes = FileReader.readByteArray(filePath, meta._4, meta._5)
     val outputStream = new ByteArrayInputStream(compressor.unCompressByte(bytes))
     val obj = new ObjectInputStream(outputStream)
@@ -211,12 +210,10 @@ class FGDataMap extends AbstractFineGrainDataMap {
         pg.setRowId(f._2(p._2).toArray)
         pg
       }
-      pages
       Some(new FineGrainBlocklet(meta._1, meta._2.toString, pages.toList.asJava))
     } else {
       None
     }
-
   }
 
   private def findMeta(value: Array[Byte]) = {
@@ -249,8 +246,8 @@ class FGDataMap extends AbstractFineGrainDataMap {
 }
 
 class FGDataMapWriter(identifier: AbsoluteTableIdentifier,
-    segmentId: String, dataWriterPath: String, dataMapName: String)
-  extends AbstractDataMapWriter(identifier, segmentId, dataWriterPath) {
+    segment: Segment, dataWriterPath: String, dataMapName: String)
+  extends AbstractDataMapWriter(identifier, segment, dataWriterPath) {
 
   var currentBlockId: String = null
   val fgwritepath = dataWriterPath + "/" + System.nanoTime() + ".datamap"

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
index d05f022..510903a 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/DeleteCarbonTableTestCase.scala
@@ -194,7 +194,7 @@ class DeleteCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("delete from update_status_files where age=5").show()
     val carbonTable = CarbonEnv
       .getCarbonTable(Some("iud_db"), "update_status_files")(sqlContext.sparkSession)
-    val metaPath = carbonTable.getMetaDataFilepath
+    val metaPath = carbonTable.getMetadataPath
     val files = FileFactory.getCarbonFile(metaPath)
     val result = CarbonEnv.getInstance(sqlContext.sparkSession).carbonMetastore.getClass
     if(result.getCanonicalName.contains("CarbonFileMetastore")) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
index 5550358..b39c44c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala
@@ -269,7 +269,11 @@ object Global {
 
 class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory {
 
-  override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = { }
+  private var identifier: AbsoluteTableIdentifier = _
+
+  override def init(identifier: AbsoluteTableIdentifier, dataMapName: String): Unit = {
+    this.identifier = identifier
+  }
 
   override def fireEvent(event: Event): Unit = ???
 
@@ -277,12 +281,12 @@ class WaitingDataMap() extends AbstractCoarseGrainDataMapFactory {
 
   override def clear(): Unit = {}
 
-  override def getDataMaps(distributable: DataMapDistributable): java.util.List[AbstractCoarseGrainDataMap] = ???
+  override def getDataMaps(distributable: DataMapDistributable): util.List[AbstractCoarseGrainDataMap] = ???
 
-  override def getDataMaps(segmentId: Segment): util.List[DataMap] = ???
+  override def getDataMaps(segment: Segment): util.List[AbstractCoarseGrainDataMap] = ???
 
-  override def createWriter(segmentId: Segment): AbstractDataMapWriter = {
-    new AbstractDataMapWriter {
+  override def createWriter(segment: Segment, writeDirectoryPath: String): AbstractDataMapWriter = {
+    new AbstractDataMapWriter(identifier, segment, writeDirectoryPath) {
       override def onPageAdded(blockletId: Int, pageId: Int, pages: Array[ColumnPage]): Unit = { }
 
       override def onBlockletEnd(blockletId: Int): Unit = { }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
index f238d2b..cfc6983 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala
@@ -52,14 +52,12 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA
 
   def validateDataFiles(tableUniqueName: String, segmentId: String, partition: Int, indexes: Int): Unit = {
     val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-    val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
-      carbonTable.getTablePath)
-    val partitions = CarbonFilters
-      .getPartitions(Seq.empty,
-        sqlContext.sparkSession,
-        TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName)))
+    val partitions = CarbonFilters.getPartitions(
+      Seq.empty,
+      sqlContext.sparkSession,
+      TableIdentifier(carbonTable.getTableName, Some(carbonTable.getDatabaseName)))
     assert(partitions.get.length == partition)
-    val details = SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath)
+    val details = SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(carbonTable.getTablePath))
     val segLoad = details.find(_.getLoadName.equals(segmentId)).get
     val seg = new SegmentFileStore(carbonTable.getTablePath, segLoad.getSegmentFile)
     assert(seg.getIndexFiles.size == indexes)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index 2ce46ef..baf1627 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -35,7 +35,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.{CarbonMetadata, SegmentFileStore}
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 
 class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfterAll {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 0422239..5124247 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -35,7 +35,7 @@ import org.apache.carbondata.processing.loading.converter.impl.RowConverterImpl
 import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.loading.parser.impl.RowParserImpl
-import org.apache.carbondata.processing.loading.sort.SortStepRowHandler
+import org.apache.carbondata.processing.loading.sort.SortStepRowUtil
 import org.apache.carbondata.processing.loading.steps.DataWriterProcessorStepImpl
 import org.apache.carbondata.processing.sort.sortdata.SortParameters
 import org.apache.carbondata.processing.store.{CarbonFactHandler, CarbonFactHandlerFactory}
@@ -206,7 +206,7 @@ object DataLoadProcessorStepOnSpark {
     val model: CarbonLoadModel = modelBroadcast.value.getCopyWithTaskNo(index.toString)
     val conf = DataLoadProcessBuilder.createConfiguration(model)
     val sortParameters = SortParameters.createSortParameters(conf)
-    val sortStepRowHandler = new SortStepRowHandler(sortParameters)
+    val sortStepRowUtil = new SortStepRowUtil(sortParameters)
     TaskContext.get().addTaskFailureListener { (t: TaskContext, e: Throwable) =>
       wrapException(e, model)
     }
@@ -216,7 +216,7 @@ object DataLoadProcessorStepOnSpark {
 
       override def next(): CarbonRow = {
         val row =
-          new CarbonRow(sortStepRowHandler.convertRawRowTo3Parts(rows.next().getData))
+          new CarbonRow(sortStepRowUtil.convertRow(rows.next().getData))
         rowCounter.add(1)
         row
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
index cee40c8..49e4420 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/DataLoadingUtil.scala
@@ -441,7 +441,7 @@ object DataLoadingUtil {
 
   private def isUpdationRequired(isForceDeletion: Boolean,
       carbonTable: CarbonTable,
-      absoluteTableIdentifier: AbsoluteTableIdentifier) = {
+      absoluteTableIdentifier: AbsoluteTableIdentifier): (Array[LoadMetadataDetails], Boolean) = {
     val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
     // Delete marked loads
     val isUpdationRequired =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 5083ab5..1104229 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -74,7 +74,7 @@ public class DataMapWriterListener {
     }
     List<String> columns = factory.getMeta().getIndexedColumns();
     List<AbstractDataMapWriter> writers = registry.get(columns);
-    AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null));
+    AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null), dataWritePath);
     if (writers != null) {
       writers.add(writer);
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java b/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
deleted file mode 100644
index 8d351cf..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/row/IntermediateSortTempRow.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.loading.row;
-
-import java.nio.ByteBuffer;
-
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.DataTypeUtil;
-
-/**
- * During sort procedure, each row will be written to sort temp file in this logic format.
- * an intermediate sort temp row consists 3 parts:
- * dictSort, noDictSort, noSortDimsAndMeasures(dictNoSort, noDictNoSort, measure)
- */
-public class IntermediateSortTempRow {
-  private int[] dictSortDims;
-  private byte[][] noDictSortDims;
-  private byte[] noSortDimsAndMeasures;
-
-  public IntermediateSortTempRow(int[] dictSortDims, byte[][] noDictSortDims,
-      byte[] noSortDimsAndMeasures) {
-    this.dictSortDims = dictSortDims;
-    this.noDictSortDims = noDictSortDims;
-    this.noSortDimsAndMeasures = noSortDimsAndMeasures;
-  }
-
-  public int[] getDictSortDims() {
-    return dictSortDims;
-  }
-
-  public byte[][] getNoDictSortDims() {
-    return noDictSortDims;
-  }
-
-  public byte[] getNoSortDimsAndMeasures() {
-    return noSortDimsAndMeasures;
-  }
-
-  /**
-   * deserialize from bytes array to get the no sort fields
-   * @param outDictNoSort stores the dict & no-sort fields
-   * @param outNoDictNoSort stores the no-dict & no-sort fields, including complex
-   * @param outMeasures stores the measure fields
-   * @param dataTypes data type for the measure
-   */
-  public void unpackNoSortFromBytes(int[] outDictNoSort, byte[][] outNoDictNoSort,
-      Object[] outMeasures, DataType[] dataTypes) {
-    ByteBuffer rowBuffer = ByteBuffer.wrap(noSortDimsAndMeasures);
-    // read dict_no_sort
-    int dictNoSortCnt = outDictNoSort.length;
-    for (int i = 0; i < dictNoSortCnt; i++) {
-      outDictNoSort[i] = rowBuffer.getInt();
-    }
-
-    // read no_dict_no_sort (including complex)
-    int noDictNoSortCnt = outNoDictNoSort.length;
-    for (int i = 0; i < noDictNoSortCnt; i++) {
-      short len = rowBuffer.getShort();
-      byte[] bytes = new byte[len];
-      rowBuffer.get(bytes);
-      outNoDictNoSort[i] = bytes;
-    }
-
-    // read measure
-    int measureCnt = outMeasures.length;
-    DataType tmpDataType;
-    Object tmpContent;
-    for (short idx = 0 ; idx < measureCnt; idx++) {
-      if ((byte) 0 == rowBuffer.get()) {
-        outMeasures[idx] = null;
-        continue;
-      }
-
-      tmpDataType = dataTypes[idx];
-      if (DataTypes.BOOLEAN == tmpDataType) {
-        if ((byte) 1 == rowBuffer.get()) {
-          tmpContent = true;
-        } else {
-          tmpContent = false;
-        }
-      } else if (DataTypes.SHORT == tmpDataType) {
-        tmpContent = rowBuffer.getShort();
-      } else if (DataTypes.INT == tmpDataType) {
-        tmpContent = rowBuffer.getInt();
-      } else if (DataTypes.LONG == tmpDataType) {
-        tmpContent = rowBuffer.getLong();
-      } else if (DataTypes.DOUBLE == tmpDataType) {
-        tmpContent = rowBuffer.getDouble();
-      } else if (DataTypes.isDecimal(tmpDataType)) {
-        short len = rowBuffer.getShort();
-        byte[] decimalBytes = new byte[len];
-        rowBuffer.get(decimalBytes);
-        tmpContent = DataTypeUtil.byteToBigDecimal(decimalBytes);
-      } else {
-        throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
-      }
-      outMeasures[idx] = tmpContent;
-    }
-  }
-
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
deleted file mode 100644
index f31a2b9..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowHandler.java
+++ /dev/null
@@ -1,466 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.loading.sort;
-
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-
-import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.core.metadata.datatype.DataType;
-import org.apache.carbondata.core.metadata.datatype.DataTypes;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.NonDictionaryUtil;
-import org.apache.carbondata.processing.loading.row.IntermediateSortTempRow;
-import org.apache.carbondata.processing.sort.sortdata.SortParameters;
-import org.apache.carbondata.processing.sort.sortdata.TableFieldStat;
-
-/**
- * This class is used to convert/write/read row in sort step in carbondata.
- * It consists the following function:
- * 1. convert raw row & intermediate sort temp row to 3-parted row
- * 2. read/write intermediate sort temp row to sort temp file & unsafe memory
- * 3. write raw row directly to sort temp file & unsafe memory as intermediate sort temp row
- */
-public class SortStepRowHandler implements Serializable {
-  private static final long serialVersionUID = 1L;
-  private int dictSortDimCnt = 0;
-  private int dictNoSortDimCnt = 0;
-  private int noDictSortDimCnt = 0;
-  private int noDictNoSortDimCnt = 0;
-  private int measureCnt;
-
-  // indices for dict & sort dimension columns
-  private int[] dictSortDimIdx;
-  // indices for dict & no-sort dimension columns
-  private int[] dictNoSortDimIdx;
-  // indices for no-dict & sort dimension columns
-  private int[] noDictSortDimIdx;
-  // indices for no-dict & no-sort dimension columns, including complex columns
-  private int[] noDictNoSortDimIdx;
-  // indices for measure columns
-  private int[] measureIdx;
-
-  private DataType[] dataTypes;
-
-  /**
-   * constructor
-   * @param tableFieldStat table field stat
-   */
-  public SortStepRowHandler(TableFieldStat tableFieldStat) {
-    this.dictSortDimCnt = tableFieldStat.getDictSortDimCnt();
-    this.dictNoSortDimCnt = tableFieldStat.getDictNoSortDimCnt();
-    this.noDictSortDimCnt = tableFieldStat.getNoDictSortDimCnt();
-    this.noDictNoSortDimCnt = tableFieldStat.getNoDictNoSortDimCnt();
-    this.measureCnt = tableFieldStat.getMeasureCnt();
-    this.dictSortDimIdx = tableFieldStat.getDictSortDimIdx();
-    this.dictNoSortDimIdx = tableFieldStat.getDictNoSortDimIdx();
-    this.noDictSortDimIdx = tableFieldStat.getNoDictSortDimIdx();
-    this.noDictNoSortDimIdx = tableFieldStat.getNoDictNoSortDimIdx();
-    this.measureIdx = tableFieldStat.getMeasureIdx();
-    this.dataTypes = tableFieldStat.getMeasureDataType();
-  }
-
-  /**
-   * constructor
-   * @param sortParameters sort parameters
-   */
-  public SortStepRowHandler(SortParameters sortParameters) {
-    this(new TableFieldStat(sortParameters));
-  }
-
-  /**
-   * Convert carbon row from raw format to 3-parted format.
-   * This method is used in global-sort.
-   *
-   * @param row raw row whose length is the same as field number
-   * @return 3-parted row whose length is 3. (1 for dict dims ,1 for non-dict and complex,
-   * 1 for measures)
-   */
-  public Object[] convertRawRowTo3Parts(Object[] row) {
-    Object[] holder = new Object[3];
-    try {
-      int[] dictDims
-          = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
-      byte[][] nonDictArray = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt][];
-      Object[] measures = new Object[this.measureCnt];
-
-      // convert dict & data
-      int idxAcc = 0;
-      for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
-        dictDims[idxAcc++] = (int) row[this.dictSortDimIdx[idx]];
-      }
-
-      // convert dict & no-sort
-      for (int idx = 0; idx < this.dictNoSortDimCnt; idx++) {
-        dictDims[idxAcc++] = (int) row[this.dictNoSortDimIdx[idx]];
-      }
-      // convert no-dict & sort
-      idxAcc = 0;
-      for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-        nonDictArray[idxAcc++] = (byte[]) row[this.noDictSortDimIdx[idx]];
-      }
-      // convert no-dict & no-sort
-      for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
-        nonDictArray[idxAcc++] = (byte[]) row[this.noDictNoSortDimIdx[idx]];
-      }
-
-      // convert measure data
-      for (int idx = 0; idx < this.measureCnt; idx++) {
-        measures[idx] = row[this.measureIdx[idx]];
-      }
-
-      NonDictionaryUtil.prepareOutObj(holder, dictDims, nonDictArray, measures);
-    } catch (Exception e) {
-      throw new RuntimeException("Problem while converting row to 3 parts", e);
-    }
-    return holder;
-  }
-
-  /**
-   * Convert intermediate sort temp row to 3-parted row.
-   * This method is used in the final merge sort to feed rows to the next write step.
-   *
-   * @param sortTempRow intermediate sort temp row
-   * @return 3-parted row
-   */
-  public Object[] convertIntermediateSortTempRowTo3Parted(IntermediateSortTempRow sortTempRow) {
-    int[] dictDims
-        = new int[this.dictSortDimCnt + this.dictNoSortDimCnt];
-    byte[][] noDictArray
-        = new byte[this.noDictSortDimCnt + this.noDictNoSortDimCnt][];
-
-    int[] dictNoSortDims = new int[this.dictNoSortDimCnt];
-    byte[][] noDictNoSortDims = new byte[this.noDictNoSortDimCnt][];
-    Object[] measures = new Object[this.measureCnt];
-
-    sortTempRow.unpackNoSortFromBytes(dictNoSortDims, noDictNoSortDims, measures, this.dataTypes);
-
-    // dict dims
-    System.arraycopy(sortTempRow.getDictSortDims(), 0, dictDims,
-        0, this.dictSortDimCnt);
-    System.arraycopy(dictNoSortDims, 0, dictDims,
-        this.dictSortDimCnt, this.dictNoSortDimCnt);
-
-    // no dict dims, including complex
-    System.arraycopy(sortTempRow.getNoDictSortDims(), 0,
-        noDictArray, 0, this.noDictSortDimCnt);
-    System.arraycopy(noDictNoSortDims, 0, noDictArray,
-        this.noDictSortDimCnt, this.noDictNoSortDimCnt);
-
-    // measures are already here
-
-    Object[] holder = new Object[3];
-    NonDictionaryUtil.prepareOutObj(holder, dictDims, noDictArray, measures);
-    return holder;
-  }
-
-  /**
-   * Read intermediate sort temp row from InputStream.
-   * This method is used during the merge sort phase to read row from sort temp file.
-   *
-   * @param inputStream input stream
-   * @return a row that contains three parts
-   * @throws IOException if error occurs while reading from stream
-   */
-  public IntermediateSortTempRow readIntermediateSortTempRowFromInputStream(
-      DataInputStream inputStream) throws IOException {
-    int[] dictSortDims = new int[this.dictSortDimCnt];
-    byte[][] noDictSortDims = new byte[this.noDictSortDimCnt][];
-
-    // read dict & sort dim data
-    for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
-      dictSortDims[idx] = inputStream.readInt();
-    }
-
-    // read no-dict & sort data
-    for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-      short len = inputStream.readShort();
-      byte[] bytes = new byte[len];
-      inputStream.readFully(bytes);
-      noDictSortDims[idx] = bytes;
-    }
-
-    // read packed no-sort dims & measures
-    int len = inputStream.readInt();
-    byte[] noSortDimsAndMeasures = new byte[len];
-    inputStream.readFully(noSortDimsAndMeasures);
-
-    return new IntermediateSortTempRow(dictSortDims, noDictSortDims, noSortDimsAndMeasures);
-  }
-
-  /**
-   * Write intermediate sort temp row to OutputStream
-   * This method is used during the merge sort phase to write row to sort temp file.
-   *
-   * @param sortTempRow intermediate sort temp row
-   * @param outputStream output stream
-   * @throws IOException if error occurs while writing to stream
-   */
-  public void writeIntermediateSortTempRowToOutputStream(IntermediateSortTempRow sortTempRow,
-      DataOutputStream outputStream) throws IOException {
-    // write dict & sort dim
-    for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
-      outputStream.writeInt(sortTempRow.getDictSortDims()[idx]);
-    }
-
-    // write no-dict & sort dim
-    for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-      byte[] bytes = sortTempRow.getNoDictSortDims()[idx];
-      outputStream.writeShort(bytes.length);
-      outputStream.write(bytes);
-    }
-
-    // write packed no-sort dim & measure
-    outputStream.writeInt(sortTempRow.getNoSortDimsAndMeasures().length);
-    outputStream.write(sortTempRow.getNoSortDimsAndMeasures());
-  }
-
-  /**
-   * Write raw row as an intermediate sort temp row to sort temp file.
-   * This method is used at the beginning of the sort phase. Compared with converting the raw row
-   * to an intermediate sort temp row and then writing the converted one, writing the raw row
-   * directly avoids the overhead of the intermediate conversion.
-   * This method uses an array-backed buffer to avoid repeated memory allocation. The buffer is
-   * reused for all rows (per thread).
-   *
-   * @param row raw row
-   * @param outputStream output stream
-   * @param rowBuffer array-backed buffer
-   * @throws IOException if error occurs while writing to stream
-   */
-  public void writeRawRowAsIntermediateSortTempRowToOutputStream(Object[] row,
-      DataOutputStream outputStream, ByteBuffer rowBuffer) throws IOException {
-    // write dict & sort
-    for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
-      outputStream.writeInt((int) row[this.dictSortDimIdx[idx]]);
-    }
-
-    // write no-dict & sort
-    for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-      byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
-      outputStream.writeShort(bytes.length);
-      outputStream.write(bytes);
-    }
-
-    // pack no-sort
-    rowBuffer.clear();
-    packNoSortFieldsToBytes(row, rowBuffer);
-    rowBuffer.flip();
-    int packSize = rowBuffer.limit();
-
-    // write no-sort
-    outputStream.writeInt(packSize);
-    outputStream.write(rowBuffer.array(), 0, packSize);
-  }
-
-  /**
-   * Read intermediate sort temp row from unsafe memory.
-   * This method is used during merge sort phase for off-heap sort.
-   *
-   * @param baseObject base object of memory block
-   * @param address address of the row
-   * @return intermediate sort temp row
-   */
-  public IntermediateSortTempRow readIntermediateSortTempRowFromUnsafeMemory(Object baseObject,
-      long address) {
-    int size = 0;
-
-    int[] dictSortDims = new int[this.dictSortDimCnt];
-    byte[][] noDictSortDims = new byte[this.noDictSortDimCnt][];
-
-    // read dict & sort dim
-    for (int idx = 0; idx < dictSortDims.length; idx++) {
-      dictSortDims[idx] = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
-      size += 4;
-    }
-
-    // read no-dict & sort dim
-    for (int idx = 0; idx < noDictSortDims.length; idx++) {
-      short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-      size += 2;
-      byte[] bytes = new byte[length];
-      CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
-          bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
-      size += length;
-      noDictSortDims[idx] = bytes;
-    }
-
-    // read no-sort dims & measures
-    int len = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
-    size += 4;
-    byte[] noSortDimsAndMeasures = new byte[len];
-    CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
-        noSortDimsAndMeasures, CarbonUnsafe.BYTE_ARRAY_OFFSET, len);
-
-    return new IntermediateSortTempRow(dictSortDims, noDictSortDims, noSortDimsAndMeasures);
-  }
-
-  /**
-   * Write intermediate sort temp row directly from unsafe memory to stream.
-   * This method is used during the sort phase to spill in-memory pages to the sort temp file.
-   * Compared with reading the intermediate sort temp row from memory and then writing it,
-   * writing directly from memory to the stream avoids the overhead of the intermediate conversion.
-   *
-   * @param baseObject base object of the memory block
-   * @param address base address of the row
-   * @param outputStream output stream
-   * @throws IOException if error occurs while writing to stream
-   */
-  public void writeIntermediateSortTempRowFromUnsafeMemoryToStream(Object baseObject,
-      long address, DataOutputStream outputStream) throws IOException {
-    int size = 0;
-
-    // dict & sort
-    for (int idx = 0; idx < dictSortDimCnt; idx++) {
-      outputStream.writeInt(CarbonUnsafe.getUnsafe().getInt(baseObject, address + size));
-      size += 4;
-    }
-
-    // no-dict & sort
-    for (int idx = 0; idx < noDictSortDimCnt; idx++) {
-      short length = CarbonUnsafe.getUnsafe().getShort(baseObject, address + size);
-      size += 2;
-      byte[] bytes = new byte[length];
-      CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
-          bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, length);
-      size += length;
-
-      outputStream.writeShort(length);
-      outputStream.write(bytes);
-    }
-
-    // packed no-sort & measure
-    int len = CarbonUnsafe.getUnsafe().getInt(baseObject, address + size);
-    size += 4;
-    byte[] noSortDimsAndMeasures = new byte[len];
-    CarbonUnsafe.getUnsafe().copyMemory(baseObject, address + size,
-        noSortDimsAndMeasures, CarbonUnsafe.BYTE_ARRAY_OFFSET, len);
-    size += len;
-
-    outputStream.writeInt(len);
-    outputStream.write(noSortDimsAndMeasures);
-  }
-
-  /**
-   * Write raw row as an intermediate sort temp row to memory.
-   * This method is used at the beginning of the off-heap sort phase. Compared with converting
-   * the raw row to an intermediate sort temp row and then writing the converted one, writing the
-   * raw row directly avoids the overhead of the intermediate conversion.
-   * This method uses an array-backed buffer to avoid repeated memory allocation. The buffer is
-   * reused for all rows (per thread).
-   *
-   * @param row raw row
-   * @param baseObject base object of the memory block
-   * @param address base address for the row
-   * @param rowBuffer array-backed buffer
-   * @return number of bytes written to memory
-   */
-  public int writeRawRowAsIntermediateSortTempRowToUnsafeMemory(Object[] row,
-      Object baseObject, long address, ByteBuffer rowBuffer) {
-    int size = 0;
-    // write dict & sort
-    for (int idx = 0; idx < this.dictSortDimCnt; idx++) {
-      CarbonUnsafe.getUnsafe()
-          .putInt(baseObject, address + size, (int) row[this.dictSortDimIdx[idx]]);
-      size += 4;
-    }
-
-    // write no-dict & sort
-    for (int idx = 0; idx < this.noDictSortDimCnt; idx++) {
-      byte[] bytes = (byte[]) row[this.noDictSortDimIdx[idx]];
-      CarbonUnsafe.getUnsafe().putShort(baseObject, address + size, (short) bytes.length);
-      size += 2;
-      CarbonUnsafe.getUnsafe()
-          .copyMemory(bytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size,
-              bytes.length);
-      size += bytes.length;
-    }
-
-    // pack no-sort fields
-    rowBuffer.clear();
-    packNoSortFieldsToBytes(row, rowBuffer);
-    rowBuffer.flip();
-    int packSize = rowBuffer.limit();
-
-    // write no-sort
-    CarbonUnsafe.getUnsafe().putInt(baseObject, address + size, packSize);
-    size += 4;
-    CarbonUnsafe.getUnsafe()
-        .copyMemory(rowBuffer.array(), CarbonUnsafe.BYTE_ARRAY_OFFSET, baseObject, address + size,
-            packSize);
-    size += packSize;
-    return size;
-  }
-
-  /**
-   * Pack the no-sort fields into a byte array.
-   *
-   * @param row raw row
-   * @param rowBuffer array-backed buffer
-   */
-  private void packNoSortFieldsToBytes(Object[] row, ByteBuffer rowBuffer) {
-    // convert dict & no-sort
-    for (int idx = 0; idx < this.dictNoSortDimCnt; idx++) {
-      rowBuffer.putInt((int) row[this.dictNoSortDimIdx[idx]]);
-    }
-    // convert no-dict & no-sort
-    for (int idx = 0; idx < this.noDictNoSortDimCnt; idx++) {
-      byte[] bytes = (byte[]) row[this.noDictNoSortDimIdx[idx]];
-      rowBuffer.putShort((short) bytes.length);
-      rowBuffer.put(bytes);
-    }
-
-    // convert measure
-    Object tmpValue;
-    DataType tmpDataType;
-    for (int idx = 0; idx < this.measureCnt; idx++) {
-      tmpValue = row[this.measureIdx[idx]];
-      tmpDataType = this.dataTypes[idx];
-      if (null == tmpValue) {
-        rowBuffer.put((byte) 0);
-        continue;
-      }
-      rowBuffer.put((byte) 1);
-      if (DataTypes.BOOLEAN == tmpDataType) {
-        if ((boolean) tmpValue) {
-          rowBuffer.put((byte) 1);
-        } else {
-          rowBuffer.put((byte) 0);
-        }
-      } else if (DataTypes.SHORT == tmpDataType) {
-        rowBuffer.putShort((Short) tmpValue);
-      } else if (DataTypes.INT == tmpDataType) {
-        rowBuffer.putInt((Integer) tmpValue);
-      } else if (DataTypes.LONG == tmpDataType) {
-        rowBuffer.putLong((Long) tmpValue);
-      } else if (DataTypes.DOUBLE == tmpDataType) {
-        rowBuffer.putDouble((Double) tmpValue);
-      } else if (DataTypes.isDecimal(tmpDataType)) {
-        byte[] decimalBytes = DataTypeUtil.bigDecimalToByte((BigDecimal) tmpValue);
-        rowBuffer.putShort((short) decimalBytes.length);
-        rowBuffer.put(decimalBytes);
-      } else {
-        throw new IllegalArgumentException("Unsupported data type: " + tmpDataType);
-      }
-    }
-  }
-}
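
Editor's note: for readers following the removed serialization logic above, the layout of one
intermediate sort temp row is: the dict-sort dimensions as plain ints, the no-dict-sort
dimensions as short-length-prefixed byte arrays, and finally the packed no-sort dims/measures
block as an int-length-prefixed byte array. The following is a minimal standalone sketch of a
writer for that layout; it only mirrors writeIntermediateSortTempRowToOutputStream above, the
class and parameter names are hypothetical, and it is not part of this patch.

    import java.io.DataOutputStream;
    import java.io.IOException;

    public class SortTempRowLayoutSketch {
      // Layout: [int dictSortDim]* [short len + bytes noDictSortDim]* [int len + packed no-sort block]
      public static void writeRow(DataOutputStream out, int[] dictSortDims,
          byte[][] noDictSortDims, byte[] packedNoSortDimsAndMeasures) throws IOException {
        for (int dim : dictSortDims) {
          out.writeInt(dim);                              // dict & sort dims as plain ints
        }
        for (byte[] bytes : noDictSortDims) {
          out.writeShort(bytes.length);                   // length-prefixed no-dict & sort dims
          out.write(bytes);
        }
        out.writeInt(packedNoSortDimsAndMeasures.length); // length-prefixed packed no-sort block
        out.write(packedNoSortDimsAndMeasures);
      }
    }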

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8d8b589e/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
new file mode 100644
index 0000000..c4e4756
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/SortStepRowUtil.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.loading.sort;
+
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+import org.apache.carbondata.processing.sort.sortdata.SortParameters;
+
+public class SortStepRowUtil {
+  private int measureCount;
+  private int dimensionCount;
+  private int complexDimensionCount;
+  private int noDictionaryCount;
+  private int[] dictDimIdx;
+  private int[] nonDictIdx;
+  private int[] measureIdx;
+
+  public SortStepRowUtil(SortParameters parameters) {
+    this.measureCount = parameters.getMeasureColCount();
+    this.dimensionCount = parameters.getDimColCount();
+    this.complexDimensionCount = parameters.getComplexDimColCount();
+    this.noDictionaryCount = parameters.getNoDictionaryCount();
+    boolean[] isNoDictionaryDimensionColumn = parameters.getNoDictionaryDimnesionColumn();
+
+    int index = 0;
+    int nonDicIndex = 0;
+    int allCount = 0;
+
+    // note: the array elements default to 0
+    this.dictDimIdx = new int[dimensionCount - noDictionaryCount];
+    this.nonDictIdx = new int[noDictionaryCount + complexDimensionCount];
+    this.measureIdx = new int[measureCount];
+
+    // indices for dict and no-dict dim columns
+    for (int i = 0; i < isNoDictionaryDimensionColumn.length; i++) {
+      if (isNoDictionaryDimensionColumn[i]) {
+        nonDictIdx[nonDicIndex++] = i;
+      } else {
+        dictDimIdx[index++] = allCount;
+      }
+      allCount++;
+    }
+
+    // indices for complex dim columns (appended after the no-dict dims)
+    for (int i = 0; i < complexDimensionCount; i++) {
+      nonDictIdx[nonDicIndex++] = allCount;
+      allCount++;
+    }
+
+    // indices for measure columns
+    for (int i = 0; i < measureCount; i++) {
+      measureIdx[i] = allCount;
+      allCount++;
+    }
+  }
+
+  public Object[] convertRow(Object[] data) {
+    // create new row of size 3 (1 for dims, 1 for high card, 1 for measures)
+    Object[] holder = new Object[3];
+    try {
+
+      int[] dictDims = new int[dimensionCount - noDictionaryCount];
+      byte[][] nonDictArray = new byte[noDictionaryCount + complexDimensionCount][];
+      Object[] measures = new Object[measureCount];
+
+      // write dict dim data
+      for (int idx = 0; idx < dictDimIdx.length; idx++) {
+        dictDims[idx] = (int) data[dictDimIdx[idx]];
+      }
+
+      // write non dict dim data
+      for (int idx = 0; idx < nonDictIdx.length; idx++) {
+        nonDictArray[idx] = (byte[]) data[nonDictIdx[idx]];
+      }
+
+      // write measure data
+      for (int idx = 0; idx < measureIdx.length; idx++) {
+        measures[idx] = data[measureIdx[idx]];
+      }
+      NonDictionaryUtil.prepareOutObj(holder, dictDims, nonDictArray, measures);
+
+      throw new RuntimeException("Problem while converting row ", e);
+    }
+    // return the 3-parted row
+    return holder;
+  }
+}
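
Editor's note: as a quick orientation for the new utility, here is a rough usage sketch. It is
not part of the patch; the literal values and the 'parameters' variable are hypothetical. It
assumes a SortParameters instance already configured for 2 dictionary dimensions, 1 no-dictionary
dimension ordered last among the dimensions, no complex dimensions and 1 measure, and it assumes
NonDictionaryUtil.prepareOutObj places the dict dims, no-dict byte arrays and measures at holder
indices 0, 1 and 2 respectively.

    // Hypothetical usage sketch; 'parameters' is assumed to be an already configured
    // org.apache.carbondata.processing.sort.sortdata.SortParameters with
    // 2 dict dims, 1 no-dict dim (last among the dims), 0 complex dims and 1 measure.
    SortStepRowUtil sortStepRowUtil = new SortStepRowUtil(parameters);

    // raw row layout for this configuration: [dictDim0, dictDim1, noDictDim0(bytes), measure0]
    Object[] rawRow = new Object[] { 3, 7, new byte[] { 65, 66 }, 42L };
    Object[] holder = sortStepRowUtil.convertRow(rawRow);

    int[] dictDims = (int[]) holder[0];         // expected {3, 7}, assuming prepareOutObj uses index 0
    byte[][] noDictDims = (byte[][]) holder[1]; // expected {{65, 66}}, assuming index 1
    Object[] measures = (Object[]) holder[2];   // expected {42L}, assuming index 2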