Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/11/30 07:51:50 UTC

[12/14] incubator-carbondata git commit: rebase

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
new file mode 100644
index 0000000..836a757
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -0,0 +1,695 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.merger;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.carbon.path.CarbonStorePath;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.lcm.locks.ICarbonLock;
+import org.apache.carbondata.lcm.status.SegmentStatusManager;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.spark.load.CarbonLoaderUtil;
+
+/**
+ * Utility class for load merging.
+ */
+public final class CarbonDataMergerUtil {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonDataMergerUtil.class.getName());
+
+  private CarbonDataMergerUtil() {
+
+  }
+
+  /**
+   * Returns the total size of all the carbondata files present in the segment.
+   * @param carbonFile the segment folder
+   * @return total size in bytes of the carbondata files
+   */
+  private static long getSizeOfFactFileInLoad(CarbonFile carbonFile) {
+    long factSize = 0;
+
+    // carbon data file case.
+    CarbonFile[] factFile = carbonFile.listFiles(new CarbonFileFilter() {
+
+      @Override public boolean accept(CarbonFile file) {
+        return CarbonTablePath.isCarbonDataFile(file.getName());
+      }
+    });
+
+    for (CarbonFile fact : factFile) {
+      factSize += fact.getSize();
+    }
+
+    return factSize;
+  }
+
+  /**
+   * Checks whether the auto load merge property is enabled.
+   *
+   * @return true if auto load merge is enabled, false otherwise
+   */
+
+  public static boolean checkIfAutoLoadMergingRequired() {
+    // this check is done early to avoid unnecessary load listing, which can cause an IOException.
+    // check whether carbon's segment merging operation is enabled or not;
+    // the default is false.
+    String isLoadMergeEnabled = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+            CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE);
+    return !isLoadMergeEnabled.equalsIgnoreCase("false");
+  }
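+  // Illustrative usage (assuming CarbonProperties.addProperty is the setter):
+  //   CarbonProperties.getInstance()
+  //       .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true");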
+
+  /**
+   * Forms the name of the new merge folder.
+   *
+   * @param segmentsToBeMergedList segments participating in the merge
+   * @return the name of the merged load folder
+   */
+  public static String getMergedLoadName(List<LoadMetadataDetails> segmentsToBeMergedList) {
+    String firstSegmentName = segmentsToBeMergedList.get(0).getLoadName();
+    if (firstSegmentName.contains(".")) {
+      String beforeDecimal = firstSegmentName.substring(0, firstSegmentName.indexOf("."));
+      String afterDecimal = firstSegmentName.substring(firstSegmentName.indexOf(".") + 1);
+      int fraction = Integer.parseInt(afterDecimal) + 1;
+      String mergedSegmentName = beforeDecimal + "." + fraction;
+      return CarbonCommonConstants.LOAD_FOLDER + mergedSegmentName;
+    } else {
+      String mergeName = firstSegmentName + "." + 1;
+      return CarbonCommonConstants.LOAD_FOLDER + mergeName;
+    }
+
+  }
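+  // e.g. merging loads starting at segment "2" yields LOAD_FOLDER + "2.1";
+  // re-compacting a segment named "2.1" yields LOAD_FOLDER + "2.2".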
+
+  public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge,
+      String metaDataFilepath, String mergedLoadName, CarbonLoadModel carbonLoadModel,
+      String mergeLoadStartTime, CompactionType compactionType) {
+
+    boolean tableStatusUpdationStatus = false;
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
+    ICarbonLock carbonLock = SegmentStatusManager.getTableStatusLock(absoluteTableIdentifier);
+
+    try {
+      if (carbonLock.lockWithRetries()) {
+        LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "."
+            + carbonLoadModel.getTableName() + " for table status updation ");
+
+        CarbonTablePath carbonTablePath = CarbonStorePath
+            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                absoluteTableIdentifier.getCarbonTableIdentifier());
+
+        String statusFilePath = carbonTablePath.getTableStatusFilePath();
+
+        LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(metaDataFilepath);
+
+        String mergedLoadNumber = mergedLoadName.substring(
+            mergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER)
+                + CarbonCommonConstants.LOAD_FOLDER.length());
+
+        String modificationOrDeletionTimeStamp = CarbonLoaderUtil.readCurrentTime();
+        for (LoadMetadataDetails loadDetail : loadDetails) {
+          // check if this segment is merged.
+          if (loadsToMerge.contains(loadDetail)) {
+            // if the compacted load was deleted after the start of the compaction process,
+            // then the compaction must be discarded and treated as a failed compaction.
+            if (loadDetail.getLoadStatus()
+                .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
+              LOGGER.error("Compaction is aborted as the segment " + loadDetail.getLoadName()
+                  + " was deleted after the compaction started.");
+              return tableStatusUpdationStatus;
+            }
+            loadDetail.setLoadStatus(CarbonCommonConstants.SEGMENT_COMPACTED);
+            loadDetail.setModificationOrdeletionTimesStamp(modificationOrDeletionTimeStamp);
+            loadDetail.setMergedLoadName(mergedLoadNumber);
+          }
+        }
+
+        // create entry for merged one.
+        LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS);
+        String loadEndDate = CarbonLoaderUtil.readCurrentTime();
+        loadMetadataDetails.setTimestamp(loadEndDate);
+        loadMetadataDetails.setLoadName(mergedLoadNumber);
+        loadMetadataDetails.setLoadStartTime(mergeLoadStartTime);
+        loadMetadataDetails.setPartitionCount("0");
+        // if this is a major compaction then set the segment as major compaction.
+        if (compactionType == CompactionType.MAJOR_COMPACTION) {
+          loadMetadataDetails.setMajorCompacted("true");
+        }
+
+        List<LoadMetadataDetails> updatedDetailsList = new ArrayList<>(Arrays.asList(loadDetails));
+
+        // put the merged folder entry
+        updatedDetailsList.add(loadMetadataDetails);
+
+        try {
+          SegmentStatusManager.writeLoadDetailsIntoFile(statusFilePath,
+              updatedDetailsList.toArray(new LoadMetadataDetails[updatedDetailsList.size()]));
+          tableStatusUpdationStatus = true;
+        } catch (IOException e) {
+          LOGGER.error("Error while writing metadata");
+        }
+      } else {
+        LOGGER.error(
+            "Could not obtain lock for table " + carbonLoadModel.getDatabaseName() + "."
+                + carbonLoadModel.getTableName() + " for table status update");
+      }
+    } finally {
+      if (carbonLock.unlock()) {
+        LOGGER.info("Table unlocked successfully after table status updation" + carbonLoadModel
+            .getDatabaseName() + "." + carbonLoadModel.getTableName());
+      } else {
+        LOGGER.error(
+            "Unable to unlock table lock for table " + carbonLoadModel.getDatabaseName() + "."
+                + carbonLoadModel.getTableName() + " during table status update");
+      }
+    }
+    return tableStatusUpdationStatus;
+  }
+
+  /**
+   * Identifies which segments can be merged.
+   *
+   * @param storeLocation store path of the table
+   * @param carbonLoadModel load model of the table
+   * @param compactionSize configured size threshold for major compaction, in MB
+   * @param segments candidate segments
+   * @param compactionType minor or major compaction
+   * @return the list of segments to be merged
+   */
+  public static List<LoadMetadataDetails> identifySegmentsToBeMerged(String storeLocation,
+      CarbonLoadModel carbonLoadModel, long compactionSize,
+      List<LoadMetadataDetails> segments, CompactionType compactionType) {
+
+    List<LoadMetadataDetails> sortedSegments = new ArrayList<>(segments);
+
+    sortSegments(sortedSegments);
+
+    // check preserve property and preserve the configured number of latest loads.
+
+    List<LoadMetadataDetails> listOfSegmentsAfterPreserve =
+        checkPreserveSegmentsPropertyReturnRemaining(sortedSegments);
+
+    // filter the segments if the compaction based on days is configured.
+
+    List<LoadMetadataDetails> listOfSegmentsLoadedInSameDateInterval =
+        identifySegmentsToBeMergedBasedOnLoadedDate(listOfSegmentsAfterPreserve);
+    List<LoadMetadataDetails> listOfSegmentsToBeMerged;
+    // identify the segments to merge based on the size of the segments across partitions.
+    if (compactionType.equals(CompactionType.MAJOR_COMPACTION)) {
+
+      listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize,
+          listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, storeLocation);
+    } else {
+
+      listOfSegmentsToBeMerged =
+          identifySegmentsToBeMergedBasedOnSegCount(listOfSegmentsLoadedInSameDateInterval);
+    }
+
+    return listOfSegmentsToBeMerged;
+  }
+
+  /**
+   * Sorts the segments numerically by load name.
+   * @param segments segments to sort in place
+   */
+  public static void sortSegments(List<LoadMetadataDetails> segments) {
+    // sort the segment details numerically by load name.
+    Collections.sort(segments, new Comparator<LoadMetadataDetails>() {
+      @Override public int compare(LoadMetadataDetails seg1, LoadMetadataDetails seg2) {
+        double seg1Id = Double.parseDouble(seg1.getLoadName());
+        double seg2Id = Double.parseDouble(seg2.getLoadName());
+        return Double.compare(seg1Id, seg2Id);
+      }
+    });
+  }
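+  // e.g. load names "2", "2.1", "3" sort numerically as 2 < 2.1 < 3, so a
+  // compacted segment ("2.1") is ordered between its parent load and the next one.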
+
+  /**
+   * Returns the list of loads that were loaded within the same date interval.
+   * The length of the interval, in days, is configurable.
+   *
+   * @param listOfSegmentsBelowThresholdSize candidate segments
+   * @return the loads that fall within the configured number of days
+   */
+  private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnLoadedDate(
+      List<LoadMetadataDetails> listOfSegmentsBelowThresholdSize) {
+
+    List<LoadMetadataDetails> loadsOfSameDate =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    long numberOfDaysAllowedToMerge = 0;
+    try {
+      numberOfDaysAllowedToMerge = Long.parseLong(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+              CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT));
+      if (numberOfDaysAllowedToMerge < 0 || numberOfDaysAllowedToMerge > 100) {
+        LOGGER.error(
+            "The specified value for property " + CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT
+                + " is incorrect."
+                + " The correct value should be in the range of 0-100. Taking the default value.");
+        numberOfDaysAllowedToMerge =
+            Long.parseLong(CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT);
+      }
+
+    } catch (NumberFormatException e) {
+      numberOfDaysAllowedToMerge =
+          Long.parseLong(CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT);
+    }
+    // if the property is set, process the loads according to their load date.
+    if (numberOfDaysAllowedToMerge > 0) {
+
+      // filter loads based on the loaded date
+      boolean first = true;
+      Date segDate1 = null;
+      SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
+      for (LoadMetadataDetails segment : listOfSegmentsBelowThresholdSize) {
+
+        if (first) {
+          segDate1 = initializeFirstSegment(loadsOfSameDate, segment, sdf);
+          first = false;
+          continue;
+        }
+        String segmentDate = segment.getLoadStartTime();
+        Date segDate2 = null;
+        try {
+          segDate2 = sdf.parse(segmentDate);
+        } catch (ParseException e) {
+          LOGGER.error("Error while parsing segment start time" + e.getMessage());
+        }
+
+        if (isTwoDatesPresentInRequiredRange(segDate1, segDate2, numberOfDaysAllowedToMerge)) {
+          loadsOfSameDate.add(segment);
+        }
+        // if the load is beyond the merge date,
+        // then reset everything and continue searching for loads.
+        else if (loadsOfSameDate.size() < 2) {
+          loadsOfSameDate.clear();
+          // treat the next segment as the first and continue the check
+          segDate1 = initializeFirstSegment(loadsOfSameDate, segment, sdf);
+        } else { // a load is beyond the merge date and there are at least 2 loads to merge.
+          break;
+        }
+      }
+    } else {
+      return listOfSegmentsBelowThresholdSize;
+    }
+
+    return loadsOfSameDate;
+  }
+
+  /**
+   * Adds the segment as the first load of a new candidate group and parses its start time.
+   *
+   * @param loadsOfSameDate the group of loads being built
+   * @param segment the segment to use as the new baseline
+   * @param sdf format used to parse the load start time
+   * @return the parsed load start date, or null if parsing fails
+   */
+  private static Date initializeFirstSegment(List<LoadMetadataDetails> loadsOfSameDate,
+      LoadMetadataDetails segment, SimpleDateFormat sdf) {
+    String baselineLoadStartTime = segment.getLoadStartTime();
+    Date segDate1 = null;
+    try {
+      segDate1 = sdf.parse(baselineLoadStartTime);
+    } catch (ParseException e) {
+      LOGGER.error("Error while parsing segment start time" + e.getMessage());
+    }
+    loadsOfSameDate.add(segment);
+    return segDate1;
+  }
+
+  /**
+   * Checks whether the two load dates fall within the configured number of days.
+   *
+   * @param segDate1 first load date
+   * @param segDate2 second load date
+   * @param numberOfDaysAllowedToMerge configured day window
+   * @return true if the dates are within the allowed range
+   */
+  private static boolean isTwoDatesPresentInRequiredRange(Date segDate1, Date segDate2,
+      long numberOfDaysAllowedToMerge) {
+    if (segDate1 == null || segDate2 == null) {
+      return false;
+    }
+    // truncate both dates to day precision before computing the difference.
+    Calendar cal1 = Calendar.getInstance();
+    cal1.set(segDate1.getYear(), segDate1.getMonth(), segDate1.getDate());
+    Calendar cal2 = Calendar.getInstance();
+    cal2.set(segDate2.getYear(), segDate2.getMonth(), segDate2.getDate());
+
+    long diff = cal2.getTimeInMillis() - cal1.getTimeInMillis();
+
+    return (diff / (24 * 60 * 60 * 1000)) < numberOfDaysAllowedToMerge;
+  }
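+  // e.g. loads started on Jan 1 and Jan 3 are 2 days apart: they merge when the
+  // configured window is 3 days, but not when it is 2.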
+
+  /**
+   * Identifies the segments to be merged based on their size, in case of major compaction.
+   *
+   * @param compactionSize configured major compaction size, in MB
+   * @param listOfSegmentsAfterPreserve candidate segments after preservation
+   * @param carbonLoadModel load model of the table
+   * @param storeLocation store path of the table
+   * @return the segments selected for the merge
+   */
+  private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize(
+      long compactionSize, List<LoadMetadataDetails> listOfSegmentsAfterPreserve,
+      CarbonLoadModel carbonLoadModel, String storeLocation) {
+
+    List<LoadMetadataDetails> segmentsToBeMerged =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    CarbonTableIdentifier tableIdentifier =
+        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier();
+
+
+    // total length
+    long totalLength = 0;
+
+    // check the size of each segment and sum it up across partitions
+    for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
+
+      String segId = segment.getLoadName();
+      // variable to store one segment's size across partitions.
+      long sizeOfOneSegmentAcrossPartition =
+          getSizeOfSegment(storeLocation, tableIdentifier, segId);
+
+      // if the size of a segment is greater than the major compaction size, then ignore it.
+      if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) {
+        // if 2 segments have already been found for merging, then stop the scan here and merge.
+        if (segmentsToBeMerged.size() > 1) {
+          break;
+        } else { // if only one segment is found, then drop it from the list
+          // and reset the total length to 0.
+          segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+          totalLength = 0;
+          continue;
+        }
+      }
+
+      totalLength += sizeOfOneSegmentAcrossPartition;
+
+      // add the segment while the accumulated size stays below the major compaction size.
+      if (totalLength < (compactionSize * 1024 * 1024)) {
+        segmentsToBeMerged.add(segment);
+      } else { // if 2 segments have already been found for merging, stop the scan here and merge.
+        if (segmentsToBeMerged.size() > 1) {
+          break;
+        } else { // if only one segment is found, replace it with the current one.
+          // reset the total length to the current segment's size.
+          segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+          segmentsToBeMerged.add(segment);
+          totalLength = sizeOfOneSegmentAcrossPartition;
+        }
+      }
+
+    }
+
+    return segmentsToBeMerged;
+  }
+
+  /**
+   * Calculates the size of the specified segment.
+   * @param storeLocation store path of the table
+   * @param tableIdentifier table identifier
+   * @param segId segment id
+   * @return total size in bytes of the segment's fact files
+   */
+  private static long getSizeOfSegment(String storeLocation,
+      CarbonTableIdentifier tableIdentifier, String segId) {
+    String loadPath = CarbonLoaderUtil
+        .getStoreLocation(storeLocation, tableIdentifier, segId, "0");
+    CarbonFile segmentFolder =
+        FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
+    return getSizeOfFactFileInLoad(segmentFolder);
+  }
+
+  /**
+   * Identifies the segments to be merged based on the segment count, for minor compaction.
+   *
+   * @param listOfSegmentsAfterPreserve candidate segments after preservation
+   * @return the segments selected for the merge, or an empty list if none qualify
+   */
+  private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSegCount(
+      List<LoadMetadataDetails> listOfSegmentsAfterPreserve) {
+
+    List<LoadMetadataDetails> mergedSegments =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    List<LoadMetadataDetails> unMergedSegments =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    int[] noOfSegmentLevelsCount =
+        CarbonProperties.getInstance().getCompactionSegmentLevelCount();
+
+    int level1Size = 0;
+    int level2Size = 0;
+    int size = noOfSegmentLevelsCount.length;
+
+    if (size >= 2) {
+      level1Size = noOfSegmentLevelsCount[0];
+      level2Size = noOfSegmentLevelsCount[1];
+    } else if (size == 1) {
+      level1Size = noOfSegmentLevelsCount[0];
+    }
+
+    int unMergeCounter = 0;
+    int mergeCounter = 0;
+
+    // walk the segments and count merged vs. unmerged ones.
+    for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
+
+      String segName = segment.getLoadName();
+
+      // if a segment has already been merged 2 levels, then its name will end with .2;
+      // such segments need to be excluded from minor compaction.
+      // if a segment was major compacted, then it should not be considered for minor compaction.
+      if (segName.endsWith(CarbonCommonConstants.LEVEL2_COMPACTION_INDEX) || (
+          segment.isMajorCompacted() != null && segment.isMajorCompacted()
+              .equalsIgnoreCase("true"))) {
+        continue;
+      }
+
+      // check if the segment is merged or not
+
+      if (!isMergedSegment(segName)) {
+        // if it is an unmerged segment, increment the counter
+        unMergeCounter++;
+        unMergedSegments.add(segment);
+        if (unMergeCounter == (level1Size)) {
+          return unMergedSegments;
+        }
+      } else {
+        mergeCounter++;
+        mergedSegments.add(segment);
+        if (mergeCounter == (level2Size)) {
+          return mergedSegments;
+        }
+      }
+    }
+    return new ArrayList<>(0);
+  }
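+  // e.g. with level counts {4, 3}: the first 4 unmerged segments trigger a
+  // level-1 merge, and the first 3 already-merged segments a level-2 merge.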
+
+  /**
+   * Checks whether the segment has already been merged (its name contains a dot).
+   * @param segName segment name
+   * @return true if the segment is a merged segment
+   */
+  private static boolean isMergedSegment(String segName) {
+    return segName.contains(".");
+  }
+
+  /**
+   * Checks the number of loads to be preserved and returns the remaining valid segments.
+   *
+   * @param segments all candidate segments
+   * @return the valid segments left after preserving the latest loads
+   */
+  private static List<LoadMetadataDetails> checkPreserveSegmentsPropertyReturnRemaining(
+      List<LoadMetadataDetails> segments) {
+
+    // check whether preserving segments from merging is enabled and
+    // get the number of loads to be preserved.
+    int numberOfSegmentsToBePreserved =
+        CarbonProperties.getInstance().getNumberOfSegmentsToBePreserved();
+    // get the number of valid segments and retain the latest loads from merging.
+    return CarbonDataMergerUtil
+        .getValidLoadDetailsWithRetaining(segments, numberOfSegmentsToBePreserved);
+  }
+
+  /**
+   * Retains the configured number of latest segments and returns the remaining list of
+   * valid segments.
+   *
+   * @param loadMetadataDetails all load details
+   * @param numberOfSegToBeRetained number of latest segments to keep out of the merge
+   * @return the remaining valid segments
+   */
+  private static List<LoadMetadataDetails> getValidLoadDetailsWithRetaining(
+      List<LoadMetadataDetails> loadMetadataDetails, int numberOfSegToBeRetained) {
+
+    List<LoadMetadataDetails> validList =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for (LoadMetadataDetails segment : loadMetadataDetails) {
+      if (segment.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+          || segment.getLoadStatus()
+          .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) || segment
+          .getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_UPDATE)) {
+        validList.add(segment);
+      }
+    }
+
+    // remove the latest numberOfSegToBeRetained segments from the end of the valid list.
+    int removingIndex = validList.size() - 1;
+
+    for (int i = validList.size(); i > 0; i--) {
+      if (numberOfSegToBeRetained == 0) {
+        break;
+      }
+      // remove last segment
+      validList.remove(removingIndex--);
+      numberOfSegToBeRetained--;
+    }
+    return validList;
+
+  }
+
+  /**
+   * Returns the compaction size configured for the given compaction type.
+   *
+   * @param compactionType type of the compaction
+   * @return the configured compaction size, or 0 if not applicable
+   */
+  public static long getCompactionSize(CompactionType compactionType) {
+
+    long compactionSize = 0;
+    switch (compactionType) {
+      case MAJOR_COMPACTION:
+        compactionSize = CarbonProperties.getInstance().getMajorCompactionSize();
+        break;
+      default: // this case cannot occur.
+    }
+    return compactionSize;
+  }
+
+  /**
+   * Returns the comma separated valid segments for merging.
+   *
+   * @param loadMetadataDetails segments participating in the merge
+   * @return comma separated segment names
+   */
+  public static String getValidSegments(List<LoadMetadataDetails> loadMetadataDetails) {
+    StringBuilder builder = new StringBuilder();
+    for (LoadMetadataDetails segment : loadMetadataDetails) {
+      // check if this load is an already merged load.
+      if (null != segment.getMergedLoadName()) {
+        builder.append(segment.getMergedLoadName()).append(",");
+      } else {
+        builder.append(segment.getLoadName()).append(",");
+      }
+    }
+    builder.deleteCharAt(builder.length() - 1);
+    return builder.toString();
+  }
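+  // e.g. three loads named "0", "1" and "2" produce the string "0,1,2".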
+
+  /**
+   * Combines a list of node-to-blocks maps into one map.
+   *
+   * @param mapsOfNodeBlockMapping list of node to block mappings
+   * @return a single combined node to blocks map
+   */
+  public static Map<String, List<TableBlockInfo>> combineNodeBlockMaps(
+      List<Map<String, List<TableBlockInfo>>> mapsOfNodeBlockMapping) {
+
+    Map<String, List<TableBlockInfo>> combinedMap =
+        new HashMap<String, List<TableBlockInfo>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    // traverse list of maps.
+    for (Map<String, List<TableBlockInfo>> eachMap : mapsOfNodeBlockMapping) {
+      // traverse inside each map.
+      for (Map.Entry<String, List<TableBlockInfo>> eachEntry : eachMap.entrySet()) {
+
+        String node = eachEntry.getKey();
+        List<TableBlockInfo> blocks = eachEntry.getValue();
+
+        // if details for that node already exist in the combined map,
+        // then append the blocks to the existing list.
+        if (null != combinedMap.get(node)) {
+          List<TableBlockInfo> blocksAlreadyPresent = combinedMap.get(node);
+          blocksAlreadyPresent.addAll(blocks);
+        } else { // if it's not present in the map, then put a fresh entry.
+          combinedMap.put(node, blocks);
+        }
+      }
+    }
+    return combinedMap;
+  }
+
+  /**
+   * Removes from the list the segments added after the compaction was requested,
+   * keeping everything up to and including the given last segment.
+   */
+  public static List<LoadMetadataDetails> filterOutNewlyAddedSegments(
+      List<LoadMetadataDetails> segments,
+      LoadMetadataDetails lastSeg) {
+
+    // take complete list of segments.
+    List<LoadMetadataDetails> list = new ArrayList<>(segments);
+    // sort list
+    CarbonDataMergerUtil.sortSegments(list);
+
+    // first filter out newly added segments.
+    return list.subList(0, list.indexOf(lastSeg) + 1);
+
+  }
+
+}
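
The size-based selection above reduces to a windowed scan: accumulate segment
sizes until the major compaction threshold is reached, restart the window when a
single oversized segment is hit, and stop once at least two candidates are found.
A minimal, self-contained sketch of the same rule in plain Java (sizes are passed
in bytes directly, standing in for getSizeOfSegment; all names are hypothetical):

    import java.util.ArrayList;
    import java.util.List;

    public class SizeBasedSelectionSketch {
      // returns the indices of the segments that would be merged together.
      static List<Integer> select(long[] sizesInBytes, long compactionSizeInBytes) {
        List<Integer> selected = new ArrayList<>();
        long total = 0;
        for (int i = 0; i < sizesInBytes.length; i++) {
          long size = sizesInBytes[i];
          if (size > compactionSizeInBytes) { // oversized segments are never merged
            if (selected.size() > 1) {
              break;                          // enough candidates found already
            }
            selected.clear();                 // otherwise restart the scan window
            total = 0;
            continue;
          }
          total += size;
          if (total < compactionSizeInBytes) {
            selected.add(i);
          } else if (selected.size() > 1) {
            break;                            // at least two candidates: stop here
          } else {
            selected.clear();                 // restart the window at this segment
            selected.add(i);
            total = size;
          }
        }
        return selected;
      }

      public static void main(String[] args) {
        long mb = 1024L * 1024;
        // 1 GB threshold; segments of 300 MB, 400 MB, 2 GB, 500 MB and 100 MB
        System.out.println(select(
            new long[] {300 * mb, 400 * mb, 2048 * mb, 500 * mb, 100 * mb}, 1024 * mb));
        // prints [0, 1]: the third segment is oversized and the scan stops there
      }
    }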

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionCallable.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionCallable.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionCallable.java
new file mode 100644
index 0000000..d1cb9d9
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionCallable.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.merger;
+
+import java.util.concurrent.Callable;
+
+import org.apache.carbondata.spark.rdd.Compactor;
+
+import org.apache.spark.sql.execution.command.CompactionCallableModel;
+
+/**
+ * Callable class used to trigger the compaction on a separate thread.
+ */
+public class CompactionCallable implements Callable<Void> {
+
+  private final CompactionCallableModel compactionCallableModel;
+
+  public CompactionCallable(CompactionCallableModel compactionCallableModel) {
+
+    this.compactionCallableModel = compactionCallableModel;
+  }
+
+  @Override public Void call() throws Exception {
+
+    Compactor.triggerCompaction(compactionCallableModel);
+    return null;
+
+  }
+}
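
CompactionCallable simply adapts Compactor.triggerCompaction to the
java.util.concurrent API. A hedged usage sketch (the construction of the
CompactionCallableModel is elided and the executor names are hypothetical;
assumes the java.util.concurrent ExecutorService, Executors and Future imports):

    ExecutorService compactionExecutor = Executors.newSingleThreadExecutor();
    try {
      Future<Void> result = compactionExecutor.submit(new CompactionCallable(model));
      result.get(); // rethrows, wrapped in ExecutionException, anything thrown
                    // by Compactor.triggerCompaction
    } finally {
      compactionExecutor.shutdown();
    }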

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java
new file mode 100644
index 0000000..d735b44
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.merger;
+
+/**
+ * This enum defines the types of compaction.
+ * There are two types: minor and major.
+ */
+public enum CompactionType {
+  MINOR_COMPACTION,
+  MAJOR_COMPACTION
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java
new file mode 100644
index 0000000..93ba2a3
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.merger;
+
+import org.apache.carbondata.core.carbon.datastore.block.Distributable;
+
+/**
+ * Block to Node mapping
+ */
+public class NodeBlockRelation implements Comparable<NodeBlockRelation> {
+
+  private final Distributable block;
+  private final String node;
+
+  public NodeBlockRelation(Distributable block, String node) {
+    this.block = block;
+    this.node = node;
+
+  }
+
+  public Distributable getBlock() {
+    return block;
+  }
+
+  public String getNode() {
+    return node;
+  }
+
+  @Override public int compareTo(NodeBlockRelation obj) {
+    return this.getNode().compareTo(obj.getNode());
+  }
+
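+  // Note: ordering and equality both consider only the node name, so two
+  // relations for different blocks on the same node compare as equal.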
+  @Override public boolean equals(Object obj) {
+    if (!(obj instanceof NodeBlockRelation)) {
+      return false;
+    }
+    NodeBlockRelation o = (NodeBlockRelation) obj;
+    return node.equals(o.node);
+  }
+
+  @Override public int hashCode() {
+    return node.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java
new file mode 100644
index 0000000..6b4d1bc
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.merger;
+
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.block.Distributable;
+
+public class NodeMultiBlockRelation implements Comparable<NodeMultiBlockRelation> {
+
+  private final List<Distributable> blocks;
+  private final String node;
+
+  public NodeMultiBlockRelation(String node, List<Distributable> blocks) {
+    this.node = node;
+    this.blocks = blocks;
+
+  }
+
+  public List<Distributable> getBlocks() {
+    return blocks;
+  }
+
+  public String getNode() {
+    return node;
+  }
+
+  @Override public int compareTo(NodeMultiBlockRelation obj) {
+    return this.blocks.size() - obj.getBlocks().size();
+  }
+
+  @Override public boolean equals(Object obj) {
+    if (!(obj instanceof NodeMultiBlockRelation)) {
+      return false;
+    }
+    NodeMultiBlockRelation o = (NodeMultiBlockRelation) obj;
+    return blocks.equals(o.blocks) && node.equals(o.node);
+  }
+
+  @Override public int hashCode() {
+    return blocks.hashCode() + node.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
new file mode 100644
index 0000000..44c05a1
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
@@ -0,0 +1,336 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.merger;
+
+import java.io.File;
+import java.util.AbstractQueue;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.carbon.metadata.CarbonMetadata;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.carbon.path.CarbonStorePath;
+import org.apache.carbondata.core.carbon.path.CarbonTablePath;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.datatypes.GenericDataType;
+import org.apache.carbondata.processing.merger.exeception.SliceMergerException;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.CarbonFactHandler;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.scan.result.iterator.RawResultIterator;
+import org.apache.carbondata.scan.wrappers.ByteArrayWrapper;
+
+/**
+ * This is the Merger class responsible for the merging of the segments.
+ */
+public class RowResultMerger {
+
+  private final String databaseName;
+  private final String tableName;
+  private final String tempStoreLocation;
+  private final int measureCount;
+  private final String factStoreLocation;
+  private CarbonFactHandler dataHandler;
+  private List<RawResultIterator> rawResultIteratorList =
+      new ArrayList<RawResultIterator>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  private SegmentProperties segprop;
+  /**
+   * record holder heap
+   */
+  private AbstractQueue<RawResultIterator> recordHolderHeap;
+
+  private TupleConversionAdapter tupleConvertor;
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(RowResultMerger.class.getName());
+
+  public RowResultMerger(List<RawResultIterator> iteratorList, String databaseName,
+      String tableName, SegmentProperties segProp, String tempStoreLocation,
+      CarbonLoadModel loadModel, int[] colCardinality) {
+
+    this.rawResultIteratorList = iteratorList;
+    // one RawResultIterator per segment being merged.
+
+    recordHolderHeap = new PriorityQueue<RawResultIterator>(rawResultIteratorList.size(),
+        new RowResultMerger.CarbonMdkeyComparator());
+
+    this.segprop = segProp;
+    this.tempStoreLocation = tempStoreLocation;
+
+    this.factStoreLocation = loadModel.getStorePath();
+
+    if (!new File(tempStoreLocation).mkdirs()) {
+      LOGGER.error("Failed to create the temp store location: " + tempStoreLocation);
+    }
+
+    this.databaseName = databaseName;
+    this.tableName = tableName;
+
+    this.measureCount = segprop.getMeasures().size();
+    CarbonTable carbonTable = CarbonMetadata.getInstance()
+            .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
+    CarbonFactDataHandlerModel carbonFactDataHandlerModel =
+        getCarbonFactDataHandlerModel(loadModel);
+    carbonFactDataHandlerModel.setPrimitiveDimLens(segprop.getDimColumnsCardinality());
+    CarbonDataFileAttributes carbonDataFileAttributes =
+        new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
+            loadModel.getFactTimeStamp());
+    carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
+    if (segProp.getNumberOfNoDictionaryDimension() > 0
+        || segProp.getComplexDimensions().size() > 0) {
+      carbonFactDataHandlerModel.setMdKeyIndex(measureCount + 1);
+    } else {
+      carbonFactDataHandlerModel.setMdKeyIndex(measureCount);
+    }
+    carbonFactDataHandlerModel.setColCardinality(colCardinality);
+    carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
+    dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
+
+    tupleConvertor = new TupleConversionAdapter(segProp);
+  }
+
+  /**
+   * Merges the sorted rows from all the iterators and writes them through the data handler.
+   *
+   * @return true if the merge succeeded
+   */
+  public boolean mergerSlice() {
+    boolean mergeStatus = false;
+    int index = 0;
+    try {
+
+      dataHandler.initialise();
+
+      // add all iterators to the queue
+      for (RawResultIterator leafTupleIterator : this.rawResultIteratorList) {
+        this.recordHolderHeap.add(leafTupleIterator);
+        index++;
+      }
+      RawResultIterator iterator = null;
+      while (index > 1) {
+        // poll the iterator with the smallest current record
+        iterator = this.recordHolderHeap.poll();
+        Object[] convertedRow = iterator.next();
+        if (null == convertedRow) {
+          throw new SliceMergerException("Unable to generate mdkey during compaction.");
+        }
+        // write the row through the data handler
+        addRow(convertedRow);
+        // if this iterator is exhausted, decrement the count of
+        // active iterators
+        if (!iterator.hasNext()) {
+          index--;
+          continue;
+        }
+        // re-insert the iterator into the heap
+        this.recordHolderHeap.add(iterator);
+      }
+      // drain the last remaining iterator, if any, from the heap
+      iterator = this.recordHolderHeap.poll();
+      while (null != iterator) {
+        Object[] convertedRow = iterator.next();
+        if (null == convertedRow) {
+          throw new SliceMergerException("Unable to generate mdkey during compaction.");
+        }
+        addRow(convertedRow);
+        // stop once this iterator has no more records
+        if (!iterator.hasNext()) {
+          break;
+        }
+      }
+      this.dataHandler.finish();
+      mergeStatus = true;
+    } catch (Exception e) {
+      LOGGER.error("Exception in compaction merger " + e.getMessage());
+      mergeStatus = false;
+    } finally {
+      try {
+        this.dataHandler.closeHandler();
+      } catch (CarbonDataWriterException e) {
+        LOGGER.error("Exception while closing the handler in compaction merger " + e.getMessage());
+        mergeStatus = false;
+      }
+    }
+
+    return mergeStatus;
+  }
+
+  /**
+   * Below method will be used to add sorted row
+   *
+   * @throws SliceMergerException
+   */
+  protected void addRow(Object[] carbonTuple) throws SliceMergerException {
+    Object[] rowInWritableFormat;
+
+    rowInWritableFormat = tupleConvertor.getObjectArray(carbonTuple);
+    try {
+      this.dataHandler.addDataToStore(rowInWritableFormat);
+    } catch (CarbonDataWriterException e) {
+      throw new SliceMergerException("Problem in merging the slice", e);
+    }
+  }
+
+  /**
+   * This method will create a model object for carbon fact data handler
+   *
+   * @param loadModel
+   * @return
+   */
+  private CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoadModel loadModel) {
+    CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel();
+    carbonFactDataHandlerModel.setDatabaseName(databaseName);
+    carbonFactDataHandlerModel.setTableName(tableName);
+    carbonFactDataHandlerModel.setMeasureCount(segprop.getMeasures().size());
+    carbonFactDataHandlerModel.setCompactionFlow(true);
+    carbonFactDataHandlerModel
+        .setMdKeyLength(segprop.getDimensionKeyGenerator().getKeySizeInBytes());
+    carbonFactDataHandlerModel.setStoreLocation(tempStoreLocation);
+    carbonFactDataHandlerModel.setDimLens(segprop.getDimColumnsCardinality());
+    carbonFactDataHandlerModel.setSegmentProperties(segprop);
+    carbonFactDataHandlerModel.setNoDictionaryCount(segprop.getNumberOfNoDictionaryDimension());
+    carbonFactDataHandlerModel.setDimensionCount(
+        segprop.getDimensions().size() - carbonFactDataHandlerModel.getNoDictionaryCount());
+    CarbonTable carbonTable = CarbonMetadata.getInstance()
+        .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
+    List<ColumnSchema> wrapperColumnSchema = CarbonUtil
+        .getColumnSchemaList(carbonTable.getDimensionByTableName(tableName),
+            carbonTable.getMeasureByTableName(tableName));
+    carbonFactDataHandlerModel.setWrapperColumnSchema(wrapperColumnSchema);
+    // TODO: complex types need to be handled here.
+    Map<Integer, GenericDataType> complexIndexMap =
+        new HashMap<Integer, GenericDataType>(segprop.getComplexDimensions().size());
+    carbonFactDataHandlerModel.setComplexIndexMap(complexIndexMap);
+    carbonFactDataHandlerModel.setDataWritingRequest(true);
+
+    char[] aggType = new char[segprop.getMeasures().size()];
+    Arrays.fill(aggType, 'n');
+    int i = 0;
+    for (CarbonMeasure msr : segprop.getMeasures()) {
+      aggType[i++] = DataTypeUtil.getAggType(msr.getDataType());
+    }
+    carbonFactDataHandlerModel.setAggType(aggType);
+    carbonFactDataHandlerModel.setFactDimLens(segprop.getDimColumnsCardinality());
+
+    String carbonDataDirectoryPath =
+        checkAndCreateCarbonStoreLocation(this.factStoreLocation, databaseName, tableName,
+            loadModel.getPartitionId(), loadModel.getSegmentId());
+    carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
+
+    List<CarbonDimension> dimensionByTableName =
+        loadModel.getCarbonDataLoadSchema().getCarbonTable().getDimensionByTableName(tableName);
+    boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()];
+    int index = 0;
+    for (CarbonDimension dimension : dimensionByTableName) {
+      isUseInvertedIndexes[index++] = dimension.isUseInvertedIndex();
+    }
+    carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndexes);
+    return carbonFactDataHandlerModel;
+  }
+
+  /**
+   * This method will get the store location for the given path, segment id and partition id
+   *
+   * @return data directory path
+   */
+  private String checkAndCreateCarbonStoreLocation(String factStoreLocation, String databaseName,
+      String tableName, String partitionId, String segmentId) {
+    String carbonStorePath = factStoreLocation;
+    CarbonTable carbonTable = CarbonMetadata.getInstance()
+        .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
+    CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
+    CarbonTablePath carbonTablePath =
+        CarbonStorePath.getCarbonTablePath(carbonStorePath, carbonTableIdentifier);
+    String carbonDataDirectoryPath =
+        carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId);
+    CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
+    return carbonDataDirectoryPath;
+  }
+
+  /**
+   * Comparator class for comparing two raw row results by mdkey.
+   */
+  private class CarbonMdkeyComparator implements Comparator<RawResultIterator> {
+
+    @Override public int compare(RawResultIterator o1, RawResultIterator o2) {
+
+      // left null so that the null check below catches a failed fetch.
+      Object[] row1 = null;
+      Object[] row2 = null;
+      try {
+        row1 = o1.fetchConverted();
+        row2 = o2.fetchConverted();
+      } catch (KeyGenException e) {
+        LOGGER.error(e.getMessage());
+      }
+      if (null == row1 || null == row2) {
+        return 0;
+      }
+      ByteArrayWrapper key1 = (ByteArrayWrapper) row1[0];
+      ByteArrayWrapper key2 = (ByteArrayWrapper) row2[0];
+      int compareResult = 0;
+      int[] columnValueSizes = segprop.getEachDimColumnValueSize();
+      int dictionaryKeyOffset = 0;
+      byte[] dimCols1 = key1.getDictionaryKey();
+      byte[] dimCols2 = key2.getDictionaryKey();
+      int noDicIndex = 0;
+      for (int eachColumnValueSize : columnValueSizes) {
+        // case of dictionary cols
+        if (eachColumnValueSize > 0) {
+
+          compareResult = ByteUtil.UnsafeComparer.INSTANCE
+              .compareTo(dimCols1, dictionaryKeyOffset, eachColumnValueSize, dimCols2,
+                  dictionaryKeyOffset, eachColumnValueSize);
+          dictionaryKeyOffset += eachColumnValueSize;
+        } else { // case of no dictionary
+
+          byte[] noDictionaryDim1 = key1.getNoDictionaryKeyByIndex(noDicIndex);
+          byte[] noDictionaryDim2 = key2.getNoDictionaryKeyByIndex(noDicIndex);
+          compareResult =
+              ByteUtil.UnsafeComparer.INSTANCE.compareTo(noDictionaryDim1, noDictionaryDim2);
+          noDicIndex++;
+
+        }
+        if (0 != compareResult) {
+          return compareResult;
+        }
+      }
+      return 0;
+    }
+  }
+
+}
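
mergerSlice is a k-way merge: each RawResultIterator sits in a priority queue
ordered by its current mdkey, the smallest head row is polled and written through
the data handler, and the iterator is re-inserted while it still has rows. A
self-contained sketch of the same pattern over sorted integer iterators (all
names hypothetical):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class KWayMergeSketch {
      public static void main(String[] args) {
        List<Iterator<Integer>> inputs = new ArrayList<>();
        inputs.add(Arrays.asList(1, 4, 7).iterator());
        inputs.add(Arrays.asList(2, 5, 8).iterator());
        inputs.add(Arrays.asList(3, 6, 9).iterator());

        // peekable wrapper so the heap can order iterators by their next value
        class Peeking implements Comparable<Peeking> {
          final Iterator<Integer> it;
          Integer head;
          Peeking(Iterator<Integer> it) { this.it = it; this.head = it.next(); }
          boolean advance() { head = it.hasNext() ? it.next() : null; return head != null; }
          public int compareTo(Peeking other) { return head.compareTo(other.head); }
        }

        PriorityQueue<Peeking> heap = new PriorityQueue<>();
        for (Iterator<Integer> input : inputs) {
          if (input.hasNext()) {
            heap.add(new Peeking(input));
          }
        }
        while (!heap.isEmpty()) {
          Peeking top = heap.poll();          // iterator with the smallest head row
          System.out.print(top.head + " ");   // "write" the row
          if (top.advance()) {
            heap.add(top);                    // re-insert while rows remain
          }
        }
        // prints: 1 2 3 4 5 6 7 8 9
      }
    }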

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java
new file mode 100644
index 0000000..68cb6e9
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.merger;
+
+import java.io.Serializable;
+
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.schema.table.CarbonTable;
+
+public class TableMeta implements Serializable {
+
+  private static final long serialVersionUID = -1749874611119829431L;
+
+  public CarbonTableIdentifier carbonTableIdentifier;
+  public String storePath;
+  public CarbonTable carbonTable;
+
+  public TableMeta(CarbonTableIdentifier carbonTableIdentifier, String storePath,
+      CarbonTable carbonTable) {
+    this.carbonTableIdentifier = carbonTableIdentifier;
+    this.storePath = storePath;
+    this.carbonTable = carbonTable;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
new file mode 100644
index 0000000..94ebf40
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.merger;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.core.carbon.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.processing.util.RemoveDictionaryUtil;
+import org.apache.carbondata.scan.wrappers.ByteArrayWrapper;
+
+/**
+ * This class converts the raw result into the format expected by the data writer.
+ */
+public class TupleConversionAdapter {
+
+  private final SegmentProperties segmentproperties;
+
+  private final List<CarbonMeasure> measureList;
+
+  private int noDictionaryPresentIndex;
+
+  private int measureCount;
+
+  private boolean isNoDictionaryPresent;
+
+  public TupleConversionAdapter(SegmentProperties segmentProperties) {
+    this.measureCount = segmentProperties.getMeasures().size();
+    this.isNoDictionaryPresent = segmentProperties.getNumberOfNoDictionaryDimension() > 0;
+    if (isNoDictionaryPresent) {
+      noDictionaryPresentIndex++;
+    }
+    this.segmentproperties = segmentProperties;
+    measureList = segmentProperties.getMeasures();
+  }
+
+  /**
+   * Converts the raw result into the format understood by the data writer.
+   * @param carbonTuple raw row from the result iterator
+   * @return the row in the writer format
+   */
+  public Object[] getObjectArray(Object[] carbonTuple) {
+    Object[] row = new Object[measureCount + noDictionaryPresentIndex + 1];
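+    // layout: [measure_1 .. measure_m, (single no-dictionary byte[] if present),
+    //     dictionary mdkey]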
+    int index = 0;
+    // put measures.
+
+    for (int j = 1; j <= measureCount; j++) {
+      row[index++] = carbonTuple[j];
+    }
+
+    // pack the no-dictionary dimensions into a single byte array.
+    if (isNoDictionaryPresent) {
+
+      int noDicCount = segmentproperties.getNumberOfNoDictionaryDimension();
+      List<byte[]> noDicByteArr = new ArrayList<>(noDicCount);
+      for (int i = 0; i < noDicCount; i++) {
+        noDicByteArr.add(((ByteArrayWrapper) carbonTuple[0]).getNoDictionaryKeyByIndex(i));
+      }
+      byte[] singleByteArr = RemoveDictionaryUtil.convertListByteArrToSingleArr(noDicByteArr);
+
+      row[index++] = singleByteArr;
+    }
+
+    // put the dictionary key (mdkey).
+    row[index++] = ((ByteArrayWrapper) carbonTuple[0]).getDictionaryKey();
+    return row;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
new file mode 100644
index 0000000..58f3a2d
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/DataPartitioner.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.partition.api;
+
+import java.util.List;
+
+import org.apache.carbondata.scan.model.CarbonQueryPlan;
+
+import org.apache.spark.sql.execution.command.Partitioner;
+
+public interface DataPartitioner {
+  /**
+   * Initialize the partitioner with the base path, columns, and Spark partitioner.
+   */
+  void initialize(String basePath, String[] columns, Partitioner partitioner);
+
+  /**
+   * All the partitions built by the Partitioner
+   */
+  List<Partition> getAllPartitions();
+
+  /**
+   * Returns the partition where the tuple should be placed (used during data loading).
+   */
+  Partition getPartionForTuple(Object[] tuple, long rowCounter);
+
+  /**
+   * Identifies the partitions applicable for the given filter (used during query).
+   */
+  List<Partition> getPartitions(CarbonQueryPlan queryPlan);
+
+  String[] getPartitionedColumns();
+
+  Partitioner getPartitioner();
+
+}
+

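A load-side usage sketch of this contract, assuming an initialized implementation such as the SampleDataPartitionerImpl added later in this patch; the sparkPartitioner and rowCounter values are placeholders:

    DataPartitioner dataPartitioner = new SampleDataPartitionerImpl();
    dataPartitioner.initialize("/base/path", new String[] { "id", "name" }, sparkPartitioner);
    // route one tuple during load
    Partition target = dataPartitioner.getPartionForTuple(new Object[] { "42", "abc" }, rowCounter);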
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
new file mode 100644
index 0000000..61639d3
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/Partition.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.partition.api;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface Partition extends Serializable {
+  /**
+   * unique identification for the partition in the cluster.
+   */
+  String getUniqueID();
+
+  /**
+   * File path for the raw data represented by this partition
+   */
+  String getFilePath();
+
+  /**
+   * Paths of all the files represented by this partition.
+   */
+  List<String> getFilesPath();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
new file mode 100644
index 0000000..bc6e54f
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DataPartitionerProperties.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.partition.api.impl;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+public final class DataPartitionerProperties {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(DataPartitionerProperties.class.getName());
+
+  private static DataPartitionerProperties instance;
+
+  private Properties properties;
+
+  private DataPartitionerProperties() {
+    properties = loadProperties();
+  }
+
+  public static synchronized DataPartitionerProperties getInstance() {
+    if (instance == null) {
+      instance = new DataPartitionerProperties();
+    }
+    return instance;
+  }
+
+  public String getValue(String key, String defaultVal) {
+    return properties.getProperty(key, defaultVal);
+  }
+
+  public String getValue(String key) {
+    return properties.getProperty(key);
+  }
+
+  /**
+   * Read the properties from DataPartitioner.properties
+   */
+  private Properties loadProperties() {
+    Properties props = new Properties();
+
+    File file = new File("DataPartitioner.properties");
+    FileInputStream fis = null;
+    try {
+      if (file.exists()) {
+        fis = new FileInputStream(file);
+
+        props.load(fis);
+      }
+    } catch (Exception e) {
+      LOGGER.error(e, e.getMessage());
+    } finally {
+      if (null != fis) {
+        try {
+          fis.close();
+        } catch (IOException e) {
+          LOGGER.error(e, e.getMessage());
+        }
+      }
+    }
+
+    return props;
+  }
+}

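A minimal usage sketch; the property key shown is a placeholder for illustration, not one defined by this patch:

    DataPartitionerProperties props = DataPartitionerProperties.getInstance();
    // falls back to the default when DataPartitioner.properties is absent
    String nodeList = props.getValue("partition.node.list", "localhost");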
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
new file mode 100644
index 0000000..9bee8a2
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/DefaultLoadBalancer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.partition.api.impl;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.spark.partition.api.Partition;
+
+/**
+ * A sample load balancer to distribute the partitions to the available nodes in a round robin mode.
+ */
+public class DefaultLoadBalancer {
+  private Map<String, List<Partition>> nodeToPartitionMap =
+      new HashMap<String, List<Partition>>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+  private Map<Partition, String> partitionToNodeMap =
+      new HashMap<Partition, String>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+  public DefaultLoadBalancer(List<String> nodes, List<Partition> partitions) {
+    // Perform a round-robin allocation
+    int nodeCount = nodes.size();
+
+    int index = 0;
+    for (Partition partition : partitions) {
+      int nodeIndex = index % nodeCount;
+      String node = nodes.get(nodeIndex);
+
+      List<Partition> oldList = nodeToPartitionMap.get(node);
+      if (oldList == null) {
+        oldList = new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+        nodeToPartitionMap.put(node, oldList);
+      }
+      oldList.add(partition);
+
+      partitionToNodeMap.put(partition, node);
+
+      index++;
+    }
+  }
+
+  public List<Partition> getPartitionsForNode(String node) {
+    return nodeToPartitionMap.get(node);
+  }
+
+  public String getNodeForPartitions(Partition partition) {
+    return partitionToNodeMap.get(partition);
+  }
+}

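A short walk-through of the round-robin assignment, reusing the PartitionImpl added later in this patch as the Partition implementation:

    List<String> nodes = Arrays.asList("node-a", "node-b");
    List<Partition> partitions = Arrays.asList(
        new PartitionImpl("0", "/store/0"),
        new PartitionImpl("1", "/store/1"),
        new PartitionImpl("2", "/store/2"));
    DefaultLoadBalancer balancer = new DefaultLoadBalancer(nodes, partitions);
    balancer.getPartitionsForNode("node-a");          // partitions 0 and 2
    balancer.getNodeForPartitions(partitions.get(1)); // "node-b"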
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
new file mode 100644
index 0000000..bd7cc42
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionImpl.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.partition.api.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.spark.partition.api.Partition;
+
+public class PartitionImpl implements Partition {
+  private static final long serialVersionUID = 3020172346383028547L;
+  private String uniqueID;
+  private String folderPath;
+
+
+  public PartitionImpl(String uniqueID, String folderPath) {
+    this.uniqueID = uniqueID;
+    this.folderPath = folderPath;
+  }
+
+  @Override public String getUniqueID() {
+    return uniqueID;
+  }
+
+  @Override public String getFilePath() {
+    return folderPath;
+  }
+
+  @Override public String toString() {
+    return "{PartitionID -> " + uniqueID + " Path: " + folderPath + '}';
+  }
+
+  @Override public List<String> getFilesPath() {
+    // a single-folder partition does not expose individual file paths
+    return null;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
new file mode 100644
index 0000000..de32b5c
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/PartitionMultiFileImpl.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.partition.api.impl;
+
+import java.util.List;
+
+import org.apache.carbondata.spark.partition.api.Partition;
+
+public class PartitionMultiFileImpl implements Partition {
+  private static final long serialVersionUID = -4363447826181193976L;
+  private String uniqueID;
+  private List<String> folderPath;
+
+  public PartitionMultiFileImpl(String uniqueID, List<String> folderPath) {
+    this.uniqueID = uniqueID;
+    this.folderPath = folderPath;
+  }
+
+  @Override public String getUniqueID() {
+    return uniqueID;
+  }
+
+  @Override public String getFilePath() {
+    // a multi-file partition exposes a list of paths, not a single file path
+    return null;
+  }
+
+  @Override public List<String> getFilesPath() {
+    return folderPath;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
new file mode 100644
index 0000000..e05be7d
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/QueryPartitionHelper.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.partition.api.impl;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.spark.partition.api.DataPartitioner;
+import org.apache.carbondata.spark.partition.api.Partition;
+
+
+public final class QueryPartitionHelper {
+  private static QueryPartitionHelper instance = new QueryPartitionHelper();
+  private Map<String, DataPartitioner> partitionerMap =
+      new HashMap<String, DataPartitioner>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  private Map<String, DefaultLoadBalancer> loadBalancerMap =
+      new HashMap<String, DefaultLoadBalancer>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+  private QueryPartitionHelper() {
+
+  }
+
+  public static QueryPartitionHelper getInstance() {
+    return instance;
+  }
+
+  /**
+   * Get the partitions applicable for a query, based on the filters in the query plan.
+   */
+  public List<Partition> getPartitionsForQuery(CarbonQueryPlan queryPlan) {
+    String tableUniqueName = queryPlan.getDatabaseName() + '_' + queryPlan.getTableName();
+
+    DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
+
+    return dataPartitioner.getPartitions(queryPlan);
+  }
+
+  public List<Partition> getAllPartitions(String databaseName, String tableName) {
+    String tableUniqueName = databaseName + '_' + tableName;
+
+    DataPartitioner dataPartitioner = partitionerMap.get(tableUniqueName);
+
+    return dataPartitioner.getAllPartitions();
+  }
+
+  /**
+   * Get the name of the node to which the partition is assigned.
+   */
+  public String getLocation(Partition partition, String databaseName, String tableName) {
+    String tableUniqueName = databaseName + '_' + tableName;
+
+    DefaultLoadBalancer loadBalancer = loadBalancerMap.get(tableUniqueName);
+    return loadBalancer.getNodeForPartitions(partition);
+  }
+
+}

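A query-side lookup sketch; note that this patch never populates the internal partitioner and load-balancer maps, so the helper assumes that registration happens elsewhere:

    QueryPartitionHelper helper = QueryPartitionHelper.getInstance();
    List<Partition> parts = helper.getAllPartitions("default", "sales");
    String node = helper.getLocation(parts.get(0), "default", "sales");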
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
new file mode 100644
index 0000000..c9b434a
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/partition/api/impl/SampleDataPartitionerImpl.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.partition.api.impl;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.spark.partition.api.DataPartitioner;
+import org.apache.carbondata.spark.partition.api.Partition;
+
+import org.apache.spark.sql.execution.command.Partitioner;
+
+/**
+ * Sample partition.
+ */
+public class SampleDataPartitionerImpl implements DataPartitioner {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(SampleDataPartitionerImpl.class.getName());
+  private int numberOfPartitions = 1;
+
+  private int partitionColumnIndex = -1;
+
+  private String partitionColumn;
+
+  private Partitioner partitioner;
+  private List<Partition> allPartitions;
+  private String baseLocation;
+
+  public SampleDataPartitionerImpl() {
+  }
+
+  public void initialize(String basePath, String[] columns, Partitioner partitioner) {
+    this.partitioner = partitioner;
+    numberOfPartitions = partitioner.partitionCount();
+
+    partitionColumn = partitioner.partitionColumn()[0];
+    LOGGER.info("SampleDataPartitionerImpl initializing with following properties.");
+    LOGGER.info("partitionCount: " + numberOfPartitions);
+    LOGGER.info("partitionColumn: " + partitionColumn);
+    LOGGER.info("basePath: " + basePath);
+    LOGGER.info("columns: " + Arrays.toString(columns));
+
+    this.baseLocation = basePath;
+    allPartitions = new ArrayList<Partition>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+
+    for (int i = 0; i < columns.length; i++) {
+      if (columns[i].equalsIgnoreCase(partitionColumn)) {
+        partitionColumnIndex = i;
+        break;
+      }
+    }
+
+    for (int partitionCounter = 0; partitionCounter < numberOfPartitions; partitionCounter++) {
+      allPartitions.add(
+          new PartitionImpl("" + partitionCounter, baseLocation + '/' + partitionCounter));
+    }
+  }
+
+  @Override public Partition getPartionForTuple(Object[] tuple, long rowCounter) {
+    int hashCode;
+    if (partitionColumnIndex == -1) {
+      // no partition column configured: distribute by row counter
+      hashCode = hashCode(rowCounter);
+    } else {
+      try {
+        hashCode = hashCode(((String) tuple[partitionColumnIndex]).hashCode());
+      } catch (RuntimeException e) {
+        // null or non-String values fall back to partition 0
+        hashCode = hashCode(0);
+      }
+    }
+    return allPartitions.get(hashCode);
+  }
+
+  /**
+   * All the partitions built by this partitioner.
+   */
+  public List<Partition> getAllPartitions() {
+    return allPartitions;
+  }
+
+  /**
+   * @see DataPartitioner#getPartitions(CarbonQueryPlan)
+   */
+  public List<Partition> getPartitions(CarbonQueryPlan queryPlan) {
+    // TODO: this has to be redone during the partitioning implementation
+    return allPartitions;
+  }
+
+  /**
+   * Identify the partitions applicable for the given filter
+   */
+  public List<Partition> getPartitions() {
+    // TODO: filter-aware pruning has to be redone during the partitioning
+    // implementation; until then every partition is returned.
+    return allPartitions;
+  }
+
+  private int hashCode(long key) {
+    return (int) (Math.abs(key) % numberOfPartitions);
+  }
+
+  @Override public String[] getPartitionedColumns() {
+    return new String[] { partitionColumn };
+  }
+
+  @Override public Partitioner getPartitioner() {
+    return partitioner;
+  }
+
+}

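A worked example of the hashCode(long) bucketing above, assuming 3 partitions; because the input is an int hash widened to long, Math.abs cannot overflow here:

    int numberOfPartitions = 3;
    long key = "user123".hashCode();                          // some int value, possibly negative
    int bucket = (int) (Math.abs(key) % numberOfPartitions);  // always in [0, 3)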
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
new file mode 100644
index 0000000..bb8fc5c
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/readsupport/SparkRowReadSupportImpl.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.readsupport;
+
+import java.sql.Timestamp;
+
+import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.metadata.datatype.DataType;
+import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
+import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.hadoop.readsupport.impl.AbstractDictionaryDecodedReadSupport;
+
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.expressions.GenericRow;
+import org.apache.spark.unsafe.types.UTF8String;
+
+public class SparkRowReadSupportImpl extends AbstractDictionaryDecodedReadSupport<Row> {
+
+  @Override public void initialize(CarbonColumn[] carbonColumns,
+      AbsoluteTableIdentifier absoluteTableIdentifier) {
+    super.initialize(carbonColumns, absoluteTableIdentifier);
+    //can initialize and generate schema here.
+  }
+
+  @Override public Row readRow(Object[] data) {
+    for (int i = 0; i < dictionaries.length; i++) {
+      if (dictionaries[i] != null) {
+        data[i] = DataTypeUtil
+            .getDataBasedOnDataType(dictionaries[i].getDictionaryValueForKey((int) data[i]),
+                dataTypes[i]);
+        switch (dataTypes[i]) {
+          case STRING:
+            data[i] = UTF8String.fromString(data[i].toString());
+            break;
+          case TIMESTAMP:
+            data[i] = new Timestamp((long) data[i] / 1000);
+            break;
+          case LONG:
+            // LONG values need no further conversion
+            break;
+          default:
+        }
+      } else if (carbonColumns[i].hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+        //convert the long to timestamp in case of direct dictionary column
+        if (DataType.TIMESTAMP == carbonColumns[i].getDataType()) {
+          data[i] = new Timestamp((long) data[i] / 1000);
+        }
+      }
+    }
+    return new GenericRow(data);
+  }
+}

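The division by 1000 in both TIMESTAMP branches implies the stored value is in microseconds while java.sql.Timestamp expects milliseconds; a quick check of that conversion, under that assumption:

    long storedMicros = 1480464000000000L;              // assumed microsecond value
    Timestamp ts = new Timestamp(storedMicros / 1000);  // 1480464000000 ms since epoch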
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/66ccd308/integration/spark-common/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java
new file mode 100644
index 0000000..3fb24e2
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/splits/TableSplit.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.spark.splits;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.spark.partition.api.Partition;
+
+import org.apache.hadoop.io.Writable;
+
+
+/**
+ * Represents one table partition, with its preferred host locations, as one split.
+ */
+public class TableSplit implements Serializable, Writable {
+  private static final long serialVersionUID = -8058151330863145575L;
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(TableSplit.class.getName());
+  private List<String> locations = new ArrayList<String>(CarbonCommonConstants.CONSTANT_SIZE_TEN);
+
+  private Partition partition;
+
+  /**
+   * @return the locations
+   */
+  public List<String> getLocations() {
+    return locations;
+  }
+
+  /**
+   * @param locations the locations to set
+   */
+  public void setLocations(List<String> locations) {
+    this.locations = locations;
+  }
+
+  /**
+   * @return the partition
+   */
+  public Partition getPartition() {
+    return partition;
+  }
+
+  /**
+   * @param partition the partition to set
+   */
+  public void setPartition(Partition partition) {
+    this.partition = partition;
+  }
+
+  @Override public void readFields(DataInput in) throws IOException {
+
+    int sizeLoc = in.readInt();
+    for (int i = 0; i < sizeLoc; i++) {
+      byte[] b = new byte[in.readInt()];
+      in.readFully(b);
+      locations.add(new String(b, Charset.defaultCharset()));
+    }
+
+    byte[] buf = new byte[in.readInt()];
+    in.readFully(buf);
+    ByteArrayInputStream bis = new ByteArrayInputStream(buf);
+    ObjectInputStream ois = new ObjectInputStream(bis);
+    try {
+      partition = (Partition) ois.readObject();
+    } catch (ClassNotFoundException e) {
+      LOGGER.error(e, e.getMessage());
+    }
+    ois.close();
+  }
+
+  @Override public void write(DataOutput out) throws IOException {
+
+    int sizeLoc = locations.size();
+    out.writeInt(sizeLoc);
+    for (int i = 0; i < sizeLoc; i++) {
+      byte[] bytes = locations.get(i).getBytes(Charset.defaultCharset());
+      out.writeInt(bytes.length);
+      out.write(bytes);
+    }
+
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+
+    ObjectOutputStream obs = new ObjectOutputStream(bos);
+    obs.writeObject(partition);
+    obs.close();
+    byte[] byteArray = bos.toByteArray();
+    out.writeInt(byteArray.length);
+    out.write(byteArray);
+  }
+
+  @Override public String toString() {
+    return partition.getUniqueID() + ' ' + locations;
+  }
+}
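
A round-trip sketch of the Writable contract, reusing the PartitionImpl from this patch; it would run inside a method that declares IOException:

    TableSplit split = new TableSplit();
    split.setLocations(Arrays.asList("node-a", "node-b"));
    split.setPartition(new PartitionImpl("0", "/store/0"));

    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    split.write(new DataOutputStream(bos));

    TableSplit copy = new TableSplit();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));
    System.out.println(copy); // prints: 0 [node-a, node-b]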