You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/10 03:08:14 UTC

[27/50] [abbrv] carbondata git commit: [CARBONDATA-1530] Clean up carbon-processing module

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
deleted file mode 100644
index c7af420..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
+++ /dev/null
@@ -1,199 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.steps;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
-import org.apache.carbondata.processing.store.CarbonFactHandler;
-import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-/**
- * It reads data from sorted files which are generated in previous sort step.
- * And it writes data to carbondata file. It also generates mdk key while writing to carbondata file
- */
-public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep {
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(DataWriterProcessorStepImpl.class.getName());
-
-  private long readCounter;
-
-  public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
-      AbstractDataLoadProcessorStep child) {
-    super(configuration, child);
-  }
-
-  public DataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration) {
-    super(configuration, null);
-  }
-
-  @Override public DataField[] getOutput() {
-    return child.getOutput();
-  }
-
-  @Override public void initialize() throws IOException {
-    super.initialize();
-    child.initialize();
-  }
-
-  private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) {
-    String[] storeLocation = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(),
-            tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId,
-            configuration.getSegmentId() + "", false, false);
-    CarbonDataProcessorUtil.createLocations(storeLocation);
-    return storeLocation;
-  }
-
-  public CarbonFactDataHandlerModel getDataHandlerModel(int partitionId) {
-    CarbonTableIdentifier tableIdentifier =
-        configuration.getTableIdentifier().getCarbonTableIdentifier();
-    String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(partitionId));
-    CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
-        .createCarbonFactDataHandlerModel(configuration, storeLocation, partitionId, 0);
-    return model;
-  }
-
-  @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
-    Iterator<CarbonRowBatch>[] iterators = child.execute();
-    CarbonTableIdentifier tableIdentifier =
-        configuration.getTableIdentifier().getCarbonTableIdentifier();
-    String tableName = tableIdentifier.getTableName();
-    try {
-      CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-          .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
-              System.currentTimeMillis());
-      int i = 0;
-      for (Iterator<CarbonRowBatch> iterator : iterators) {
-        String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i));
-
-        CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel
-            .createCarbonFactDataHandlerModel(configuration, storeLocation, i, 0);
-        CarbonFactHandler dataHandler = null;
-        boolean rowsNotExist = true;
-        while (iterator.hasNext()) {
-          if (rowsNotExist) {
-            rowsNotExist = false;
-            dataHandler = CarbonFactHandlerFactory
-                .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
-            dataHandler.initialise();
-          }
-          processBatch(iterator.next(), dataHandler);
-        }
-        if (!rowsNotExist) {
-          finish(dataHandler);
-        }
-        i++;
-      }
-
-    } catch (CarbonDataWriterException e) {
-      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
-      throw new CarbonDataLoadingException(
-          "Error while initializing data handler : " + e.getMessage());
-    } catch (Exception e) {
-      LOGGER.error(e, "Failed for table: " + tableName + " in DataWriterProcessorStepImpl");
-      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e);
-    }
-    return null;
-  }
-
-  @Override protected String getStepName() {
-    return "Data Writer";
-  }
-
-  public void finish(CarbonFactHandler dataHandler) {
-    CarbonTableIdentifier tableIdentifier =
-        configuration.getTableIdentifier().getCarbonTableIdentifier();
-    String tableName = tableIdentifier.getTableName();
-
-    try {
-      dataHandler.finish();
-    } catch (Exception e) {
-      LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
-    }
-    LOGGER.info("Record Processed For table: " + tableName);
-    String logMessage =
-        "Finished Carbon DataWriterProcessorStepImpl: Read: " + readCounter + ": Write: "
-            + rowCounter.get();
-    LOGGER.info(logMessage);
-    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
-    processingComplete(dataHandler);
-    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-        .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
-            System.currentTimeMillis());
-    CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
-        .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis());
-  }
-
-  private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
-    if (null != dataHandler) {
-      try {
-        dataHandler.closeHandler();
-      } catch (CarbonDataWriterException e) {
-        LOGGER.error(e, e.getMessage());
-        throw new CarbonDataLoadingException(e.getMessage(), e);
-      } catch (Exception e) {
-        LOGGER.error(e, e.getMessage());
-        throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage());
-      }
-    }
-  }
-
-  private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler)
-      throws CarbonDataLoadingException {
-    try {
-      while (batch.hasNext()) {
-        CarbonRow row = batch.next();
-        dataHandler.addDataToStore(row);
-        readCounter++;
-      }
-    } catch (Exception e) {
-      throw new CarbonDataLoadingException(e);
-    }
-    rowCounter.getAndAdd(batch.getSize());
-  }
-
-  public void processRow(CarbonRow row, CarbonFactHandler dataHandler) throws KeyGenException {
-    try {
-      readCounter++;
-      dataHandler.addDataToStore(row);
-    } catch (Exception e) {
-      throw new CarbonDataLoadingException("unable to generate the mdkey", e);
-    }
-    rowCounter.getAndAdd(1);
-  }
-
-  @Override protected CarbonRow processRow(CarbonRow row) {
-    return null;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
deleted file mode 100644
index cbeb20a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
+++ /dev/null
@@ -1,244 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.steps;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.parser.RowParser;
-import org.apache.carbondata.processing.newflow.parser.impl.RowParserImpl;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-
-/**
- * It reads data from record reader and sends data to next step.
- */
-public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep {
-
-  private RowParser rowParser;
-
-  private CarbonIterator<Object[]>[] inputIterators;
-
-  /**
-   * executor service to execute the query
-   */
-  public ExecutorService executorService;
-
-  public InputProcessorStepImpl(CarbonDataLoadConfiguration configuration,
-      CarbonIterator<Object[]>[] inputIterators) {
-    super(configuration, null);
-    this.inputIterators = inputIterators;
-  }
-
-  @Override public DataField[] getOutput() {
-    return configuration.getDataFields();
-  }
-
-  @Override public void initialize() throws IOException {
-    super.initialize();
-    rowParser = new RowParserImpl(getOutput(), configuration);
-    executorService = Executors.newCachedThreadPool();
-  }
-
-  @Override public Iterator<CarbonRowBatch>[] execute() {
-    int batchSize = CarbonProperties.getInstance().getBatchSize();
-    List<CarbonIterator<Object[]>>[] readerIterators = partitionInputReaderIterators();
-    Iterator<CarbonRowBatch>[] outIterators = new Iterator[readerIterators.length];
-    for (int i = 0; i < outIterators.length; i++) {
-      outIterators[i] =
-          new InputProcessorIterator(readerIterators[i], rowParser, batchSize,
-              configuration.isPreFetch(), executorService, rowCounter);
-    }
-    return outIterators;
-  }
-
-  /**
-   * Partition input iterators equally as per the number of threads.
-   * @return
-   */
-  private List<CarbonIterator<Object[]>>[] partitionInputReaderIterators() {
-    // Get the number of cores configured in property.
-    int numberOfCores = CarbonProperties.getInstance().getNumberOfCores();
-    // Get the minimum of number of cores and iterators size to get the number of parallel threads
-    // to be launched.
-    int parallelThreadNumber = Math.min(inputIterators.length, numberOfCores);
-
-    List<CarbonIterator<Object[]>>[] iterators = new List[parallelThreadNumber];
-    for (int i = 0; i < parallelThreadNumber; i++) {
-      iterators[i] = new ArrayList<>();
-    }
-    // Equally partition the iterators as per number of threads
-    for (int i = 0; i < inputIterators.length; i++) {
-      iterators[i % parallelThreadNumber].add(inputIterators[i]);
-    }
-    return iterators;
-  }
-
-  @Override protected CarbonRow processRow(CarbonRow row) {
-    return null;
-  }
-
-  @Override public void close() {
-    if (!closed) {
-      super.close();
-      executorService.shutdown();
-      for (CarbonIterator inputIterator : inputIterators) {
-        inputIterator.close();
-      }
-    }
-  }
-
-  @Override protected String getStepName() {
-    return "Input Processor";
-  }
-
-  /**
-   * This iterator wraps the list of iterators and it starts iterating the each
-   * iterator of the list one by one. It also parse the data while iterating it.
-   */
-  private static class InputProcessorIterator extends CarbonIterator<CarbonRowBatch> {
-
-    private List<CarbonIterator<Object[]>> inputIterators;
-
-    private CarbonIterator<Object[]> currentIterator;
-
-    private int counter;
-
-    private int batchSize;
-
-    private RowParser rowParser;
-
-    private Future<CarbonRowBatch> future;
-
-    private ExecutorService executorService;
-
-    private boolean nextBatch;
-
-    private boolean firstTime;
-
-    private boolean preFetch;
-
-    private AtomicLong rowCounter;
-
-    public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators,
-        RowParser rowParser, int batchSize, boolean preFetch, ExecutorService executorService,
-        AtomicLong rowCounter) {
-      this.inputIterators = inputIterators;
-      this.batchSize = batchSize;
-      this.rowParser = rowParser;
-      this.counter = 0;
-      // Get the first iterator from the list.
-      currentIterator = inputIterators.get(counter++);
-      this.executorService = executorService;
-      this.rowCounter = rowCounter;
-      this.preFetch = preFetch;
-      this.nextBatch = false;
-      this.firstTime = true;
-    }
-
-    @Override
-    public boolean hasNext() {
-      return nextBatch || internalHasNext();
-    }
-
-    private boolean internalHasNext() {
-      if (firstTime) {
-        firstTime = false;
-        currentIterator.initialize();
-      }
-      boolean hasNext = currentIterator.hasNext();
-      // If iterator is finished then check for next iterator.
-      if (!hasNext) {
-        currentIterator.close();
-        // Check next iterator is available in the list.
-        if (counter < inputIterators.size()) {
-          // Get the next iterator from the list.
-          currentIterator = inputIterators.get(counter++);
-          // Initialize the new iterator
-          currentIterator.initialize();
-          hasNext = internalHasNext();
-        }
-      }
-      return hasNext;
-    }
-
-    @Override
-    public CarbonRowBatch next() {
-      if (preFetch) {
-        return getCarbonRowBatchWithPreFetch();
-      } else {
-        return getBatch();
-      }
-    }
-
-    private CarbonRowBatch getCarbonRowBatchWithPreFetch() {
-      CarbonRowBatch result = null;
-      if (future == null) {
-        future = getCarbonRowBatch();
-      }
-      try {
-        result = future.get();
-      } catch (InterruptedException e) {
-        throw new RuntimeException(e);
-      } catch (ExecutionException e) {
-        throw new RuntimeException(e);
-      }
-      nextBatch = false;
-      if (hasNext()) {
-        nextBatch = true;
-        future = getCarbonRowBatch();
-      }
-
-      return result;
-    }
-
-    private Future<CarbonRowBatch> getCarbonRowBatch() {
-      return executorService.submit(new Callable<CarbonRowBatch>() {
-        @Override public CarbonRowBatch call() throws Exception {
-          return getBatch();
-
-        }
-      });
-    }
-
-    private CarbonRowBatch getBatch() {
-      // Create batch and fill it.
-      CarbonRowBatch carbonRowBatch = new CarbonRowBatch(batchSize);
-      int count = 0;
-      while (internalHasNext() && count < batchSize) {
-        carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next())));
-        count++;
-      }
-      rowCounter.getAndAdd(carbonRowBatch.getSize());
-      return carbonRowBatch;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
deleted file mode 100644
index 0b93b7a..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.processing.newflow.steps;
-
-import java.io.IOException;
-import java.util.Iterator;
-
-import org.apache.carbondata.core.datastore.row.CarbonRow;
-import org.apache.carbondata.processing.newflow.AbstractDataLoadProcessorStep;
-import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
-import org.apache.carbondata.processing.newflow.DataField;
-import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
-import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
-import org.apache.carbondata.processing.newflow.sort.Sorter;
-import org.apache.carbondata.processing.newflow.sort.SorterFactory;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-
-/**
- * It sorts the data and write them to intermediate temp files. These files will be further read
- * by next step for writing to carbondata files.
- */
-public class SortProcessorStepImpl extends AbstractDataLoadProcessorStep {
-
-  private Sorter sorter;
-
-  public SortProcessorStepImpl(CarbonDataLoadConfiguration configuration,
-      AbstractDataLoadProcessorStep child) {
-    super(configuration, child);
-  }
-
-  @Override
-  public DataField[] getOutput() {
-    return child.getOutput();
-  }
-
-  @Override
-  public void initialize() throws IOException {
-    super.initialize();
-    child.initialize();
-    SortParameters sortParameters = SortParameters.createSortParameters(configuration);
-    sorter = SorterFactory.createSorter(configuration, rowCounter);
-    sorter.initialize(sortParameters);
-  }
-
-  @Override
-  public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException {
-    final Iterator<CarbonRowBatch>[] iterators = child.execute();
-    return sorter.sort(iterators);
-  }
-
-  @Override
-  protected CarbonRow processRow(CarbonRow row) {
-    return null;
-  }
-
-  @Override
-  public void close() {
-    if (!closed) {
-      super.close();
-      if (sorter != null) {
-        sorter.close();
-      }
-    }
-  }
-
-  @Override protected String getStepName() {
-    return "Sort Processor";
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/DataPartitioner.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/DataPartitioner.java b/processing/src/main/java/org/apache/carbondata/processing/partition/DataPartitioner.java
new file mode 100644
index 0000000..649c18d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/DataPartitioner.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.partition;
+
+import java.util.List;
+
+public interface DataPartitioner {
+
+  /**
+   * All the partitions built by the Partitioner
+   */
+  List<Partition> getAllPartitions();
+
+  /**
+   * Identifies the partitions applicable for the given filter (API used for query)
+   */
+  List<Partition> getPartitions();
+
+}
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/Partition.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/Partition.java b/processing/src/main/java/org/apache/carbondata/processing/partition/Partition.java
new file mode 100644
index 0000000..95cdfb3
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/Partition.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.partition;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface Partition extends Serializable {
+  /**
+   * unique identification for the partition in the cluster.
+   */
+  String getUniqueID();
+
+  /**
+   * Paths of the files associated with this partition.
+   *
+   * @return the list of file paths for this partition
+   */
+  List<String> getFilesPath();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/impl/DefaultLoadBalancer.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/DefaultLoadBalancer.java b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/DefaultLoadBalancer.java
new file mode 100644
index 0000000..e533baf
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/DefaultLoadBalancer.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.partition.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.processing.partition.Partition;
+
+/**
+ * A sample load balancer to distribute the partitions to the available nodes in a round robin mode.
+ */
+public class DefaultLoadBalancer {
+  private Map<String, List<Partition>> nodeToPartitonMap =
+      new HashMap<String, List<Partition>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+  private Map<Partition, String> partitonToNodeMap =
+      new HashMap<Partition, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+  public DefaultLoadBalancer(List<String> nodes, List<Partition> partitions) {
+    // Perform a round robin allocation
+    int nodeCount = nodes.size();
+
+    int partitioner = 0;
+    for (Partition partition : partitions) {
+      int nodeindex = partitioner % nodeCount;
+      String node = nodes.get(nodeindex);
+
+      List<Partition> oldList = nodeToPartitonMap.get(node);
+      if (oldList == null) {
+        oldList = new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+        nodeToPartitonMap.put(node, oldList);
+      }
+      oldList.add(partition);
+
+      partitonToNodeMap.put(partition, node);
+
+      partitioner++;
+    }
+  }
+
+  public String getNodeForPartitions(Partition partition) {
+    return partitonToNodeMap.get(partition);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/impl/PartitionMultiFileImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/PartitionMultiFileImpl.java b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/PartitionMultiFileImpl.java
new file mode 100644
index 0000000..c303efa
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/PartitionMultiFileImpl.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.partition.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.processing.partition.Partition;
+
+public class PartitionMultiFileImpl implements Partition {
+  private static final long serialVersionUID = -4363447826181193976L;
+  private String uniqueID;
+  private List<String> folderPath;
+
+  public PartitionMultiFileImpl(String uniqueID, List<String> folderPath) {
+    this.uniqueID = uniqueID;
+    this.folderPath = folderPath;
+  }
+
+  @Override public String getUniqueID() {
+    // Returns the unique ID supplied to the constructor.
+    return uniqueID;
+  }
+
+  @Override public List<String> getFilesPath() {
+    // Returns the path list supplied to the constructor.
+    return folderPath;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
new file mode 100644
index 0000000..4fb2414
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/QueryPartitionHelper.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.partition.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.processing.partition.DataPartitioner;
+import org.apache.carbondata.processing.partition.Partition;
+
+
+public final class QueryPartitionHelper {
+  private static QueryPartitionHelper instance = new QueryPartitionHelper();
+  private Map<String, DataPartitioner> partitionerMap =
+      new HashMap<String, DataPartitioner>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  private Map<String, DefaultLoadBalancer> loadBalancerMap =
+      new HashMap<String, DefaultLoadBalancer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+  private QueryPartitionHelper() {
+    // Private: the only instance is the eagerly created one returned by getInstance().
+  }
+
+  public static QueryPartitionHelper getInstance() {
+    return instance;
+  }
+
+  /**
+   * Returns getPartitions() of the partitioner stored under databaseName + '_' + tableName.
+   */
+  public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan) {
+    String tableUniqueName = queryPlan.getDatabaseName() + '_' + queryPlan.getTableName();
+    // NOTE(review): no code in this class fills partitionerMap; a missed lookup is null (NPE).
+    DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
+
+    return dataPartitioner.getPartitions();
+  }
+
+  public List<Partition> getAllPartitions(String databaseName, String tableName) {
+    String tableUniqueName = databaseName + '_' + tableName;
+    // Same key format and same unguarded lookup as getPartitionsForQuery.
+    DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
+
+    return dataPartitioner.getAllPartitions();
+  }
+
+  /**
+   * Returns getNodeForPartitions(partition) of the table's load balancer (presumably a node name).
+   */
+  public String getLocation(Partition partition, String databaseName, String tableName) {
+    String tableUniqueName = databaseName + '_' + tableName;
+    // NOTE(review): loadBalancerMap is likewise never filled here; a missed lookup would NPE.
+    DefaultLoadBalancer loadBalancer = loadBalancerMap.get(tableUniqueName);
+    return loadBalancer.getNodeForPartitions(partition);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/impl/SampleDataPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/impl/SampleDataPartitionerImpl.java b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/SampleDataPartitionerImpl.java
new file mode 100644
index 0000000..92bd6ff
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/impl/SampleDataPartitionerImpl.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.partition.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.processing.partition.DataPartitioner;
+import org.apache.carbondata.processing.partition.Partition;
+
+/**
+ * Placeholder DataPartitioner: both methods return null, so callers must null-check results.
+ */
+public class SampleDataPartitionerImpl implements DataPartitioner {
+  // NOTE(review): returning null rather than an empty list is risky for callers that iterate.
+  @Override
+  public List<Partition> getAllPartitions() {
+    return null;
+  }
+
+  @Override
+  public List<Partition> getPartitions() {
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
new file mode 100644
index 0000000..aeddac6
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/AbstractCarbonQueryExecutor.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.partition.spliter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.model.QueryDimension;
+import org.apache.carbondata.core.scan.model.QueryMeasure;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+public abstract class AbstractCarbonQueryExecutor {
+  // Base for block-scanning executors; carbonTable and segmentMapping are not assigned here.
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(AbstractCarbonQueryExecutor.class.getName());
+  protected CarbonTable carbonTable;
+  protected QueryModel queryModel;
+  protected QueryExecutor queryExecutor;
+  protected Map<String, TaskBlockInfo> segmentMapping;
+
+  /**
+   * Sets the blocks on queryModel, obtains an executor for the model and runs the query.
+   *
+   * @param blockList blocks to scan; replaces the block list currently held by queryModel
+   * @return the executor's result iterator; queryExecutor is replaced on every call
+   */
+  protected CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
+      throws QueryExecutionException, IOException {
+    queryModel.setTableBlockInfos(blockList);
+    this.queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+    return queryExecutor.execute(queryModel);
+  }
+
+  /**
+   * Builds a forced-detail-raw QueryModel with no filter, projecting all fact-table columns.
+   *
+   * @param blockList blocks to set on the new model; passed through without a null check
+   * @return the new model; it is returned, not assigned to the queryModel field
+   */
+  protected QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
+    QueryModel model = new QueryModel();
+    model.setTableBlockInfos(blockList);
+    model.setForcedDetailRawQuery(true);
+    model.setFilterExpressionResolverTree(null);
+
+    List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    List<CarbonDimension> dimensions =
+        carbonTable.getDimensionByTableName(carbonTable.getFactTableName());
+    for (CarbonDimension dim : dimensions) {
+      // every dimension is projected; no deleted-column check is performed here
+      QueryDimension queryDimension = new QueryDimension(dim.getColName());
+      queryDimension.setDimension(dim);
+      dims.add(queryDimension);
+    }
+    model.setQueryDimension(dims);
+
+    List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    List<CarbonMeasure> measures =
+        carbonTable.getMeasureByTableName(carbonTable.getFactTableName());
+    for (CarbonMeasure carbonMeasure : measures) {
+      // every measure is projected; no deleted-column check is performed here
+      QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
+      queryMeasure.setMeasure(carbonMeasure);
+      msrs.add(queryMeasure);
+    }
+    model.setQueryMeasures(msrs);
+    model.setQueryId(System.nanoTime() + "");
+    model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
+    model.setTable(carbonTable);
+    return model;
+  }
+
+  /**
+   * Finishes the current executor (a failure is only logged), then releases dictionaries.
+   * NOTE(review): throws NullPointerException when queryExecutor was never set by a query.
+   */
+  public void finish() {
+    try {
+      queryExecutor.finish();
+    } catch (QueryExecutionException e) {
+      LOGGER.error(e, "Problem while finish: ");
+    }
+    clearDictionaryFromQueryModel();
+  }
+
+  /**
+   * This method will clear the dictionary access count after its usage is complete so
+   * that column can be deleted from LRU cache whenever memory reaches threshold
+   */
+  private void clearDictionaryFromQueryModel() {
+    if (null != queryModel) {
+      Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
+      if (null != columnToDictionaryMapping) {
+        for (Map.Entry<String, Dictionary> entry : columnToDictionaryMapping.entrySet()) {
+          CarbonUtil.clearDictionaryCache(entry.getValue());
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
new file mode 100644
index 0000000..6afec0b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/CarbonSplitExecutor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.partition.spliter;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.result.iterator.PartitionSpliterRawResultIterator;
+
+/**
+ * Used to read carbon blocks when add/split partition: runs one raw query per task of a segment.
+ */
+public class CarbonSplitExecutor extends AbstractCarbonQueryExecutor {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonSplitExecutor.class.getName());
+
+  public CarbonSplitExecutor(Map<String, TaskBlockInfo> segmentMapping, CarbonTable carbonTable) {
+    this.segmentMapping = segmentMapping;
+    this.carbonTable = carbonTable;
+  }
+  // Returns one iterator per task; queryExecutor ends up holding only the last task's executor.
+  public List<PartitionSpliterRawResultIterator> processDataBlocks(String segmentId)
+      throws QueryExecutionException, IOException {
+    List<TableBlockInfo> list = null;
+    queryModel = prepareQueryModel(list);
+    List<PartitionSpliterRawResultIterator> resultList
+        = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    TaskBlockInfo taskBlockInfo = segmentMapping.get(segmentId); // null if segmentId is unknown
+    Set<String> taskBlockListMapping = taskBlockInfo.getTaskSet();
+    for (String task : taskBlockListMapping) {
+      list = taskBlockInfo.getTableBlockInfoList(task);
+      LOGGER.info("for task -" + task + "-block size is -" + list.size());
+      queryModel.setTableBlockInfos(list); // executeBlockList sets the same list again
+      resultList.add(new PartitionSpliterRawResultIterator(executeBlockList(list)));
+    }
+    return resultList;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
new file mode 100644
index 0000000..1db414f
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.partition.spliter;
+
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.row.CarbonRow;
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
+import org.apache.carbondata.processing.partition.spliter.exception.AlterPartitionSliceException;
+import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.CarbonFactHandler;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+public class RowResultProcessor {
+  // Writes result rows through the CarbonFactDataHandlerColumnar built in the constructor.
+  private CarbonFactHandler dataHandler;
+  private SegmentProperties segmentProperties;
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(RowResultProcessor.class.getName());
+
+  // Creates the temp locations and the data handler; loadModel.getTaskNo() must parse as an int.
+  public RowResultProcessor(CarbonTable carbonTable, CarbonLoadModel loadModel,
+      SegmentProperties segProp, String[] tempStoreLocation, Integer bucketId) {
+    CarbonDataProcessorUtil.createLocations(tempStoreLocation);
+    this.segmentProperties = segProp;
+    String tableName = carbonTable.getFactTableName();
+    CarbonFactDataHandlerModel carbonFactDataHandlerModel =
+        CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable,
+            segProp, tableName, tempStoreLocation);
+    CarbonDataFileAttributes carbonDataFileAttributes =
+        new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
+            loadModel.getFactTimeStamp());
+    carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
+    carbonFactDataHandlerModel.setBucketId(bucketId);
+    // Note: the compaction-flow flag is set here only to get the decimal type conversion
+    carbonFactDataHandlerModel.setCompactionFlow(true);
+    dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
+  }
+  // Writes all rows; returns false when adding a row fails or closing the handler throws.
+  public boolean execute(List<Object[]> resultList) {
+    boolean processStatus;
+    boolean isDataPresent = false;
+    // isDataPresent is false here, so initialise() always runs first and later checks are true.
+    try {
+      if (!isDataPresent) {
+        dataHandler.initialise();
+        isDataPresent = true;
+      }
+      for (Object[] row: resultList) {
+        addRow(row);
+      }
+      if (isDataPresent)
+      {
+        this.dataHandler.finish();
+      }
+      processStatus = true;
+    } catch (AlterPartitionSliceException e) {
+      LOGGER.error(e, e.getMessage());
+      LOGGER.error("Exception in executing RowResultProcessor" + e.getMessage());
+      processStatus = false;
+    } finally {
+      try {
+        if (isDataPresent) {
+          this.dataHandler.closeHandler();
+        }
+      } catch (Exception e) {
+        LOGGER.error("Exception while closing the handler in RowResultProcessor" + e.getMessage());
+        processStatus = false;
+      }
+    }
+    return processStatus;
+  }
+  // Converts the tuple to a CarbonRow and stores it; a writer failure is rethrown wrapped.
+  private void addRow(Object[] carbonTuple) throws AlterPartitionSliceException {
+    CarbonRow row = WriteStepRowUtil.fromMergerRow(carbonTuple, segmentProperties);
+    try {
+      this.dataHandler.addDataToStore(row);
+    } catch (CarbonDataWriterException e) {
+      throw new AlterPartitionSliceException("Exception in adding rows in RowResultProcessor", e);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/exception/AlterPartitionSliceException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/exception/AlterPartitionSliceException.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/exception/AlterPartitionSliceException.java
new file mode 100644
index 0000000..21b53cf
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/exception/AlterPartitionSliceException.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.partition.spliter.exception;
+
+import java.util.Locale;
+
+public class AlterPartitionSliceException extends Exception {
+  // Checked exception (extends Exception) that keeps its own copy of the message in msg.
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The error message; same text that the constructors pass to super.
+   */
+  private String msg = "";
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public AlterPartitionSliceException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor with a cause.
+   * @param msg The error message for this exception.
+   * @param t   The underlying cause, passed on to super.
+   */
+  public AlterPartitionSliceException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * Locale-taking overload; it is not an override of a Throwable method.
+   *
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region. It is ignored here.
+   * @return - always the empty string; no localization is implemented.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return "";
+  }
+
+  /**
+   * Delegates to Throwable#getLocalizedMessage(), which by default returns getMessage().
+   */
+  @Override public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * Returns the msg field, i.e. the message given to the constructor.
+   */
+  public String getMessage() {
+    return this.msg;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/SortObserver.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/SortObserver.java b/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/SortObserver.java
deleted file mode 100644
index 31c2b4f..0000000
--- a/processing/src/main/java/org/apache/carbondata/processing/schema/metadata/SortObserver.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.processing.schema.metadata;
-
-import java.io.Serializable;
-
-public class SortObserver implements Serializable {
-  /**
-   * is failed
-   */
-  private boolean isFailed;
-
-  /**
-   * @return the isFailed
-   */
-  public boolean isFailed() {
-    return isFailed;
-  }
-
-  /**
-   * @param isFailed the isFailed to set
-   */
-  public void setFailed(boolean isFailed) {
-    this.isFailed = isFailed;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/exception/CarbonSortKeyAndGroupByException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/exception/CarbonSortKeyAndGroupByException.java b/processing/src/main/java/org/apache/carbondata/processing/sort/exception/CarbonSortKeyAndGroupByException.java
new file mode 100644
index 0000000..292cdb3
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/exception/CarbonSortKeyAndGroupByException.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.exception;
+
+import java.util.Locale;
+
+public class CarbonSortKeyAndGroupByException extends Exception {
+  // Checked exception (extends Exception) that keeps its own copy of the message in msg.
+  /**
+   * default serial version ID.
+   */
+  private static final long serialVersionUID = 1L;
+
+  /**
+   * The error message; stays "" when the cause-only constructor is used.
+   */
+  private String msg = "";
+
+  /**
+   * Constructor
+   *
+   * @param msg The error message for this exception.
+   */
+  public CarbonSortKeyAndGroupByException(String msg) {
+    super(msg);
+    this.msg = msg;
+  }
+
+  /**
+   * Constructor with a cause.
+   * @param msg The error message for this exception.
+   * @param t   The underlying cause, passed on to super.
+   */
+  public CarbonSortKeyAndGroupByException(String msg, Throwable t) {
+    super(msg, t);
+    this.msg = msg;
+  }
+
+  /**
+   * Wraps a cause without a message.
+   * NOTE(review): msg stays "", so getMessage() returns "" rather than the cause's text.
+   * @param t the underlying cause
+   */
+  public CarbonSortKeyAndGroupByException(Throwable t) {
+    super(t);
+  }
+
+  /**
+   * Locale-taking overload; it is not an override of a Throwable method.
+   *
+   * @param locale - A Locale object represents a specific geographical,
+   *               political, or cultural region. It is ignored here.
+   * @return - always the empty string; no localization is implemented.
+   */
+  public String getLocalizedMessage(Locale locale) {
+    return "";
+  }
+
+  /**
+   * Delegates to Throwable#getLocalizedMessage(), which by default returns getMessage().
+   */
+  @Override public String getLocalizedMessage() {
+    return super.getLocalizedMessage();
+  }
+
+  /**
+   * Returns the msg field ("" when only a cause was supplied).
+   */
+  public String getMessage() {
+    return this.msg;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/AbstractTempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/AbstractTempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/AbstractTempSortFileWriter.java
new file mode 100644
index 0000000..1302a5b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/AbstractTempSortFileWriter.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+public abstract class AbstractTempSortFileWriter implements TempSortFileWriter {
+  // Common state for temp sort file writers: column counts, buffer size and the output stream.
+  /**
+   * Buffer size given to the BufferedOutputStream created in initiaize()
+   */
+  protected int writeBufferSize;
+
+  /**
+   * Measure count
+   */
+  protected int measureCount;
+
+  /**
+   * Dimension count
+   */
+  protected int dimensionCount;
+
+  /**
+   * complexDimension count
+   */
+  protected int complexDimensionCount;
+
+  /**
+   * Output stream; null until initiaize() is called, closed by finish()
+   */
+  protected DataOutputStream stream;
+
+  /**
+   * noDictionaryCount
+   */
+  protected int noDictionaryCount;
+
+  /** Stores the given counts and buffer size; the stream itself is opened by initiaize().
+   * @param dimensionCount        dimension count
+   * @param complexDimensionCount complex dimension count
+   * @param measureCount          measure count
+   * @param noDictionaryCount     no-dictionary column count
+   * @param writeBufferSize       size handed to the BufferedOutputStream in initiaize()
+   */
+  public AbstractTempSortFileWriter(int dimensionCount, int complexDimensionCount, int measureCount,
+      int noDictionaryCount, int writeBufferSize) {
+    this.writeBufferSize = writeBufferSize;
+    this.dimensionCount = dimensionCount;
+    this.complexDimensionCount = complexDimensionCount;
+    this.measureCount = measureCount;
+    this.noDictionaryCount = noDictionaryCount;
+  }
+
+  /**
+   * Opens a buffered stream on the file and writes entryCount first; I/O failures are wrapped.
+   */
+  @Override public void initiaize(File file, int entryCount)
+      throws CarbonSortKeyAndGroupByException {
+    try {
+      stream = new DataOutputStream(
+          new BufferedOutputStream(new FileOutputStream(file), writeBufferSize));
+      stream.writeInt(entryCount); // NOTE(review): on failure the stream stays open
+    } catch (FileNotFoundException e1) {
+      throw new CarbonSortKeyAndGroupByException(e1);
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException(e);
+    }
+  }
+
+  /**
+   * Closes the stream through CarbonUtil.closeStreams
+   */
+  @Override public void finish() {
+    CarbonUtil.closeStreams(stream);
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/CompressedTempSortFileWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/CompressedTempSortFileWriter.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/CompressedTempSortFileWriter.java
new file mode 100644
index 0000000..40f650d
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/CompressedTempSortFileWriter.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.compression.CompressorFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+/**
+ * Sort temp file writer that serializes a batch of rows in memory, compresses the bytes and
+ * appends the compressed block to the stream owned by {@link AbstractTempSortFileWriter}.
+ */
+public class CompressedTempSortFileWriter extends AbstractTempSortFileWriter {
+
+  /**
+   * Forwards every argument unchanged to the {@link AbstractTempSortFileWriter} constructor.
+   *
+   * @param dimensionCount        number of dimension columns
+   * @param complexDimensionCount number of complex dimension columns
+   * @param measureCount          number of measure columns
+   * @param noDictionaryCount     number of no-dictionary columns
+   * @param writeBufferSize       buffer size for the underlying file stream
+   */
+  public CompressedTempSortFileWriter(int dimensionCount, int complexDimensionCount,
+      int measureCount, int noDictionaryCount, int writeBufferSize) {
+    super(dimensionCount, complexDimensionCount, measureCount, noDictionaryCount, writeBufferSize);
+  }
+
+  /**
+   * Writes one batch as: record count, compressed length, compressed bytes.
+   *
+   * @param records rows of the batch to serialize
+   * @throws CarbonSortKeyAndGroupByException if an IOException occurs while serializing or writing
+   */
+  public void writeSortTempFile(Object[][] records) throws CarbonSortKeyAndGroupByException {
+    ByteArrayOutputStream rawBytes = null;
+    DataOutputStream rawWriter = null;
+    try {
+      // initial capacity estimate only; it counts just the measure and dimension columns
+      int bytesPerRecord = (measureCount * CarbonCommonConstants.DOUBLE_SIZE_IN_BYTE)
+          + (dimensionCount * CarbonCommonConstants.INT_SIZE_IN_BYTE);
+      rawBytes = new ByteArrayOutputStream(records.length * bytesPerRecord);
+      rawWriter = new DataOutputStream(rawBytes);
+      UnCompressedTempSortFileWriter.writeDataOutputStream(records, rawWriter, measureCount,
+          dimensionCount, noDictionaryCount, complexDimensionCount);
+      stream.writeInt(records.length);
+      byte[] compressed =
+          CompressorFactory.getInstance().getCompressor().compressByte(rawBytes.toByteArray());
+      stream.writeInt(compressed.length);
+      stream.write(compressed);
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException(e);
+    } finally {
+      CarbonUtil.closeStreams(rawBytes);
+      CarbonUtil.closeStreams(rawWriter);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
new file mode 100644
index 0000000..ffe6fb6
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/IntermediateFileMerger.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.io.BufferedOutputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.AbstractQueue;
+import java.util.PriorityQueue;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+import org.apache.carbondata.processing.sort.exception.CarbonSortKeyAndGroupByException;
+
+public class IntermediateFileMerger implements Runnable {
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(IntermediateFileMerger.class.getName());
+
+  /**
+   * recordHolderHeap
+   */
+  private AbstractQueue<SortTempFileChunkHolder> recordHolderHeap;
+
+  /**
+   * fileCounter
+   */
+  private int fileCounter;
+
+  /**
+   * stream
+   */
+  private DataOutputStream stream;
+
+  /**
+   * totalNumberOfRecords
+   */
+  private int totalNumberOfRecords;
+
+  /**
+   * records
+   */
+  private Object[][] records;
+
+  /**
+   * entryCount
+   */
+  private int entryCount;
+
+  /**
+   * writer
+   */
+  private TempSortFileWriter writer;
+
+  /**
+   * totalSize
+   */
+  private int totalSize;
+
+  private SortParameters mergerParameters;
+
+  private File[] intermediateFiles;
+
+  private File outPutFile;
+
+  private boolean[] noDictionarycolumnMapping;
+
+  /**
+   * Creates a merger that combines {@code intermediateFiles} into {@code outPutFile}.
+   */
+  public IntermediateFileMerger(SortParameters mergerParameters, File[] intermediateFiles,
+      File outPutFile) {
+    this.mergerParameters = mergerParameters;
+    this.intermediateFiles = intermediateFiles;
+    this.fileCounter = intermediateFiles.length;
+    this.outPutFile = outPutFile;
+    this.noDictionarycolumnMapping = mergerParameters.getNoDictionaryDimnesionColumn();
+  }
+
+  /**
+   * Merges the intermediate files into the output file; failures are logged, not rethrown.
+   */
+  @Override
+  public void run() {
+    long intermediateMergeStartTime = System.currentTimeMillis();
+    int fileConterConst = fileCounter;
+    boolean isFailed = false;
+    try {
+      startSorting();
+      initialize();
+      while (hasNext()) {
+        writeDataTofile(next());
+      }
+      boolean isBuffered =
+          mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch();
+      if (isBuffered && entryCount > 0) {
+        if (entryCount < totalSize) {
+          Object[][] temp = new Object[entryCount][];
+          System.arraycopy(records, 0, temp, 0, entryCount);
+          records = temp;
+        }
+        this.writer.writeSortTempFile(records);
+      }
+      double intermediateMergeCostTime =
+          (System.currentTimeMillis() - intermediateMergeStartTime) / 1000.0;
+      LOGGER.info("============================== Intermediate Merge of " + fileConterConst +
+          " Sort Temp Files Cost Time: " + intermediateMergeCostTime + "(s)");
+    } catch (Exception e) {
+      LOGGER.error(e, "Problem while intermediate merging");
+      isFailed = true;
+    } finally {
+      records = null;
+      CarbonUtil.closeStreams(this.stream);
+      if (null != writer) {
+        writer.finish();
+      }
+      if (!isFailed) {
+        try {
+          finish();
+        } catch (CarbonSortKeyAndGroupByException e) {
+          LOGGER.error(e, "Problem while deleting the merge file");
+        }
+      } else if (outPutFile.exists() && !outPutFile.delete()) {
+        // a failed merge leaves a partial output behind: report only a delete that failed
+        LOGGER.error("Problem while deleting the merge file");
+      }
+    }
+  }
+
+  /**
+   * Prepares the output. With compression or prefetch enabled a temp sort file writer is
+   * created and initialized with the output file and total record count; otherwise a
+   * buffered stream is opened on the output file and the total record count is written.
+   *
+   * @throws CarbonSortKeyAndGroupByException if the output file cannot be opened or written
+   */
+  private void initialize() throws CarbonSortKeyAndGroupByException {
+    if (mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch()) {
+      writer = TempSortFileWriterFactory.getInstance()
+          .getTempSortFileWriter(mergerParameters.isSortFileCompressionEnabled(),
+              mergerParameters.getDimColCount(), mergerParameters.getComplexDimColCount(),
+              mergerParameters.getMeasureColCount(), mergerParameters.getNoDictionaryCount(),
+              mergerParameters.getFileWriteBufferSize());
+      writer.initiaize(outPutFile, totalNumberOfRecords);
+      // rows per batch: the prefetch buffer size takes precedence over the compression one
+      totalSize = mergerParameters.isPrefetch() ? mergerParameters.getBufferSize()
+          : mergerParameters.getSortTempFileNoOFRecordsInCompression();
+      return;
+    }
+
+    try {
+      FileOutputStream fileOut = new FileOutputStream(outPutFile);
+      this.stream = new DataOutputStream(
+          new BufferedOutputStream(fileOut, mergerParameters.getFileWriteBufferSize()));
+      this.stream.writeInt(this.totalNumberOfRecords);
+    } catch (FileNotFoundException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while getting the file", e);
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while writing the data to file", e);
+    }
+  }
+
+  /**
+   * Removes the holder at the head of the heap and returns the row obtained from it.
+   * <p>
+   * The heap is a {@link PriorityQueue}, so the head is the least holder under the heap's
+   * ordering and {@code poll()} / {@code add()} each cost O(log n) in the number of holders.
+   * <p>
+   * A holder that reports more rows reads its next row and is added back to the heap. A
+   * holder without further rows has its stream closed and {@code fileCounter} is decremented,
+   * which is what eventually makes {@link #hasNext()} return false.
+   *
+   * @return the row taken from the polled holder
+   * @throws CarbonSortKeyAndGroupByException propagated from the holder calls
+   */
+  private Object[] getSortedRecordFromFile() throws CarbonSortKeyAndGroupByException {
+    // NOTE(review): assumes a non-empty heap (run() checks hasNext() first); on an empty
+    // heap poll() returns null and the getRow() call below would fail
+    SortTempFileChunkHolder head = this.recordHolderHeap.poll();
+
+    // get the row from chunk
+    Object[] sortedRow = head.getRow();
+
+    // check if there is any entry left in this chunk
+    if (head.hasNext()) {
+      // read new row
+      head.readRow();
+
+      // add to heap again
+      this.recordHolderHeap.add(head);
+    } else {
+      // chunk is exhausted: close the stream
+      head.closeStream();
+
+      // one file less left to merge,
+      // so that hasNext() can end the merge loop
+      --this.fileCounter;
+    }
+
+    // return row
+    return sortedRow;
+  }
+
+  /**
+   * Prepares the heap that is used to merge the intermediate files.
+   * <p>
+   * Creates the record holder heap, then for every intermediate file builds a chunk holder,
+   * initializes it, reads its first row and adds it to the heap. The entry counts reported
+   * by the holders are accumulated in {@code totalNumberOfRecords}.
+   *
+   * @throws CarbonSortKeyAndGroupByException propagated from the chunk holder calls
+   */
+  private void startSorting() throws CarbonSortKeyAndGroupByException {
+    LOGGER.info("Number of temp file: " + this.fileCounter);
+
+    // the heap has to exist before the first holder is queued
+    createRecordHolderQueue(intermediateFiles);
+
+    LOGGER.info("Started adding first record from each file");
+
+    for (File tempFile : intermediateFiles) {
+      // one chunk holder per file, configured from the merge parameters
+      SortTempFileChunkHolder chunkHolder =
+          new SortTempFileChunkHolder(tempFile, mergerParameters.getDimColCount(),
+              mergerParameters.getComplexDimColCount(), mergerParameters.getMeasureColCount(),
+              mergerParameters.getFileBufferSize(), mergerParameters.getNoDictionaryCount(),
+              mergerParameters.getMeasureDataType(),
+              mergerParameters.getNoDictionaryDimnesionColumn(),
+              mergerParameters.getNoDictionarySortColumn());
+
+      // initialize the holder and load its first row
+      chunkHolder.initialize();
+      chunkHolder.readRow();
+
+      // keep the running total that is later written as the merged file's entry count
+      this.totalNumberOfRecords += chunkHolder.getEntryCount();
+
+      // queue the holder for merging
+      this.recordHolderHeap.add(chunkHolder);
+    }
+
+    LOGGER.info("Heap Size" + this.recordHolderHeap.size());
+  }
+
+  /**
+   * Creates the priority queue that holds one chunk holder per temp file. No comparator is
+   * passed, so the queue orders the holders by their natural ordering.
+   *
+   * @param listFiles temp files to merge; the array length is used as the initial capacity
+   */
+  private void createRecordHolderQueue(File[] listFiles) {
+    // NOTE(review): PriorityQueue rejects an initial capacity below 1, so an empty array fails
+    this.recordHolderHeap = new PriorityQueue<>(listFiles.length);
+  }
+
+  /**
+   * Returns the next row in merged order by delegating to {@link #getSortedRecordFromFile()}.
+   *
+   * @return the next row taken from the heap
+   * @throws CarbonSortKeyAndGroupByException propagated from the delegate
+   */
+  private Object[] next() throws CarbonSortKeyAndGroupByException {
+    return getSortedRecordFromFile();
+  }
+
+  /**
+   * Tells whether any row is left to merge, which is the case while {@code fileCounter},
+   * the number of intermediate files not yet exhausted, is positive.
+   *
+   * @return true while at least one file still has rows
+   */
+  private boolean hasNext() {
+    return this.fileCounter > 0;
+  }
+
+  /**
+   * Writes one merged row. With compression or prefetch enabled the row is buffered and the
+   * buffer is handed to the writer once it holds {@code totalSize} rows; otherwise the row is
+   * serialized directly to the output stream.
+   *
+   * @param row row[0] holds the int[] dictionary keys, row[1] the byte[][] no-dictionary values
+   * @throws CarbonSortKeyAndGroupByException problem while writing
+   */
+  private void writeDataTofile(Object[] row) throws CarbonSortKeyAndGroupByException {
+    if (mergerParameters.isSortFileCompressionEnabled() || mergerParameters.isPrefetch()) {
+      if (entryCount == 0) {
+        // start a fresh batch; an array already handed to the writer is never refilled
+        records = new Object[totalSize][];
+      }
+      records[entryCount++] = row;
+      // also checked for the first row of a batch, otherwise a batch size of 1 overflows
+      if (entryCount == totalSize) {
+        this.writer.writeSortTempFile(records);
+        entryCount = 0;
+      }
+      return;
+    }
+    try {
+      DataType[] aggType = mergerParameters.getMeasureDataType();
+      int[] mdkArray = (int[]) row[0];
+      byte[][] nonDictArray = (byte[][]) row[1];
+      int mdkIndex = 0;
+      int nonDictKeyIndex = 0;
+      // dimensions in column order: no-dictionary as short length + bytes, dictionary as int
+      for (boolean nodictinary : noDictionarycolumnMapping) {
+        if (nodictinary) {
+          byte[] col = nonDictArray[nonDictKeyIndex++];
+          stream.writeShort(col.length);
+          stream.write(col);
+        } else {
+          stream.writeInt(mdkArray[mdkIndex++]);
+        }
+      }
+      // measures: a one-byte null flag, followed by the value when it is present
+      for (int counter = 0; counter < mergerParameters.getMeasureColCount(); counter++) {
+        Object measure = NonDictionaryUtil.getMeasure(counter, row);
+        if (null == measure) {
+          stream.write((byte) 0);
+          continue;
+        }
+        stream.write((byte) 1);
+        switch (aggType[counter]) {
+          case SHORT:
+            stream.writeShort((short) measure);
+            break;
+          case INT:
+            stream.writeInt((int) measure);
+            break;
+          case LONG:
+            stream.writeLong((long) measure);
+            break;
+          case DOUBLE:
+            stream.writeDouble((Double) measure);
+            break;
+          case DECIMAL:
+            byte[] bigDecimalInBytes = (byte[]) measure;
+            stream.writeInt(bigDecimalInBytes.length);
+            stream.write(bigDecimalInBytes);
+            break;
+          default:
+            throw new IllegalArgumentException("unsupported data type:" + aggType[counter]);
+        }
+      }
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException("Problem while writing the file", e);
+    }
+  }
+
+  /** Closes the chunk holders left in the heap, then deletes the merged intermediate files. */
+  private void finish() throws CarbonSortKeyAndGroupByException {
+    while (recordHolderHeap != null && !recordHolderHeap.isEmpty()) {
+      recordHolderHeap.poll().closeStream();
+    }
+    try {
+      CarbonUtil.deleteFiles(intermediateFiles);
+    } catch (IOException e) {
+      throw new CarbonSortKeyAndGroupByException(
+          "Problem while deleting the intermediate files", e);
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
new file mode 100644
index 0000000..d2579d2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparator.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.util.Comparator;
+
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+
+/**
+ * Compares two rows column by column over the sort columns. Each row is an Object[] whose
+ * sort columns sit at indexes 0..n-1 in the order given by the mapping passed at construction.
+ */
+public class NewRowComparator implements Comparator<Object[]> {
+
+  /**
+   * mapping of dictionary dimensions and no dictionary of sort_column.
+   */
+  private final boolean[] noDictionarySortColumnMaping;
+
+  /**
+   * @param noDictionarySortColumnMaping one flag per sort column; true marks a no-dictionary
+   *                                     column (byte[] value), false a dictionary column (int)
+   */
+  public NewRowComparator(boolean[] noDictionarySortColumnMaping) {
+    this.noDictionarySortColumnMaping = noDictionarySortColumnMaping;
+  }
+
+  /**
+   * Compares the rows on the first sort column in which they differ.
+   *
+   * @param rowA first row
+   * @param rowB second row
+   * @return negative, zero or positive as rowA sorts before, equal to or after rowB
+   */
+  @Override
+  public int compare(Object[] rowA, Object[] rowB) {
+    int index = 0;
+    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
+      int difference;
+      if (isNoDictionary) {
+        byte[] byteArr1 = (byte[]) rowA[index];
+        byte[] byteArr2 = (byte[]) rowB[index];
+        difference = UnsafeComparer.INSTANCE.compareTo(byteArr1, byteArr2);
+      } else {
+        // Integer.compare instead of subtraction: a difference of two ints can overflow
+        // and flip the sign of the result
+        difference = Integer.compare((int) rowA[index], (int) rowB[index]);
+      }
+      if (difference != 0) {
+        return difference;
+      }
+      index++;
+    }
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
new file mode 100644
index 0000000..e01b587
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/NewRowComparatorForNormalDims.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.util.Comparator;
+
+/**
+ * This class is used as comparator for comparing dims which are non high cardinality dims.
+ * Here the dims will be in form of int[] (surrogates) so directly comparing the integers.
+ */
+public class NewRowComparatorForNormalDims implements Comparator<Object[]> {
+  /**
+   * number of leading row entries that take part in the comparison
+   */
+  private final int numberOfSortColumns;
+
+  /**
+   * RowComparatorForNormalDims Constructor
+   *
+   * @param numberOfSortColumns how many leading entries of each row are compared
+   */
+  public NewRowComparatorForNormalDims(int numberOfSortColumns) {
+    this.numberOfSortColumns = numberOfSortColumns;
+  }
+
+  /**
+   * Compares the first {@code numberOfSortColumns} entries of both rows as ints, in order,
+   * and returns the result for the first position in which they differ.
+   *
+   * @param rowA first row; the compared entries must be Integer values
+   * @param rowB second row; the compared entries must be Integer values
+   * @return negative, zero or positive as rowA sorts before, equal to or after rowB
+   */
+  @Override
+  public int compare(Object[] rowA, Object[] rowB) {
+    for (int i = 0; i < numberOfSortColumns; i++) {
+      // Integer.compare avoids the overflow a plain subtraction has for distant values
+      int diff = Integer.compare((int) rowA[i], (int) rowB[i]);
+      if (diff != 0) {
+        return diff;
+      }
+    }
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/349c59c7/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
new file mode 100644
index 0000000..0ae0b93
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/RowComparator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.sort.sortdata;
+
+import java.nio.ByteBuffer;
+import java.util.Comparator;
+
+import org.apache.carbondata.core.datastore.row.WriteStepRowUtil;
+import org.apache.carbondata.core.util.ByteUtil.UnsafeComparer;
+import org.apache.carbondata.core.util.NonDictionaryUtil;
+
+/**
+ * Compares two sort rows over the sort columns. Dictionary columns are compared as ints read
+ * through NonDictionaryUtil.getDimension; no-dictionary columns are compared as bytes taken
+ * from the byte[] stored at WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX.
+ */
+public class RowComparator implements Comparator<Object[]> {
+  /**
+   * noDictionaryCount represent number of no dictionary cols
+   */
+  private final int noDictionaryCount;
+
+  /**
+   * noDictionaryColMaping mapping of dictionary dimensions and no dictionary dimensions.
+   */
+  private final boolean[] noDictionarySortColumnMaping;
+
+  /**
+   * @param noDictionarySortColumnMaping one flag per sort column, true for no-dictionary
+   * @param noDictionaryCount            number of no dictionary cols
+   */
+  public RowComparator(boolean[] noDictionarySortColumnMaping, int noDictionaryCount) {
+    this.noDictionaryCount = noDictionaryCount;
+    this.noDictionarySortColumnMaping = noDictionarySortColumnMaping;
+  }
+
+  /**
+   * Compares the rows on the first sort column in which they differ.
+   *
+   * @return negative, zero or positive as rowA sorts before, equal to or after rowB
+   */
+  @Override
+  public int compare(Object[] rowA, Object[] rowB) {
+    int normalIndex = 0;
+    int noDictionaryindex = 0;
+
+    for (boolean isNoDictionary : noDictionarySortColumnMaping) {
+      if (isNoDictionary) {
+        byte[] byteArr1 = (byte[]) rowA[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
+        ByteBuffer buff1 = ByteBuffer.wrap(byteArr1);
+        // extract a high card dims from complete byte[].
+        NonDictionaryUtil
+            .extractSingleHighCardDims(byteArr1, noDictionaryindex, noDictionaryCount, buff1);
+
+        byte[] byteArr2 = (byte[]) rowB[WriteStepRowUtil.NO_DICTIONARY_AND_COMPLEX];
+        ByteBuffer buff2 = ByteBuffer.wrap(byteArr2);
+        // extract a high card dims from complete byte[].
+        NonDictionaryUtil
+            .extractSingleHighCardDims(byteArr2, noDictionaryindex, noDictionaryCount, buff2);
+
+        int difference = UnsafeComparer.INSTANCE.compareTo(buff1, buff2);
+        if (difference != 0) {
+          return difference;
+        }
+        noDictionaryindex++;
+      } else {
+        int dimFieldA = NonDictionaryUtil.getDimension(normalIndex, rowA);
+        int dimFieldB = NonDictionaryUtil.getDimension(normalIndex, rowB);
+        // Integer.compare instead of subtraction, which can overflow and flip the sign
+        int diff = Integer.compare(dimFieldA, dimFieldB);
+        if (diff != 0) {
+          return diff;
+        }
+        normalIndex++;
+      }
+    }
+    return 0;
+  }
+}