You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/04/07 09:55:28 UTC

[25/49] incubator-carbondata git commit: Compaction lock should also be acquired during alter operation as alter and compaction on same table should not be allowed concurrently.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
new file mode 100644
index 0000000..ee667c2
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -0,0 +1,1385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.merger;
+
+import java.io.File;
+import java.io.IOException;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.locks.CarbonLockFactory;
+import org.apache.carbondata.core.locks.ICarbonLock;
+import org.apache.carbondata.core.locks.LockUsage;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
+import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader;
+import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+
+/**
+ * utility class for load merging.
+ */
+public final class CarbonDataMergerUtil {
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonDataMergerUtil.class.getName());
+
+  private CarbonDataMergerUtil() {
+    // holder of static helpers only: the private constructor blocks instantiation.
+  }
+
+  /**
+   * Adds up the sizes of the carbondata files listed under a segment folder.
+   *
+   * @param carbonFile the segment folder whose files are listed
+   * @return sum of {@code getSize()} over every listed file whose name is accepted by
+   *         {@code CarbonTablePath.isCarbonDataFile}
+   */
+  private static long getSizeOfFactFileInLoad(CarbonFile carbonFile) {
+    // only carbon data files take part in the size calculation.
+    CarbonFileFilter dataFileFilter = new CarbonFileFilter() {
+      @Override public boolean accept(CarbonFile candidate) {
+        return CarbonTablePath.isCarbonDataFile(candidate.getName());
+      }
+    };
+
+    long totalSize = 0;
+    for (CarbonFile dataFile : carbonFile.listFiles(dataFileFilter)) {
+      totalSize += dataFile.getSize();
+    }
+    return totalSize;
+  }
+
+  /**
+   * Tells whether automatic load merging is switched on.
+   *
+   * @return false only when the ENABLE_AUTO_LOAD_MERGE property (falling back to
+   *         DEFAULT_ENABLE_AUTO_LOAD_MERGE) equals "false" ignoring case; true otherwise
+   */
+
+  public static boolean checkIfAutoLoadMergingRequired() {
+    // the flag is consulted before any load listing so that a disabled merge
+    // never triggers the listing (and the IOException it may cause).
+    String autoMergeFlag = CarbonProperties.getInstance().getProperty(
+        CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+        CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE);
+    return !autoMergeFlag.equalsIgnoreCase("false");
+  }
+
+  /**
+   * Builds the folder name of the merged segment from the first segment of the list.
+   *
+   * @param segmentsToBeMergedList segments taking part in the merge; the first entry
+   *                               supplies the base load name
+   * @return {@code CarbonCommonConstants.LOAD_FOLDER} followed by the merged segment
+   *         name ("n.1" for an unmerged segment "n", "n.(m+1)" for a segment "n.m")
+   */
+  public static String getMergedLoadName(List<LoadMetadataDetails> segmentsToBeMergedList) {
+    String baseSegmentName = segmentsToBeMergedList.get(0).getLoadName();
+    // the naming rule itself lives in the String overload; only the folder prefix is added.
+    return CarbonCommonConstants.LOAD_FOLDER + getMergedLoadName(baseSegmentName);
+  }
+
+
+  /**
+   * Builds the name of the merged segment for the given segment name.
+   *
+   * @param segmentToBeMerged segment name, either "n" or "n.m"
+   * @return "n.1" when the name has no '.', otherwise "n.(m+1)"
+   */
+  public static String getMergedLoadName(final String segmentToBeMerged) {
+    int dotPosition = segmentToBeMerged.indexOf(".");
+    if (dotPosition < 0) {
+      // never merged so far: this is the first merge level.
+      return segmentToBeMerged + "." + 1;
+    }
+    // already merged: bump the level found after the first '.'.
+    int nextLevel = Integer.parseInt(segmentToBeMerged.substring(dotPosition + 1)) + 1;
+    return segmentToBeMerged.substring(0, dotPosition) + "." + nextLevel;
+  }
+
+  /**
+   * Update Both Segment Update Status and Table Status for the case of IUD Delete
+   * delta compaction.
+   *
+   * Only the update delta files of the first entry of loadsToMerge are looked up. Both the
+   * table update status lock and the table status lock are taken before anything is
+   * modified, and each lock is released in the finally block only if it was acquired.
+   *
+   * @param loadsToMerge     segments being compacted; the first entry supplies the segment
+   *                         whose update delta files are fetched
+   * @param metaDataFilepath path handed to readLoadMetadata to read the load details
+   * @param carbonLoadModel  supplies the fact timestamp and the table identifier
+   * @return true only when both status files were written; false when the delta files
+   *         could not be fetched, when there are no delta files, when a lock could not be
+   *         acquired, or when reading/writing the metadata failed
+   */
+  public static boolean updateLoadMetadataIUDUpdateDeltaMergeStatus(
+      List<LoadMetadataDetails> loadsToMerge, String metaDataFilepath,
+      CarbonLoadModel carbonLoadModel) {
+
+    boolean status = false;
+    boolean updateLockStatus = false;
+    boolean tableLockStatus = false;
+
+    String timestamp = carbonLoadModel.getFactTimeStamp();
+
+    List<String> updatedDeltaFilesList =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    // This routine (updateLoadMetadataIUDUpdateDeltaMergeStatus) is supposed to update
+    // two files, as it is only called during IUD_UPDDEL_DELTA_COMPACTION. Along with the
+    // Table Status Metadata file (for Update Block Compaction) it has to update the
+    // Table Update Status Metadata file (for the corresponding Delete Delta file).
+    // As IUD_UPDDEL_DELTA_COMPACTION writes into the same segment, the result is:
+    // A) Table Update Status Metadata File (Block Level)
+    //      * Each block which is being compacted gets 'Compacted' as its status.
+    // B) Table Status Metadata file (Segment Level)
+    //      * loadStatus is not changed to 'compacted'
+    //      * UpdateDeltaStartTime and UpdateDeltaEndTime are both set to the current
+    //        timestamp (which is passed in from the driver)
+    // The Table Update Status Metadata file is handled first, because the updated blocks
+    // of the segment are derived from the Table Status Metadata Update Delta Start and
+    // End Timestamp.
+
+    // Table Update Status Metadata Update.
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+
+    SegmentUpdateStatusManager segmentUpdateStatusManager =
+        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+
+    ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock();
+    ICarbonLock statusLock = segmentStatusManager.getTableStatusLock();
+
+    // Fetch the update delta files of the first segment in loadsToMerge; give up on failure.
+    try {
+      updatedDeltaFilesList = segmentUpdateStatusManager
+          .getUpdateDeltaFiles(loadsToMerge.get(0).getLoadName().toString());
+    } catch (Exception e) {
+      LOGGER.error("Error while getting the Update Delta Blocks.");
+      status = false;
+      return status;
+    }
+
+    if (updatedDeltaFilesList.size() > 0) {
+      try {
+        updateLockStatus = updateLock.lockWithRetries();
+        tableLockStatus = statusLock.lockWithRetries();
+
+        List<String> blockNames = new ArrayList<>(updatedDeltaFilesList.size());
+
+        for (String compactedBlocks : updatedDeltaFilesList) {
+          // block name = text after the last File.separator, up to the last '-'
+          String fullBlock = compactedBlocks;
+          int endIndex = fullBlock.lastIndexOf(File.separator);
+          String blkNoExt = fullBlock.substring(endIndex + 1, fullBlock.lastIndexOf("-"));
+          blockNames.add(blkNoExt);
+        }
+
+        if (updateLockStatus && tableLockStatus) {
+
+          SegmentUpdateDetails[] updateLists = segmentUpdateStatusManager
+              .readLoadMetadata();
+
+          for (String compactedBlocks : blockNames) {
+            // mark matching entries COMPACTED unless already COMPACTED or MARKED_FOR_DELETE
+            for (int i = 0; i < updateLists.length; i++) {
+              if (updateLists[i].getBlockName().equalsIgnoreCase(compactedBlocks)
+                  && !CarbonCommonConstants.COMPACTED.equalsIgnoreCase(updateLists[i].getStatus())
+                  && !CarbonCommonConstants.MARKED_FOR_DELETE
+                  .equalsIgnoreCase(updateLists[i].getStatus())) {
+                updateLists[i].setStatus(CarbonCommonConstants.COMPACTED);
+              }
+            }
+          }
+
+          LoadMetadataDetails[] loadDetails =
+              segmentStatusManager.readLoadMetadata(metaDataFilepath);
+
+          for (LoadMetadataDetails loadDetail : loadDetails) {
+            if (loadsToMerge.contains(loadDetail)) {
+              loadDetail.setUpdateDeltaStartTimestamp(timestamp);
+              loadDetail.setUpdateDeltaEndTimestamp(timestamp);
+              if (loadDetail.getLoadName().equalsIgnoreCase("0")) {
+                loadDetail
+                    .setUpdateStatusFileName(CarbonUpdateUtil.getUpdateStatusFileName(timestamp));
+              }
+            }
+          }
+
+          try {
+            segmentUpdateStatusManager
+                .writeLoadDetailsIntoFile(Arrays.asList(updateLists), timestamp);
+            segmentStatusManager
+                .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), loadDetails);
+            status = true;
+          } catch (IOException e) {
+            LOGGER.error(
+                "Error while writing metadata. The metadata file path is " + carbonTablePath
+                    .getMetadataDirectoryPath());
+            status = false;
+          }
+        } else {
+          LOGGER.error("Not able to acquire the lock.");
+          status = false;
+        }
+      } catch (Exception e) {
+        LOGGER.error("Error while updating metadata. The metadata file path is " + carbonTablePath
+            .getMetadataDirectoryPath());
+        status = false;
+
+      } finally {
+        if (updateLockStatus) {
+          if (updateLock.unlock()) {
+            LOGGER.info("Unlock the segment update lock successfully.");
+          } else {
+            LOGGER.error("Not able to unlock the segment update lock.");
+          }
+        }
+        if (tableLockStatus) {
+          if (statusLock.unlock()) {
+            LOGGER.info("Unlock the table status lock successfully.");
+          } else {
+            LOGGER.error("Not able to unlock the table status lock.");
+          }
+        }
+      }
+    }
+    return status;
+  }
+
+  /**
+   * Updates the table status file after a compaction: every load contained in
+   * loadsToMerge is marked COMPACTED and pointed at the merged load, and a new entry is
+   * appended for the merged load itself.
+   *
+   * The table status lock is released only when this call actually acquired it, in line
+   * with the lock handling of updateLoadMetadataIUDUpdateDeltaMergeStatus.
+   *
+   * @param loadsToMerge       loads that were merged
+   * @param metaDataFilepath   path handed to readLoadMetadata to read the load details
+   * @param MergedLoadName     merged load name; the text after the last occurrence of
+   *                           CarbonCommonConstants.LOAD_FOLDER is used as the load number
+   * @param carbonLoadModel    supplies the table identifier, database and table name
+   * @param mergeLoadStartTime start time recorded on the merged load entry
+   * @param compactionType     MAJOR_COMPACTION flags the new entry as major compacted
+   * @return true when the status file was written; false when the lock could not be
+   *         acquired, when a load to merge is found MARKED_FOR_DELETE (compaction is
+   *         treated as failed and nothing is written), or when writing failed
+   */
+  public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge,
+      String metaDataFilepath, String MergedLoadName, CarbonLoadModel carbonLoadModel,
+      long mergeLoadStartTime, CompactionType compactionType) {
+
+    boolean tableStatusUpdationStatus = false;
+    // remembers whether this call owns the lock, so that finally never releases a lock
+    // that was not acquired here.
+    boolean lockAcquired = false;
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+
+    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+
+    try {
+      lockAcquired = carbonLock.lockWithRetries();
+      if (lockAcquired) {
+        LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "."
+            + carbonLoadModel.getTableName() + " for table status updation ");
+
+        CarbonTablePath carbonTablePath = CarbonStorePath
+            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                absoluteTableIdentifier.getCarbonTableIdentifier());
+
+        String statusFilePath = carbonTablePath.getTableStatusFilePath();
+
+        LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(metaDataFilepath);
+
+        // strip everything up to and including the load folder prefix.
+        String mergedLoadNumber = MergedLoadName.substring(
+            MergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER)
+                + CarbonCommonConstants.LOAD_FOLDER.length(), MergedLoadName.length());
+
+        long modificationOrDeletionTimeStamp = CarbonUpdateUtil.readCurrentTime();
+        for (LoadMetadataDetails loadDetail : loadDetails) {
+          // check if this segment is merged.
+          if (loadsToMerge.contains(loadDetail)) {
+            // if the compacted load is deleted after the start of the compaction process,
+            // then need to discard the compaction process and treat it as failed compaction.
+            if (loadDetail.getLoadStatus()
+                .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
+              LOGGER.error("Compaction is aborted as the segment " + loadDetail.getLoadName()
+                  + " is deleted after the compaction is started.");
+              return false;
+            }
+            loadDetail.setLoadStatus(CarbonCommonConstants.COMPACTED);
+            loadDetail.setModificationOrdeletionTimesStamp(modificationOrDeletionTimeStamp);
+            loadDetail.setMergedLoadName(mergedLoadNumber);
+          }
+        }
+
+        // create entry for merged one.
+        LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
+        // NOTE(review): this partition count is overwritten with "0" a few lines below.
+        loadMetadataDetails.setPartitionCount(carbonLoadModel.getPartitionId());
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS);
+        long loadEnddate = CarbonUpdateUtil.readCurrentTime();
+        loadMetadataDetails.setLoadEndTime(loadEnddate);
+        loadMetadataDetails.setLoadName(mergedLoadNumber);
+        loadMetadataDetails.setLoadStartTime(mergeLoadStartTime);
+        loadMetadataDetails.setPartitionCount("0");
+        // if this is a major compaction then set the segment as major compaction.
+        if (compactionType == CompactionType.MAJOR_COMPACTION) {
+          loadMetadataDetails.setMajorCompacted("true");
+        }
+
+        List<LoadMetadataDetails> updatedDetailsList = new ArrayList<>(Arrays.asList(loadDetails));
+
+        // put the merged folder entry
+        updatedDetailsList.add(loadMetadataDetails);
+
+        try {
+          SegmentStatusManager.writeLoadDetailsIntoFile(statusFilePath,
+              updatedDetailsList.toArray(new LoadMetadataDetails[updatedDetailsList.size()]));
+          tableStatusUpdationStatus = true;
+        } catch (IOException e) {
+          LOGGER.error("Error while writing metadata");
+        }
+      } else {
+        LOGGER.error(
+            "Could not able to obtain lock for table" + carbonLoadModel.getDatabaseName() + "."
+                + carbonLoadModel.getTableName() + "for table status updation");
+      }
+    } finally {
+      // unlock only what was locked here; this also covers the early return above.
+      if (lockAcquired) {
+        if (carbonLock.unlock()) {
+          LOGGER.info("Table unlocked successfully after table status updation" + carbonLoadModel
+              .getDatabaseName() + "." + carbonLoadModel.getTableName());
+        } else {
+          LOGGER.error(
+              "Unable to unlock Table lock for table" + carbonLoadModel.getDatabaseName() + "."
+                  + carbonLoadModel.getTableName() + " during table status updation");
+        }
+      }
+    }
+    return tableStatusUpdationStatus;
+  }
+
+  /**
+   * Picks the segments that should be merged for the requested compaction type.
+   *
+   * @param storeLocation   store path, used only for the size based selection
+   * @param carbonLoadModel load model of the table
+   * @param compactionSize  size threshold, used only for MAJOR_COMPACTION
+   * @param segments        candidate segments; a sorted copy is used, this list is not changed
+   * @param compactionType  IUD_UPDDEL_DELTA_COMPACTION selects by IUD, MAJOR_COMPACTION by
+   *                        size, anything else by segment count
+   * @return the segments to be merged
+   */
+  public static List<LoadMetadataDetails> identifySegmentsToBeMerged(String storeLocation,
+      CarbonLoadModel carbonLoadModel, long compactionSize,
+      List<LoadMetadataDetails> segments, CompactionType compactionType) {
+
+    List<LoadMetadataDetails> orderedSegments = new ArrayList<LoadMetadataDetails>(segments);
+    sortSegments(orderedSegments);
+
+    // IUD compaction has its own selection and skips the preserve / date filters.
+    if (compactionType.equals(CompactionType.IUD_UPDDEL_DELTA_COMPACTION)) {
+      return identifySegmentsToBeMergedBasedOnIUD(orderedSegments, carbonLoadModel);
+    }
+
+    // first drop the configured number of latest loads, then apply the load date filter.
+    List<LoadMetadataDetails> afterPreserve =
+        checkPreserveSegmentsPropertyReturnRemaining(orderedSegments);
+    List<LoadMetadataDetails> candidates =
+        identifySegmentsToBeMergedBasedOnLoadedDate(afterPreserve);
+
+    if (compactionType.equals(CompactionType.MAJOR_COMPACTION)) {
+      // major compaction: choose by accumulated segment size.
+      return identifySegmentsToBeMergedBasedOnSize(compactionSize, candidates, carbonLoadModel,
+          storeLocation);
+    }
+    // otherwise choose by the number of segments per level.
+    return identifySegmentsToBeMergedBasedOnSegCount(candidates);
+  }
+
+  /**
+   * Sorts the given segments in place, ascending by load name read as a number.
+   *
+   * @param segments list of LoadMetadataDetails to sort; the list itself is reordered
+   */
+  public static void sortSegments(List segments) {
+    Comparator<LoadMetadataDetails> byNumericLoadName = new Comparator<LoadMetadataDetails>() {
+      @Override public int compare(LoadMetadataDetails left, LoadMetadataDetails right) {
+        // load names such as "2" or "2.1" are ordered by their numeric value.
+        double leftId = Double.parseDouble(left.getLoadName());
+        double rightId = Double.parseDouble(right.getLoadName());
+        double gap = leftId - rightId;
+        return gap < 0 ? -1 : (gap > 0 ? 1 : 0);
+      }
+    };
+    Collections.sort(segments, byNumericLoadName);
+  }
+
+  /**
+   * Returns the leading run of loads whose start dates lie within the configured number
+   * of days (property DAYS_ALLOWED_TO_COMPACT) of the run's first load.
+   *
+   * A configured value outside 0..100, or one that is not a number, is replaced by the
+   * default. With a value of 0 the date filter is off and the input list is returned as is.
+   *
+   * @param listOfSegmentsBelowThresholdSize segments to examine, in order
+   * @return the input list when the filter is off, otherwise the loads collected for the
+   *         date window
+   */
+  private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnLoadedDate(
+      List<LoadMetadataDetails> listOfSegmentsBelowThresholdSize) {
+
+    long allowedDays;
+    try {
+      allowedDays = Long.parseLong(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+              CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT));
+      if (allowedDays < 0 || allowedDays > 100) {
+        LOGGER.error(
+            "The specified value for property " + CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT
+                + " is incorrect."
+                + " Correct value should be in range of 0 -100. Taking the default value.");
+        allowedDays = Long.parseLong(CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT);
+      }
+    } catch (NumberFormatException e) {
+      allowedDays = Long.parseLong(CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT);
+    }
+
+    // date based filtering is disabled: hand the input back untouched.
+    if (allowedDays <= 0) {
+      return listOfSegmentsBelowThresholdSize;
+    }
+
+    List<LoadMetadataDetails> sameWindowLoads =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    SimpleDateFormat formatter = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
+    Date windowStart = null;
+    boolean baselinePending = true;
+
+    for (LoadMetadataDetails segment : listOfSegmentsBelowThresholdSize) {
+      // the very first segment only opens the window.
+      if (baselinePending) {
+        windowStart = initializeFirstSegment(sameWindowLoads, segment, formatter);
+        baselinePending = false;
+        continue;
+      }
+
+      long startTime = segment.getLoadStartTime();
+      Date currentDate = null;
+      try {
+        currentDate = formatter.parse(formatter.format(startTime));
+      } catch (ParseException e) {
+        LOGGER.error("Error while parsing segment start time" + e.getMessage());
+      }
+
+      if (isTwoDatesPresentInRequiredRange(windowStart, currentDate, allowedDays)) {
+        sameWindowLoads.add(segment);
+      } else if (sameWindowLoads.size() < 2) {
+        // window holds fewer than 2 loads: restart it from the current segment.
+        sameWindowLoads.clear();
+        windowStart = initializeFirstSegment(sameWindowLoads, segment, formatter);
+      } else {
+        // at least 2 loads collected and this one is outside the window: stop here.
+        break;
+      }
+    }
+    return sameWindowLoads;
+  }
+
+  /**
+   * Adds the segment to the list as the first load of a date window and returns its
+   * start time after a format/parse round trip through the given formatter.
+   *
+   * @param loadsOfSameDate list receiving the segment
+   * @param segment         segment that opens the window
+   * @param sdf             formatter used for the round trip
+   * @return the parsed start date, or null when parsing failed (the segment is added anyway)
+   */
+  private static Date initializeFirstSegment(List<LoadMetadataDetails> loadsOfSameDate,
+      LoadMetadataDetails segment, SimpleDateFormat sdf) {
+    long startTime = segment.getLoadStartTime();
+    Date baseline;
+    try {
+      baseline = sdf.parse(sdf.format(startTime));
+    } catch (ParseException e) {
+      LOGGER.error("Error while parsing segment start time" + e.getMessage());
+      baseline = null;
+    }
+    loadsOfSameDate.add(segment);
+    return baseline;
+  }
+
+  /**
+   * Method to check if the load dates are complied to the configured dates.
+   *
+   * @param segDate1
+   * @param segDate2
+   * @return
+   */
+  private static boolean isTwoDatesPresentInRequiredRange(Date segDate1, Date segDate2,
+      long numberOfDaysAllowedToMerge) {
+    if (segDate1 == null || segDate2 == null) {
+      return false;
+    }
+    // take 1 st date add the configured days .
+    Calendar cal1 = Calendar.getInstance();
+    cal1.set(segDate1.getYear(), segDate1.getMonth(), segDate1.getDate());
+    Calendar cal2 = Calendar.getInstance();
+    cal2.set(segDate2.getYear(), segDate2.getMonth(), segDate2.getDate());
+
+    long diff = cal2.getTimeInMillis() - cal1.getTimeInMillis();
+
+    if ((diff / (24 * 60 * 60 * 1000)) < numberOfDaysAllowedToMerge) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Identify the segments to be merged based on the Size in case of Major compaction.
+   *
+   * @param compactionSize
+   * @param listOfSegmentsAfterPreserve
+   * @param carbonLoadModel
+   * @param storeLocation
+   * @return
+   */
+  private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize(
+      long compactionSize, List<LoadMetadataDetails> listOfSegmentsAfterPreserve,
+      CarbonLoadModel carbonLoadModel, String storeLocation) {
+
+    List<LoadMetadataDetails> segmentsToBeMerged =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    CarbonTableIdentifier tableIdentifier =
+        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier();
+
+
+    // total length
+    long totalLength = 0;
+
+    // check size of each segment , sum it up across partitions
+    for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
+
+      String segId = segment.getLoadName();
+      // variable to store one  segment size across partition.
+      long sizeOfOneSegmentAcrossPartition =
+          getSizeOfSegment(storeLocation, tableIdentifier, segId);
+
+      // if size of a segment is greater than the Major compaction size. then ignore it.
+      if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) {
+        // if already 2 segments have been found for merging then stop scan here and merge.
+        if (segmentsToBeMerged.size() > 1) {
+          break;
+        } else { // if only one segment is found then remove the earlier one in list.
+          // reset the total length to 0.
+          segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+          totalLength = 0;
+          continue;
+        }
+      }
+
+      totalLength += sizeOfOneSegmentAcrossPartition;
+
+      // in case of major compaction the size doesnt matter. all the segments will be merged.
+      if (totalLength < (compactionSize * 1024 * 1024)) {
+        segmentsToBeMerged.add(segment);
+      } else { // if already 2 segments have been found for merging then stop scan here and merge.
+        if (segmentsToBeMerged.size() > 1) {
+          break;
+        } else { // if only one segment is found then remove the earlier one in list and put this.
+          // reset the total length to the current identified segment.
+          segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+          segmentsToBeMerged.add(segment);
+          totalLength = sizeOfOneSegmentAcrossPartition;
+        }
+      }
+
+    }
+
+    return segmentsToBeMerged;
+  }
+
+  /**
+   * Computes the size of the carbondata files of one segment.
+   *
+   * @param storeLocation   store path
+   * @param tableIdentifier identifier of the table owning the segment
+   * @param segId           segment id
+   * @return size reported by getSizeOfFactFileInLoad for the segment folder
+   */
+  private static long getSizeOfSegment(String storeLocation,
+      CarbonTableIdentifier tableIdentifier, String segId) {
+    String segmentPath = getStoreLocation(storeLocation, tableIdentifier, segId);
+    return getSizeOfFactFileInLoad(
+        FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)));
+  }
+
+  /**
+   * Resolves the carbon data directory of a segment, always using partition id "0".
+   *
+   * @param storePath             store path
+   * @param carbonTableIdentifier identifier of the table
+   * @param segmentId             segment id
+   * @return the carbon data directory path for partition "0" and the given segment
+   */
+  private static String getStoreLocation(String storePath,
+      CarbonTableIdentifier carbonTableIdentifier, String segmentId) {
+    return CarbonStorePath.getCarbonTablePath(storePath, carbonTableIdentifier)
+        .getCarbonDataDirectoryPath("0", segmentId);
+  }
+
+
+  /**
+   * Selects segments for minor compaction by counting segments per merge level.
+   *
+   * Unmerged segments (no '.' in the name) and merged segments are collected separately;
+   * as soon as one collection reaches its configured level count it is returned.
+   * Segments ending with LEVEL2_COMPACTION_INDEX and major compacted segments are skipped.
+   *
+   * @param listOfSegmentsAfterPreserve segments to scan, in order
+   * @return the first collection that reached its level count, or an empty list
+   */
+  private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSegCount(
+      List<LoadMetadataDetails> listOfSegmentsAfterPreserve) {
+
+    int[] levelCounts = CarbonProperties.getInstance().getCompactionSegmentLevelCount();
+    // a missing level keeps the count 0, which a non-empty collection can never equal.
+    int level1Size = levelCounts.length >= 1 ? levelCounts[0] : 0;
+    int level2Size = levelCounts.length >= 2 ? levelCounts[1] : 0;
+
+    List<LoadMetadataDetails> levelOneCandidates =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    List<LoadMetadataDetails> levelTwoCandidates =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
+      String segName = segment.getLoadName();
+
+      // segments already merged twice, or major compacted, are out of scope for minor.
+      boolean majorCompacted = "true".equalsIgnoreCase(segment.isMajorCompacted());
+      if (segName.endsWith(CarbonCommonConstants.LEVEL2_COMPACTION_INDEX) || majorCompacted) {
+        continue;
+      }
+
+      if (isMergedSegment(segName)) {
+        levelTwoCandidates.add(segment);
+        if (levelTwoCandidates.size() == level2Size) {
+          return levelTwoCandidates;
+        }
+      } else {
+        levelOneCandidates.add(segment);
+        if (levelOneCandidates.size() == level1Size) {
+          return levelOneCandidates;
+        }
+      }
+    }
+    return new ArrayList<>(0);
+  }
+
+  /**
+   * Tells whether a segment name denotes a merged segment.
+   *
+   * @param segName segment name
+   * @return true when the name contains a '.'
+   */
+  private static boolean isMergedSegment(String segName) {
+    return segName.contains(".");
+  }
+
+  /**
+   * Drops the configured number of latest segments and returns the remaining valid ones.
+   *
+   * @param segments segments to filter
+   * @return result of getValidLoadDetailsWithRetaining for the configured preserve count
+   */
+  private static List<LoadMetadataDetails> checkPreserveSegmentsPropertyReturnRemaining(
+      List<LoadMetadataDetails> segments) {
+    // number of loads that must be kept out of merging, as configured.
+    int preserveCount = CarbonProperties.getInstance().getNumberOfSegmentsToBePreserved();
+    return getValidLoadDetailsWithRetaining(segments, preserveCount);
+  }
+
+  /**
+   * Keeps only the valid segments and then removes the requested number of entries from
+   * the end of that list, so that those segments are retained (not merged).
+   *
+   * @param loadMetadataDetails     segments to filter, in order
+   * @param numberOfSegToBeRetained how many trailing valid segments to leave out
+   * @return the valid segments without the retained trailing ones; empty when there are
+   *         not more valid segments than the number to retain
+   */
+  private static List<LoadMetadataDetails> getValidLoadDetailsWithRetaining(
+      List<LoadMetadataDetails> loadMetadataDetails, int numberOfSegToBeRetained) {
+
+    List<LoadMetadataDetails> validSegments =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for (LoadMetadataDetails candidate : loadMetadataDetails) {
+      if (isSegmentValid(candidate)) {
+        validSegments.add(candidate);
+      }
+    }
+
+    // peel segments off the tail until enough are retained or nothing is left.
+    int stillToRetain = numberOfSegToBeRetained;
+    while (stillToRetain != 0 && !validSegments.isEmpty()) {
+      validSegments.remove(validSegments.size() - 1);
+      stillToRetain--;
+    }
+    return validSegments;
+
+  }
+
+  /**
+   * Looks up the compaction size configured for the given compaction type.
+   *
+   * @param compactionType type of compaction being run
+   * @return the configured major compaction size for MAJOR_COMPACTION, 0 for any other type
+   */
+  public static long getCompactionSize(CompactionType compactionType) {
+    switch (compactionType) {
+      case MAJOR_COMPACTION:
+        return CarbonProperties.getInstance().getMajorCompactionSize();
+      default:
+        // no size is configured for the other compaction types.
+        return 0;
+    }
+  }
+
+  /**
+   * Builds the comma separated list of segment names for merging.
+   * <p>
+   * For a segment which carries a merged load name that name is used, otherwise the plain
+   * load name is used.
+   *
+   * @param loadMetadataDetails segments whose names are to be joined
+   * @return comma separated segment names without a trailing comma; an empty string when
+   * the given list is empty
+   */
+  public static String getValidSegments(List<LoadMetadataDetails> loadMetadataDetails) {
+    StringBuilder builder = new StringBuilder();
+    for (LoadMetadataDetails segment : loadMetadataDetails) {
+      //check if this load is an already merged load.
+      if (null != segment.getMergedLoadName()) {
+        builder.append(segment.getMergedLoadName()).append(",");
+      } else {
+        builder.append(segment.getLoadName()).append(",");
+      }
+    }
+    // drop the trailing comma; with no segments nothing was appended and
+    // deleteCharAt(-1) would throw StringIndexOutOfBoundsException.
+    if (builder.length() > 0) {
+      builder.deleteCharAt(builder.length() - 1);
+    }
+    return builder.toString();
+  }
+
+  /**
+   * Returns the valid segments of the table addressed by the given identifier.
+   *
+   * @param absoluteTableIdentifier identifier of the table
+   * @return list of valid segments as reported by the segment status manager
+   * @throws IOException if the valid and invalid segments can not be read; the original
+   * failure is attached as the cause
+   */
+  public static List<String> getValidSegmentList(AbsoluteTableIdentifier absoluteTableIdentifier)
+          throws IOException {
+
+    final SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments;
+    try {
+      validAndInvalidSegments =
+              new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
+    } catch (IOException e) {
+      String message = "Error while getting valid segment list for a table identifier";
+      LOGGER.error(message);
+      // keep message and cause so that the caller can see why the read failed.
+      throw new IOException(message, e);
+    }
+    return validAndInvalidSegments.getValidSegments();
+  }
+
+
+  /**
+   * Cuts off the segments which come after the given last segment.
+   * <p>
+   * A copy of the given list is sorted with sortSegments and everything up to and including
+   * lastSeg is returned. When lastSeg is not part of the list the result is empty.
+   *
+   * @param segments complete list of segments; it is not modified
+   * @param lastSeg last segment which has to be kept
+   * @return the sorted segments from the first one up to and including lastSeg
+   */
+  public static List<LoadMetadataDetails> filterOutNewlyAddedSegments(
+      List<LoadMetadataDetails> segments,
+      LoadMetadataDetails lastSeg) {
+
+    // work on a sorted copy so that the list of the caller stays untouched.
+    List<LoadMetadataDetails> sortedCopy = new ArrayList<>(segments);
+    CarbonDataMergerUtil.sortSegments(sortedCopy);
+
+    // exclusive end index; it is 0 when lastSeg is absent (indexOf returns -1).
+    int endExclusive = sortedCopy.indexOf(lastSeg) + 1;
+    return sortedCopy.subList(0, endExclusive);
+  }
+
+  /**
+   * Identifies the segments qualified for merging in case of IUD compaction: a segment
+   * qualifies when it is valid and checkUpdateDeltaFilesInSeg accepts it for the configured
+   * update delta files threshold.
+   *
+   * @param segments segments to examine
+   * @param carbonLoadModel load model giving the table identifier and the segment update
+   * status manager
+   * @return the qualifying segments, in the order in which they were given
+   */
+  private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnIUD(
+      List<LoadMetadataDetails> segments, CarbonLoadModel carbonLoadModel) {
+
+    List<LoadMetadataDetails> qualified = new ArrayList<>(segments.size());
+
+    AbsoluteTableIdentifier tableIdentifier =
+        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
+    int updateDeltaThreshold =
+        CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction();
+
+    for (LoadMetadataDetails candidate : segments) {
+      // invalid segments are left out without looking at their delta files.
+      if (!isSegmentValid(candidate)) {
+        continue;
+      }
+      if (checkUpdateDeltaFilesInSeg(candidate.getLoadName(), tableIdentifier,
+          carbonLoadModel.getSegmentUpdateStatusManager(), updateDeltaThreshold)) {
+        qualified.add(candidate);
+      }
+    }
+    return qualified;
+  }
+
+  /**
+   * A segment is valid when its load status is success, partial success or marked for
+   * update (compared ignoring case).
+   */
+  private static boolean isSegmentValid(LoadMetadataDetails seg) {
+    String loadStatus = seg.getLoadStatus();
+    if (loadStatus.equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)) {
+      return true;
+    }
+    if (loadStatus.equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) {
+      return true;
+    }
+    return loadStatus.equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_UPDATE);
+  }
+
+  /**
+   * Collects the entries which qualify for IUD compaction.
+   * <p>
+   * For IUD_DELETE_DELTA_COMPACTION the result holds the block level entries given by
+   * getDeleteDeltaFilesInSeg for every segment accepted by checkDeleteDeltaFilesInSeg. For
+   * IUD_UPDDEL_DELTA_COMPACTION it holds the names of the segments accepted by
+   * checkUpdateDeltaFilesInSeg. For any other compaction type the result is empty.
+   *
+   * @param Segments segment names to examine
+   * @param absoluteTableIdentifier identifier of the table, used for the update delta check
+   * @param segmentUpdateStatusManager manager used to look up the delta files
+   * @param compactionTypeIUD kind of IUD compaction which is requested
+   * @return the qualified entries as described above
+   */
+  public static List<String> getSegListIUDCompactionQualified(List<String> Segments,
+      AbsoluteTableIdentifier absoluteTableIdentifier,
+      SegmentUpdateStatusManager segmentUpdateStatusManager, CompactionType compactionTypeIUD) {
+
+    List<String> qualified = new ArrayList<>();
+
+    if (compactionTypeIUD.equals(CompactionType.IUD_DELETE_DELTA_COMPACTION)) {
+      int deleteDeltaThreshold =
+          CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction();
+
+      // first pass: pick the segments which have enough delete delta files.
+      List<String> candidateSegments = new ArrayList<>();
+      for (String segName : Segments) {
+        if (checkDeleteDeltaFilesInSeg(segName, segmentUpdateStatusManager,
+            deleteDeltaThreshold)) {
+          candidateSegments.add(segName);
+        }
+      }
+
+      // second pass: append the segment name along with the blocks selected for merge
+      // instead of only taking the segment name. This helps to parallelize better for each
+      // block in case of delete horizontal compaction.
+      for (String segName : candidateSegments) {
+        List<String> blockEntries =
+            getDeleteDeltaFilesInSeg(segName, segmentUpdateStatusManager, deleteDeltaThreshold);
+        if (blockEntries != null) {
+          qualified.addAll(blockEntries);
+        }
+      }
+      return qualified;
+    }
+
+    if (compactionTypeIUD.equals(CompactionType.IUD_UPDDEL_DELTA_COMPACTION)) {
+      int updateDeltaThreshold =
+          CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction();
+      for (String segName : Segments) {
+        if (checkUpdateDeltaFilesInSeg(segName, absoluteTableIdentifier,
+            segmentUpdateStatusManager, updateDeltaThreshold)) {
+          qualified.add(segName);
+        }
+      }
+    }
+    return qualified;
+  }
+
+  /**
+   * Checks whether the given block belongs to the update delta files of the segment.
+   * <p>
+   * Only the last path component of blkName is compared; the block matches when any update
+   * delta file name of the segment contains that component.
+   *
+   * @param seg segment name
+   * @param blkName block name, possibly with leading path components
+   * @param segmentUpdateStatusManager manager giving the update delta files of the segment
+   * @return true if one of the update delta files contains the block name, false otherwise
+   */
+  public static Boolean checkUpdateDeltaMatchBlock(final String seg, final String blkName,
+      SegmentUpdateStatusManager segmentUpdateStatusManager) {
+
+    List<String> updateDeltaFiles = segmentUpdateStatusManager.getUpdateDeltaFiles(seg);
+
+    // strip the directories, only the file name part takes part in the match.
+    String[] pathParts = blkName.split(CarbonCommonConstants.FILE_SEPARATOR);
+    String fileNamePart = pathParts[pathParts.length - 1];
+
+    boolean matched = false;
+    for (String deltaFile : updateDeltaFiles) {
+      if (deltaFile.contains(fileNamePart)) {
+        matched = true;
+        break;
+      }
+    }
+    return matched;
+  }
+
+  /**
+   * Goes through the update delta files of the segment and tells whether their number,
+   * counted per unique task and timestamp, is more than the IUD compaction threshold.
+   *
+   * @param seg segment name
+   * @param absoluteTableIdentifier identifier of the table, used to build the segment path
+   * @param segmentUpdateStatusManager manager used to pick the update delta files
+   * @param numberDeltaFilesThreshold threshold which has to be exceeded
+   * @return true if more than numberDeltaFilesThreshold unique update delta files exist;
+   * false otherwise, also when the manager gives no update delta files at all
+   */
+  public static Boolean checkUpdateDeltaFilesInSeg(String seg,
+      AbsoluteTableIdentifier absoluteTableIdentifier,
+      SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
+
+    CarbonTablePath tablePath = CarbonStorePath
+        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+            absoluteTableIdentifier.getCarbonTableIdentifier());
+
+    // list every file lying in the directory of the segment.
+    String segmentPath = tablePath.getCarbonDataDirectoryPath("0", seg);
+    CarbonFile segmentDir =
+        FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
+    CarbonFile[] filesOfSegment = segmentDir.listFiles();
+
+    CarbonFile[] updateDeltaFiles = segmentUpdateStatusManager
+        .getUpdateDeltaFilesForSegment(seg, true, CarbonCommonConstants.UPDATE_DELTA_FILE_EXT,
+            false, filesOfSegment);
+    if (updateDeltaFiles == null) {
+      return false;
+    }
+
+    // The update delta files may have spill over blocks. Multiple spill over blocks are
+    // considered as one, hence only the unique combinations of task id and timestamp are
+    // counted, for e.g. part-0-3-1481084721319.carbondata => "3-1481084721319"
+    Set<String> uniqueTaskAndTimestamps = new HashSet<String>();
+    for (CarbonFile deltaFile : updateDeltaFiles) {
+      String fileName = deltaFile.getName();
+      String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(fileName);
+      String timeStamp = CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(fileName);
+      uniqueTaskAndTimestamps.add(taskNo + "-" + timeStamp);
+    }
+    return uniqueTaskAndTimestamps.size() > numberDeltaFilesThreshold;
+  }
+
+  /**
+   * Checks whether the given segment qualifies for IUD delete delta compaction, i.e. whether
+   * the number of unique delete delta files (by task id and timestamp) is more than
+   * numberDeltaFilesThreshold.
+   * <p>
+   * The unique entries are accumulated over the blocks of the segment and the threshold is
+   * checked after each block.
+   *
+   * @param seg segment name
+   * @param segmentUpdateStatusManager manager giving block names and delete delta files
+   * @param numberDeltaFilesThreshold threshold which has to be exceeded
+   * @return true as soon as the accumulated count exceeds the threshold, false otherwise
+   */
+  private static boolean checkDeleteDeltaFilesInSeg(String seg,
+      SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
+
+    Set<String> uniqueTaskAndTimestamps = new HashSet<String>();
+
+    for (final String block : segmentUpdateStatusManager.getBlockNameFromSegment(seg)) {
+
+      // The delete delta files may have spill over blocks. Multiple spill over blocks are
+      // considered as one, hence only the unique combinations of task id and timestamp are
+      // counted, for e.g. part-0-3-1481084721319.carbondata => "3-1481084721319"
+      for (CarbonFile deltaFile : segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, block)) {
+        String fileName = deltaFile.getName();
+        String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(fileName);
+        String timeStamp = CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(fileName);
+        uniqueTaskAndTimestamps.add(taskNo + "-" + timeStamp);
+      }
+
+      if (uniqueTaskAndTimestamps.size() > numberDeltaFilesThreshold) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Lists the blocks of the segment which have more delete delta files than the threshold.
+   *
+   * @param seg segment name
+   * @param segmentUpdateStatusManager manager giving block names and delete delta files
+   * @param numberDeltaFilesThreshold number of delete delta files a block has to exceed
+   * @return one entry of the form "segment/blockName" per block exceeding the threshold
+   */
+  private static List<String> getDeleteDeltaFilesInSeg(String seg,
+      SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
+
+    List<String> selectedBlocks = new ArrayList<>();
+
+    for (final String block : segmentUpdateStatusManager.getBlockNameFromSegment(seg)) {
+      int deltaFileCount = segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, block).length;
+      if (deltaFileCount > numberDeltaFilesThreshold) {
+        selectedBlocks.add(seg + "/" + block);
+      }
+    }
+    return selectedBlocks;
+  }
+
+  /**
+   * Tells whether horizontal compaction is enabled.
+   *
+   * @return true if the horizontal compaction property, or its default when the property is
+   * not set, equals "true" ignoring case
+   */
+  public static boolean isHorizontalCompactionEnabled() {
+    String configured = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.isHorizontalCompactionEnabled,
+            CarbonCommonConstants.defaultIsHorizontalCompactionEnabled);
+    return configured.equalsIgnoreCase("true");
+  }
+
+  /**
+   * Compacts the delete delta files of one block in case of IUD compaction.
+   * <p>
+   * All delete delta files of the block are merged into one file which is named
+   * blockName-timestamp plus the delete delta extension and which is written into the
+   * directory of the first existing delete delta file.
+   *
+   * @param seg segment name
+   * @param blockName name of the block whose delete delta files are compacted
+   * @param absoluteTableIdentifier identifier of the table
+   * @param segmentUpdateDetails update status details to be set on the status manager
+   * @param timestamp timestamp used in the name of the compacted file and as its delete
+   * delta start and end timestamp
+   * @return list with a single result carrying block name, segment name, timestamps and
+   * the compaction status
+   * @throws IOException if the compaction fails; the original failure is attached as cause
+   */
+  public static List<CarbonDataMergerUtilResult> compactBlockDeleteDeltaFiles(String seg,
+      String blockName, AbsoluteTableIdentifier absoluteTableIdentifier,
+      SegmentUpdateDetails[] segmentUpdateDetails, Long timestamp) throws IOException {
+
+    SegmentUpdateStatusManager segmentUpdateStatusManager =
+        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+
+    List<CarbonDataMergerUtilResult> resultList = new ArrayList<CarbonDataMergerUtilResult>(1);
+
+    // set the update status.
+    segmentUpdateStatusManager.setUpdateStatusDetails(segmentUpdateDetails);
+
+    CarbonFile[] deleteDeltaFiles =
+        segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
+
+    String timestampText = timestamp.toString();
+    String destFileName =
+        blockName + "-" + timestampText + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;
+    // the compacted file is placed next to the existing delete delta files.
+    String fullBlockFilePath = deleteDeltaFiles[0].getParentFile().getCanonicalPath()
+        + CarbonCommonConstants.FILE_SEPARATOR + destFileName;
+
+    List<String> deleteFilePathList = new ArrayList<String>();
+    for (CarbonFile cFile : deleteDeltaFiles) {
+      deleteFilePathList.add(cFile.getCanonicalPath());
+    }
+
+    CarbonDataMergerUtilResult blockDetails = new CarbonDataMergerUtilResult();
+    blockDetails.setBlockName(blockName);
+    blockDetails.setSegmentName(seg);
+    blockDetails.setDeleteDeltaStartTimestamp(timestampText);
+    blockDetails.setDeleteDeltaEndTimestamp(timestampText);
+
+    try {
+      blockDetails.setCompactionStatus(
+          startCompactionDeleteDeltaFiles(deleteFilePathList, blockName, fullBlockFilePath));
+      resultList.add(blockDetails);
+    } catch (IOException e) {
+      String message = "Compaction of Delete Delta Files failed. The complete file path is "
+          + fullBlockFilePath;
+      LOGGER.error(message);
+      // keep message and cause, a bare IOException hides what went wrong.
+      throw new IOException(message, e);
+    }
+    return resultList;
+  }
+
+  /**
+   * Compacts the given delete delta files into one delete delta file.
+   * <p>
+   * The details of all given files are read and merged through the delete files data
+   * reader and are then written to fullBlockFilePath.
+   *
+   * @param deleteDeltaFiles paths of the delete delta files to be compacted
+   * @param blockName name of the block to which the delete delta files belong
+   * @param fullBlockFilePath complete path of the compacted file to be written
+   * @return always true when no exception is thrown
+   * @throws IOException if reading the existing files or writing the compacted file fails;
+   * the original failure is attached as the cause
+   */
+  public static Boolean startCompactionDeleteDeltaFiles(List<String> deleteDeltaFiles,
+      String blockName, String fullBlockFilePath) throws IOException {
+
+    DeleteDeltaBlockDetails deleteDeltaBlockDetails = null;
+    CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader();
+    try {
+      deleteDeltaBlockDetails =
+              dataReader.getCompactedDeleteDeltaFileFromBlock(deleteDeltaFiles, blockName);
+    } catch (Exception e) {
+      String blockFilePath = fullBlockFilePath
+              .substring(0, fullBlockFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));
+      String message = "Error while getting the delete delta blocks in path " + blockFilePath;
+      LOGGER.error(message);
+      // keep message and cause, a bare IOException hides what went wrong.
+      throw new IOException(message, e);
+    }
+    CarbonDeleteDeltaWriterImpl carbonDeleteWriter =
+            new CarbonDeleteDeltaWriterImpl(fullBlockFilePath,
+                    FileFactory.getFileType(fullBlockFilePath));
+    try {
+      carbonDeleteWriter.write(deleteDeltaBlockDetails);
+    } catch (IOException e) {
+      String message = "Error while writing compacted delete delta file " + fullBlockFilePath;
+      LOGGER.error(message);
+      throw new IOException(message, e);
+    }
+    return true;
+  }
+
+  /**
+   * Records the outcome of a delete delta compaction in the segment update status and in
+   * the table status file.
+   * <p>
+   * For every merge result one segment update detail is built. Deleted rows and status are
+   * taken over from the matching detail of the given status manager, the delete delta
+   * timestamps come from the merge result. The details are handed to
+   * CarbonUpdateUtil.updateSegmentStatus. After that, under the table status lock, the
+   * update status file name of load "0" is set and the table status file is rewritten.
+   *
+   * @param updateDataMergerDetailsList results of the block level compactions
+   * @param table table which was compacted
+   * @param timestamp timestamp of the compaction
+   * @param segmentUpdateStatusManager manager giving the existing update status details
+   * @return false if any of the merge results has a failed compaction status (nothing is
+   * updated then) or if writing the table status file fails; true otherwise
+   */
+  public static Boolean updateStatusFile(
+          List<CarbonDataMergerUtilResult> updateDataMergerDetailsList, CarbonTable table,
+          String timestamp, SegmentUpdateStatusManager segmentUpdateStatusManager) {
+
+    List<SegmentUpdateDetails> segmentUpdateDetails =
+            new ArrayList<SegmentUpdateDetails>(updateDataMergerDetailsList.size());
+
+
+    // Check the list output.
+    for (CarbonDataMergerUtilResult carbonDataMergerUtilResult : updateDataMergerDetailsList) {
+      if (carbonDataMergerUtilResult.getCompactionStatus()) {
+        SegmentUpdateDetails tempSegmentUpdateDetails = new SegmentUpdateDetails();
+        tempSegmentUpdateDetails.setSegmentName(carbonDataMergerUtilResult.getSegmentName());
+        tempSegmentUpdateDetails.setBlockName(carbonDataMergerUtilResult.getBlockName());
+
+        // carry over deleted rows and status from the existing detail of the same block.
+        for (SegmentUpdateDetails origDetails : segmentUpdateStatusManager
+                .getUpdateStatusDetails()) {
+          if (origDetails.getBlockName().equalsIgnoreCase(carbonDataMergerUtilResult.getBlockName())
+                  && origDetails.getSegmentName()
+                  .equalsIgnoreCase(carbonDataMergerUtilResult.getSegmentName())) {
+
+            tempSegmentUpdateDetails.setDeletedRowsInBlock(origDetails.getDeletedRowsInBlock());
+            tempSegmentUpdateDetails.setStatus(origDetails.getStatus());
+            break;
+          }
+        }
+
+        tempSegmentUpdateDetails.setDeleteDeltaStartTimestamp(
+                carbonDataMergerUtilResult.getDeleteDeltaStartTimestamp());
+        tempSegmentUpdateDetails
+              .setDeleteDeltaEndTimestamp(carbonDataMergerUtilResult.getDeleteDeltaEndTimestamp());
+
+        segmentUpdateDetails.add(tempSegmentUpdateDetails);
+      } else {
+        // a single failed block compaction aborts the complete status update.
+        return false;
+      }
+    }
+
+    CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, table, timestamp, true);
+
+    // Update the Table Status.
+    String metaDataFilepath = table.getMetaDataFilepath();
+    AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
+
+    CarbonTablePath carbonTablePath = CarbonStorePath
+            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                    absoluteTableIdentifier.getCarbonTableIdentifier());
+
+    String tableStatusPath = carbonTablePath.getTableStatusFilePath();
+
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+
+    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
+
+    boolean lockStatus = false;
+
+    try {
+      lockStatus = carbonLock.lockWithRetries();
+      if (lockStatus) {
+        LOGGER.info(
+                "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
+                        + " for table status updation");
+
+        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
+                segmentStatusManager.readLoadMetadata(metaDataFilepath);
+
+        for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
+          if (loadMetadata.getLoadName().equalsIgnoreCase("0")) {
+            loadMetadata.setUpdateStatusFileName(
+                    CarbonUpdateUtil.getUpdateStatusFileName(timestamp));
+          }
+        }
+        try {
+          segmentStatusManager
+                  .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
+        } catch (IOException e) {
+          // do not swallow the failure silently, leave a trace of why false is returned.
+          LOGGER.error("Error while writing the table status file " + tableStatusPath + " : " + e
+                  .getMessage());
+          return false;
+        }
+      } else {
+        // NOTE(review): the method still returns true when the lock is not acquired although
+        // the table status file was not updated - confirm that callers rely on this.
+        LOGGER.error("Not able to acquire the lock for Table status updation for table " + table
+                .getDatabaseName() + "." + table.getFactTableName());
+      }
+    } finally {
+      if (lockStatus) {
+        if (carbonLock.unlock()) {
+          LOGGER.info(
+                 "Table unlocked successfully after table status updation" + table.getDatabaseName()
+                          + "." + table.getFactTableName());
+        } else {
+          LOGGER.error(
+                  "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
+                          .getFactTableName() + " during table status updation");
+        }
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Marks the given segments as major compacted in the table status file.
+   * <p>
+   * The table status is read, changed and written back while the table status lock is held,
+   * so that a concurrent writer can not slip in between the read and the write. The lock is
+   * released only if it was actually acquired.
+   *
+   * @param model load model of the table
+   * @param changedSegDetails segments which have to be marked as major compacted
+   * @param preservedSegment segments which are skipped even if they are part of
+   * changedSegDetails
+   * @throws Exception if the table status lock can not be acquired or if writing the table
+   * status fails (the IOException is attached as the cause)
+   */
+  public static void updateMajorCompactionPropertyInSegment(CarbonLoadModel model,
+      List<LoadMetadataDetails> changedSegDetails,
+      List<LoadMetadataDetails> preservedSegment) throws Exception {
+
+    CarbonTable carbonTable = model.getCarbonDataLoadSchema().getCarbonTable();
+    String metadataPath = carbonTable.getMetaDataFilepath();
+    AbsoluteTableIdentifier absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier();
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+
+    ICarbonLock carbonTableStatusLock = CarbonLockFactory
+            .getCarbonLockObj(carbonTable.getCarbonTableIdentifier(), LockUsage.TABLE_STATUS_LOCK);
+
+    boolean lockAcquired = false;
+    try {
+      lockAcquired = carbonTableStatusLock.lockWithRetries();
+      if (!lockAcquired) {
+        LOGGER.error(
+                "Could not able to obtain lock for table" + model.getDatabaseName() + "." + model
+                        .getTableName() + "for table status updation");
+        throw new Exception("Failed to update the MajorCompactionStatus.");
+      }
+      LOGGER.info(
+          "Acquired lock for the table " + model.getDatabaseName() + "." + model.getTableName()
+                      + " for table status updation ");
+
+      // read the current table status only now that the lock is held.
+      LoadMetadataDetails[] details = segmentStatusManager.readLoadMetadata(metadataPath);
+      // fixed size view backed by the array, changes go straight into details.
+      List<LoadMetadataDetails> originalList = Arrays.asList(details);
+      for (LoadMetadataDetails segment : changedSegDetails) {
+        if (preservedSegment.contains(segment)) {
+          continue;
+        }
+        originalList.get(originalList.indexOf(segment)).setMajorCompacted("true");
+      }
+
+      CarbonTablePath carbonTablePath = CarbonStorePath
+              .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
+                      absoluteTableIdentifier.getCarbonTableIdentifier());
+
+      segmentStatusManager
+              .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), details);
+    } catch (IOException e) {
+      LOGGER.error("Error while writing metadata");
+      throw new Exception("Failed to update the MajorCompactionStatus." + e.getMessage(), e);
+    } finally {
+      // unlock only what was locked here, the lock may be held by somebody else otherwise.
+      if (lockAcquired) {
+        if (carbonTableStatusLock.unlock()) {
+          LOGGER.info(
+                  "Table unlocked successfully after table status updation" + model
+                          .getDatabaseName() + "." + model.getTableName());
+        } else {
+          LOGGER.error(
+                  "Unable to unlock Table lock for table" + model.getDatabaseName() + "." + model
+                          .getTableName() + " during table status updation");
+        }
+      }
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtilResult.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtilResult.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtilResult.java
new file mode 100644
index 0000000..aa3d801
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtilResult.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.merger;
+
+import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
+
+/**
+ * Segment update details of one block together with a flag telling the status of the
+ * compaction which was run for that block.
+ */
+public final class CarbonDataMergerUtilResult extends SegmentUpdateDetails {
+  /**
+   * status of the compaction of the block
+   */
+  private boolean compactionStatus;
+
+  /**
+   * @return the compaction status which was set last, false if none was set
+   */
+  public boolean getCompactionStatus() {
+    return this.compactionStatus;
+  }
+
+  /**
+   * @param status compaction status to be stored; it is unboxed, so it must not be null
+   */
+  public void setCompactionStatus(Boolean status) {
+    this.compactionStatus = status;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
new file mode 100644
index 0000000..ebf3683
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.merger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
+import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.newflow.row.CarbonRow;
+import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger;
+import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.CarbonFactHandler;
+import org.apache.carbondata.processing.store.CarbonFactHandlerFactory;
+import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
+
+/**
+ * This class will process the query result and convert the data
+ * into a format compatible for data load
+ */
+public class CompactionResultSortProcessor extends AbstractResultProcessor {
+
+  /**
+   * LOGGER
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CompactionResultSortProcessor.class.getName());
+  /**
+   * carbon load model that contains all the required information for load
+   */
+  private CarbonLoadModel carbonLoadModel;
+  /**
+   * carbon table
+   */
+  private CarbonTable carbonTable;
+  /**
+   * sortDataRows instance for sorting each row read and writing it to a sort temp file
+   */
+  private SortDataRows sortDataRows;
+  /**
+   * final merger for merge sort
+   */
+  private SingleThreadFinalSortFilesMerger finalMerger;
+  /**
+   * data handler VO object
+   */
+  private CarbonFactHandler dataHandler;
+  /**
+   * segment properties for getting dimension cardinality and other required information of a block
+   */
+  private SegmentProperties segmentProperties;
+  /**
+   * compaction type to decide whether taskID need to be extracted from carbondata files
+   */
+  private CompactionType compactionType;
+  /**
+   * boolean mapping for no dictionary columns in schema
+   */
+  private boolean[] noDictionaryColMapping;
+  /**
+   * agg type defined for measures
+   */
+  private char[] aggType;
+  /**
+   * segment id
+   */
+  private String segmentId;
+  /**
+   * temp store location to be used during data load
+   */
+  private String tempStoreLocation;
+  /**
+   * table name
+   */
+  private String tableName;
+  /**
+   * no dictionary column count in schema
+   */
+  private int noDictionaryCount;
+  /**
+   * total count of measures in schema
+   */
+  private int measureCount;
+  /**
+   * dimension count excluding complex dimension and no dictionary column count
+   */
+  private int dimensionColumnCount;
+  /**
+   * whether the allocated tasks has any record
+   */
+  private boolean isRecordFound;
+
+  /**
+   * Creates a processor for one compaction task. The segment id is read from the load model.
+   *
+   * @param carbonLoadModel   load model supplying database, task, partition and segment id
+   * @param carbonTable       carbon table the compaction works on
+   * @param segmentProperties segment properties used for key generation and dimensions
+   * @param compactionType    compaction type passed on when the data handler is set up
+   * @param tableName         table name
+   */
+  public CompactionResultSortProcessor(CarbonLoadModel carbonLoadModel, CarbonTable carbonTable,
+      SegmentProperties segmentProperties, CompactionType compactionType, String tableName) {
+    this.tableName = tableName;
+    this.compactionType = compactionType;
+    this.segmentProperties = segmentProperties;
+    this.carbonTable = carbonTable;
+    this.carbonLoadModel = carbonLoadModel;
+    this.segmentId = carbonLoadModel.getSegmentId();
+  }
+
+  /**
+   * Runs the sort based compaction flow: initialises the temp location, the sorter and the
+   * aggregation types, sorts every row of the query result and, when at least one row was
+   * read, merges the sort temp files and hands the rows to the data handler. The temp store
+   * location is deleted afterwards regardless of the outcome.
+   *
+   * @param resultIteratorList iterators over the raw query result to be compacted
+   * @return true if all steps completed without an exception, false otherwise
+   */
+  public boolean execute(List<RawResultIterator> resultIteratorList) {
+    try {
+      initTempStoreLocation();
+      initSortDataRows();
+      initAggType();
+      processResult(resultIteratorList);
+      if (!isRecordFound) {
+        // After delete command, if no records are fetched from one split,
+        // the merge and write steps are not required to be initialized.
+        return true;
+      }
+      initializeFinalThreadMergerForMergeSort();
+      initDataHandler();
+      readAndLoadDataFromSortTempFiles();
+      return true;
+    } catch (Exception e) {
+      LOGGER.error(e, "Compaction failed: " + e.getMessage());
+      return false;
+    } finally {
+      // clear temp files and folders created during compaction
+      deleteTempStoreLocation();
+    }
+  }
+
+  /**
+   * This method will clean up the local folders and files created during compaction process
+   */
+  private void deleteTempStoreLocation() {
+    if (null != tempStoreLocation) {
+      try {
+        CarbonUtil.deleteFoldersAndFiles(new File[] { new File(tempStoreLocation) });
+      } catch (IOException | InterruptedException e) {
+        LOGGER.error("Problem deleting local folders during compaction: " + e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * This method will iterate over the query result, add every row to the sorter and then
+   * start the sorting. isRecordFound is set as soon as one row has been added.
+   *
+   * @param resultIteratorList iterators over the raw query result
+   * @throws Exception if a row cannot be added or the sorting fails; the underlying
+   *                   exception is kept as the cause
+   */
+  private void processResult(List<RawResultIterator> resultIteratorList)
+      throws Exception {
+    for (RawResultIterator resultIterator : resultIteratorList) {
+      while (resultIterator.hasNext()) {
+        addRowForSorting(prepareRowObjectForSorting(resultIterator.next()));
+        isRecordFound = true;
+      }
+    }
+    try {
+      sortDataRows.startSorting();
+    } catch (CarbonSortKeyAndGroupByException e) {
+      LOGGER.error(e);
+      // keep the original exception as cause so that the stack trace is not lost
+      throw new Exception("Problem loading data during compaction: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * This method will prepare the data from raw object that will take part in sorting.
+   * The dictionary key found at index 0 of the raw row is converted from MDKey to surrogate
+   * keys, no dictionary dimensions are copied from the wrapper as they are, and the measures
+   * found from index 1 onwards are appended after the dimensions.
+   *
+   * NOTE(review): the row is sized from the segment properties dimensions while the measures
+   * are written from offset dimensionColumnCount; presumably both counts match - confirm.
+   *
+   * @param row raw row, the ByteArrayWrapper key at index 0 followed by the measure values
+   * @return row holding one value per dimension followed by the converted measures
+   */
+  private Object[] prepareRowObjectForSorting(Object[] row) {
+    ByteArrayWrapper keyWrapper = (ByteArrayWrapper) row[0];
+    List<CarbonDimension> allDimensions = segmentProperties.getDimensions();
+    Object[] sortRow = new Object[allDimensions.size() + measureCount];
+    // convert the dictionary from MDKey to surrogate key
+    byte[] mdKey = keyWrapper.getDictionaryKey();
+    long[] surrogateKeys = segmentProperties.getDimensionKeyGenerator().getKeyArray(mdKey);
+    Object[] surrogateValues = new Object[dimensionColumnCount + measureCount];
+    for (int keyIndex = 0; keyIndex < surrogateKeys.length; keyIndex++) {
+      surrogateValues[keyIndex] = (int) surrogateKeys[keyIndex];
+    }
+    // dictionary and no dictionary values are consumed through independent cursors
+    int nextNoDictionary = 0;
+    int nextSurrogate = 0;
+    for (int dimIndex = 0; dimIndex < allDimensions.size(); dimIndex++) {
+      boolean isDictionary = allDimensions.get(dimIndex).hasEncoding(Encoding.DICTIONARY);
+      sortRow[dimIndex] = isDictionary
+          ? surrogateValues[nextSurrogate++]
+          : keyWrapper.getNoDictionaryKeyByIndex(nextNoDictionary++);
+    }
+    // fill all the measures
+    // measures will always start from 1st index in the row object array
+    for (int measureIndex = 0; measureIndex < measureCount; measureIndex++) {
+      sortRow[dimensionColumnCount + measureIndex] =
+          getConvertedMeasureValue(row[measureIndex + 1], aggType[measureIndex]);
+    }
+    return sortRow;
+  }
+
+  /**
+   * This method will convert the spark decimal to java big decimal type. Values of any
+   * other aggregation type, and null values, are returned unchanged.
+   *
+   * @param value   measure value read from the raw row, may be null
+   * @param aggType aggregation type of the measure
+   * @return the java big decimal for a non null big decimal measure, otherwise the value itself
+   */
+  private Object getConvertedMeasureValue(Object value, char aggType) {
+    // a null measure must not be dereferenced, it is passed through as it is
+    if (CarbonCommonConstants.BIG_DECIMAL_MEASURE == aggType && null != value) {
+      return ((org.apache.spark.sql.types.Decimal) value).toJavaBigDecimal();
+    }
+    return value;
+  }
+
+  /**
+   * This method will read sort temp files, perform merge sort and add it to store for data
+   * loading. The data handler is closed at the end in every case.
+   *
+   * @throws Exception if merging, writing or closing fails; the original failure is kept as
+   *                   the cause, and a close failure that follows an earlier failure is
+   *                   attached to it as a suppressed exception instead of replacing it
+   */
+  private void readAndLoadDataFromSortTempFiles() throws Exception {
+    Exception failure = null;
+    try {
+      finalMerger.startFinalMerge();
+      while (finalMerger.hasNext()) {
+        Object[] rowRead = finalMerger.next();
+        CarbonRow row = new CarbonRow(rowRead);
+        // convert the row from surrogate key to MDKey
+        Object[] outputRow = CarbonDataProcessorUtil
+            .convertToMDKeyAndFillRow(row, segmentProperties, measureCount, noDictionaryCount,
+                segmentProperties.getComplexDimensions().size());
+        dataHandler.addDataToStore(outputRow);
+      }
+      dataHandler.finish();
+    } catch (Exception e) {
+      LOGGER.error(e);
+      failure = new Exception("Problem loading data during compaction: " + e.getMessage(), e);
+      throw failure;
+    } finally {
+      if (null != dataHandler) {
+        try {
+          dataHandler.closeHandler();
+        } catch (CarbonDataWriterException e) {
+          LOGGER.error(e);
+          if (null == failure) {
+            throw new Exception("Problem loading data during compaction: " + e.getMessage(), e);
+          }
+          // an earlier failure is already propagating, keep it as the primary exception
+          failure.addSuppressed(e);
+        }
+      }
+    }
+  }
+
+  /**
+   * add row to a temp array which will be written to a sort temp file after sorting
+   *
+   * @param row row prepared for sorting
+   * @throws Exception if the sorter rejects the row; the underlying exception is kept as
+   *                   the cause
+   */
+  private void addRowForSorting(Object[] row) throws Exception {
+    try {
+      sortDataRows.addRow(row);
+    } catch (CarbonSortKeyAndGroupByException e) {
+      LOGGER.error(e);
+      throw new Exception(
+          "Row addition for sorting failed during compaction: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * create an instance of sort data rows. Before that the measure count, the dimension
+   * count, the no dictionary column mapping and the no dictionary column count are derived
+   * from the table schema, as the sort parameters are built from them.
+   *
+   * @throws Exception if the sorter cannot be initialised; the underlying exception is kept
+   *                   as the cause
+   */
+  private void initSortDataRows() throws Exception {
+    measureCount = carbonTable.getMeasureByTableName(tableName).size();
+    List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(tableName);
+    dimensionColumnCount = dimensions.size();
+    noDictionaryColMapping = new boolean[dimensionColumnCount];
+    // count from zero so that a repeated initialisation does not accumulate the old count
+    noDictionaryCount = 0;
+    int index = 0;
+    for (CarbonDimension dimension : dimensions) {
+      if (!CarbonUtil.hasEncoding(dimension.getEncoder(), Encoding.DICTIONARY)) {
+        noDictionaryColMapping[index] = true;
+        noDictionaryCount++;
+      }
+      index++;
+    }
+    SortParameters parameters = createSortParameters();
+    SortIntermediateFileMerger intermediateFileMerger = new SortIntermediateFileMerger(parameters);
+    // TODO: Now it is only supported onheap merge, but we can have unsafe merge
+    // as well by using UnsafeSortDataRows.
+    this.sortDataRows = new SortDataRows(parameters, intermediateFileMerger);
+    try {
+      this.sortDataRows.initialize();
+    } catch (CarbonSortKeyAndGroupByException e) {
+      LOGGER.error(e);
+      throw new Exception(
+          "Error initializing sort data rows object during compaction: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * This method will create the sort parameters VO object from the load model, the segment
+   * properties and the counts computed while initialising the sorter.
+   *
+   * @return sort parameters for the current compaction task
+   */
+  private SortParameters createSortParameters() {
+    String databaseName = carbonLoadModel.getDatabaseName();
+    int complexDimensionCount = segmentProperties.getComplexDimensions().size();
+    return SortParameters.createSortParameters(databaseName, tableName, dimensionColumnCount,
+        complexDimensionCount, measureCount, noDictionaryCount,
+        carbonLoadModel.getPartitionId(), segmentId, carbonLoadModel.getTaskNo(),
+        noDictionaryColMapping);
+  }
+
+  /**
+   * create an instance of finalThread merger which will perform merge sort on all the
+   * sort temp files. The files are read from the sort temp folder below the temp store
+   * location.
+   */
+  private void initializeFinalThreadMergerForMergeSort() {
+    StringBuilder sortTempFileLocation = new StringBuilder();
+    sortTempFileLocation.append(tempStoreLocation);
+    sortTempFileLocation.append(CarbonCommonConstants.FILE_SEPARATOR);
+    sortTempFileLocation.append(CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
+    int complexDimensionCount = segmentProperties.getComplexDimensions().size();
+    finalMerger = new SingleThreadFinalSortFilesMerger(sortTempFileLocation.toString(),
+        tableName, dimensionColumnCount, complexDimensionCount, measureCount,
+        noDictionaryCount, aggType, noDictionaryColMapping);
+  }
+
+  /**
+   * initialise carbon data writer instance. The handler model is built from the load model,
+   * the table and the segment properties, the data file attributes are set in it and a
+   * columnar fact handler is created and initialised.
+   *
+   * @throws Exception if the handler cannot be initialised; the underlying exception is kept
+   *                   as the cause
+   */
+  private void initDataHandler() throws Exception {
+    CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
+        .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName,
+            tempStoreLocation);
+    setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonTable,
+        carbonFactDataHandlerModel);
+    dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel,
+        CarbonFactHandlerFactory.FactHandlerType.COLUMNAR);
+    try {
+      dataHandler.initialise();
+    } catch (CarbonDataWriterException e) {
+      LOGGER.error(e);
+      // keep the original exception as cause so that the stack trace is not lost
+      throw new Exception(
+          "Problem initialising data handler during compaction: " + e.getMessage(), e);
+    }
+  }
+
+  /**
+   * initialise temporary store location from the database name, table name, task number,
+   * partition id and segment id of the current compaction
+   */
+  private void initTempStoreLocation() {
+    String databaseName = carbonLoadModel.getDatabaseName();
+    String taskNo = carbonLoadModel.getTaskNo();
+    String partitionId = carbonLoadModel.getPartitionId();
+    this.tempStoreLocation = CarbonDataProcessorUtil
+        .getLocalDataFolderLocation(databaseName, tableName, taskNo, partitionId, segmentId,
+            false);
+  }
+
+  /**
+   * initialise the aggregation type of every measure of the table; relies on the measure
+   * count that is computed while the sorter is initialised
+   */
+  private void initAggType() {
+    char[] measureAggTypes =
+        CarbonDataProcessorUtil.initAggType(carbonTable, tableName, measureCount);
+    this.aggType = measureAggTypes;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
new file mode 100644
index 0000000..6b9c80a
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.merger;
+
+/**
+ * This enum is used to define the types of Compaction.
+ * Besides Minor and Major compaction there are two compactions done after
+ * UPDATE-DELETE (IUD) operations, named for the update-delete delta and the
+ * delete delta. NONE is the remaining constant; presumably it marks that no
+ * compaction type applies (confirm against the callers).
+ */
+public enum CompactionType {
+    MINOR_COMPACTION,
+    MAJOR_COMPACTION,
+    IUD_UPDDEL_DELTA_COMPACTION,
+    IUD_DELETE_DELTA_COMPACTION,
+    NONE
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/NodeBlockRelation.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/NodeBlockRelation.java b/processing/src/main/java/org/apache/carbondata/processing/merger/NodeBlockRelation.java
new file mode 100644
index 0000000..ecf4408
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/NodeBlockRelation.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.merger;
+
+import org.apache.carbondata.core.datastore.block.Distributable;
+
+/**
+ * Block to Node mapping. Ordering, equality and hash code are all derived from the
+ * node only, the block does not take part in them.
+ */
+public class NodeBlockRelation implements Comparable<NodeBlockRelation> {
+
+  private final Distributable block;
+  private final String node;
+
+  /**
+   * @param block block mapped to the node
+   * @param node  node the block is mapped to
+   */
+  public NodeBlockRelation(Distributable block, String node) {
+    this.node = node;
+    this.block = block;
+  }
+
+  public Distributable getBlock() {
+    return block;
+  }
+
+  public String getNode() {
+    return node;
+  }
+
+  /**
+   * Orders the relations by the natural ordering of their node.
+   */
+  @Override public int compareTo(NodeBlockRelation obj) {
+    String ownNode = this.getNode();
+    String otherNode = obj.getNode();
+    return ownNode.compareTo(otherNode);
+  }
+
+  /**
+   * Two relations are equal when their nodes are equal.
+   */
+  @Override public boolean equals(Object obj) {
+    if (obj instanceof NodeBlockRelation) {
+      NodeBlockRelation other = (NodeBlockRelation) obj;
+      return node.equals(other.node);
+    }
+    return false;
+  }
+
+  @Override public int hashCode() {
+    return node.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java b/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
new file mode 100644
index 0000000..ec2ddaf
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/NodeMultiBlockRelation.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.merger;
+
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.block.Distributable;
+
+/**
+ * Node to blocks mapping, holding all the blocks mapped to one node.
+ *
+ * Note: the natural ordering compares only the number of blocks, whereas equals compares
+ * the blocks and the node, so the ordering is not consistent with equals.
+ */
+public class NodeMultiBlockRelation implements Comparable<NodeMultiBlockRelation> {
+
+  private final List<Distributable> blocks;
+  private final String node;
+
+  /**
+   * @param node   node the blocks are mapped to
+   * @param blocks blocks mapped to the node; the list is stored as it is, not copied
+   */
+  public NodeMultiBlockRelation(String node, List<Distributable> blocks) {
+    this.node = node;
+    this.blocks = blocks;
+  }
+
+  public List<Distributable> getBlocks() {
+    return blocks;
+  }
+
+  public String getNode() {
+    return node;
+  }
+
+  /**
+   * Orders the relations by their number of blocks in ascending order.
+   *
+   * @return a negative value, zero or a positive value as this relation has fewer, the
+   * same number of or more blocks than the other one
+   */
+  @Override public int compareTo(NodeMultiBlockRelation obj) {
+    // Integer.compare instead of a subtraction, the idiomatic overflow safe form
+    return Integer.compare(this.blocks.size(), obj.getBlocks().size());
+  }
+
+  /**
+   * Two relations are equal when both their blocks and their nodes are equal.
+   */
+  @Override public boolean equals(Object obj) {
+    if (!(obj instanceof NodeMultiBlockRelation)) {
+      return false;
+    }
+    NodeMultiBlockRelation o = (NodeMultiBlockRelation) obj;
+    return blocks.equals(o.blocks) && node.equals(o.node);
+  }
+
+  @Override public int hashCode() {
+    return blocks.hashCode() + node.hashCode();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
new file mode 100644
index 0000000..b7aa32c
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.merger;
+
+import java.io.File;
+import java.util.AbstractQueue;
+import java.util.Comparator;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.keygenerator.KeyGenException;
+import org.apache.carbondata.core.metadata.CarbonMetadata;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
+import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
+import org.apache.carbondata.core.util.ByteUtil;
+import org.apache.carbondata.processing.merger.exeception.SliceMergerException;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+import org.apache.carbondata.processing.store.CarbonFactHandler;
+import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
+
+/**
+ * This is the Merger class responsible for the merging of the segments.
+ */
+public class RowResultMergerProcessor extends AbstractResultProcessor {
+
+  private CarbonFactHandler dataHandler;
+  private SegmentProperties segprop;
+  /**
+   * record holder heap
+   */
+  private AbstractQueue<RawResultIterator> recordHolderHeap;
+
+  private TupleConversionAdapter tupleConvertor;
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(RowResultMergerProcessor.class.getName());
+
+  public RowResultMergerProcessor(String databaseName,
+      String tableName, SegmentProperties segProp, String tempStoreLocation,
+      CarbonLoadModel loadModel, CompactionType compactionType) {
+    this.segprop = segProp;
+    if (!new File(tempStoreLocation).mkdirs()) {
+      LOGGER.error("Error while new File(tempStoreLocation).mkdirs() ");
+    }
+    CarbonTable carbonTable = CarbonMetadata.getInstance()
+            .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
+    CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel
+        .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName,
+            tempStoreLocation);
+    setDataFileAttributesInModel(loadModel, compactionType, carbonTable,
+        carbonFactDataHandlerModel);
+    carbonFactDataHandlerModel.setCompactionFlow(true);
+    dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
+    tupleConvertor = new TupleConversionAdapter(segProp);
+  }
+
+  /**
+   * Creates the heap which orders the result iterators with the mdkey comparator.
+   *
+   * @param rawResultIteratorList iterators to be merged, used only for sizing the heap
+   */
+  private void initRecordHolderHeap(List<RawResultIterator> rawResultIteratorList) {
+    // PriorityQueue rejects an initial capacity below 1, so an empty list must not be
+    // passed through as a capacity of 0
+    int initialCapacity = Math.max(1, rawResultIteratorList.size());
+    recordHolderHeap = new PriorityQueue<RawResultIterator>(initialCapacity,
+        new RowResultMergerProcessor.CarbonMdkeyComparator());
+  }
+
+  /**
+   * Merge function
+   *
+   */
+  public boolean execute(List<RawResultIterator> resultIteratorList) {
+    initRecordHolderHeap(resultIteratorList);
+    boolean mergeStatus = false;
+    int index = 0;
+    boolean isDataPresent = false;
+    try {
+
+      // add all iterators to the queue
+      for (RawResultIterator leaftTupleIterator : resultIteratorList) {
+        this.recordHolderHeap.add(leaftTupleIterator);
+        index++;
+      }
+      RawResultIterator iterator = null;
+      while (index > 1) {
+        // iterator the top record
+        iterator = this.recordHolderHeap.poll();
+        Object[] convertedRow = iterator.next();
+        if (null == convertedRow) {
+          index--;
+          continue;
+        }
+        if (!isDataPresent) {
+          dataHandler.initialise();
+          isDataPresent = true;
+        }
+        // get the mdkey
+        addRow(convertedRow);
+        // if there is no record in the leaf and all then decrement the
+        // index
+        if (!iterator.hasNext()) {
+          index--;
+          continue;
+        }
+        // add record to heap
+        this.recordHolderHeap.add(iterator);
+      }
+      // if record holder is not empty then iterator the slice holder from
+      // heap
+      iterator = this.recordHolderHeap.poll();
+      while (true) {
+        Object[] convertedRow = iterator.next();
+        if (null == convertedRow) {
+          break;
+        }
+        // do it only once
+        if (!isDataPresent) {
+          dataHandler.initialise();
+          isDataPresent = true;
+        }
+        addRow(convertedRow);
+        // check if leaf contains no record
+        if (!iterator.hasNext()) {
+          break;
+        }
+      }
+      if (isDataPresent)
+      {
+        this.dataHandler.finish();
+      }
+      mergeStatus = true;
+    } catch (Exception e) {
+      LOGGER.error(e, e.getMessage());
+      LOGGER.error("Exception in compaction merger " + e.getMessage());
+      mergeStatus = false;
+    } finally {
+      try {
+        if (isDataPresent) {
+          this.dataHandler.closeHandler();
+        }
+      } catch (CarbonDataWriterException e) {
+        LOGGER.error("Exception while closing the handler in compaction merger " + e.getMessage());
+        mergeStatus = false;
+      }
+    }
+
+    return mergeStatus;
+  }
+
+  /**
+   * Converts the given tuple into the writable row format and adds it to the store
+   * through the data handler.
+   *
+   * @param carbonTuple sorted row to be written
+   * @throws SliceMergerException if the data handler fails to add the row
+   */
+  private void addRow(Object[] carbonTuple) throws SliceMergerException {
+    Object[] writableRow = tupleConvertor.getObjectArray(carbonTuple);
+    try {
+      this.dataHandler.addDataToStore(writableRow);
+    } catch (CarbonDataWriterException e) {
+      throw new SliceMergerException("Problem in merging the slice", e);
+    }
+  }
+
+  /**
+   * Comparator class for comparing 2 raw row result. The iterators are ordered by the key
+   * of their current converted row, compared dimension by dimension: dictionary columns
+   * through their slice of the dictionary key, no dictionary columns through their own
+   * byte arrays.
+   */
+  private class CarbonMdkeyComparator implements Comparator<RawResultIterator> {
+
+    /**
+     * @return result of the first dimension that differs, 0 if all the dimensions are
+     * equal or if one of the iterators has no current row
+     * @throws IllegalStateException if the current row of an iterator cannot be converted
+     */
+    @Override public int compare(RawResultIterator o1, RawResultIterator o2) {
+
+      Object[] row1;
+      Object[] row2;
+      try {
+        row1 = o1.fetchConverted();
+        row2 = o2.fetchConverted();
+      } catch (KeyGenException e) {
+        // fail with the real cause; carrying on with placeholder rows would only end in
+        // an ArrayIndexOutOfBoundsException when the key is read from them
+        throw new IllegalStateException(
+            "Problem while converting the row for comparison: " + e.getMessage(), e);
+      }
+      if (null == row1 || null == row2) {
+        return 0;
+      }
+      ByteArrayWrapper key1 = (ByteArrayWrapper) row1[0];
+      ByteArrayWrapper key2 = (ByteArrayWrapper) row2[0];
+      int compareResult = 0;
+      int[] columnValueSizes = segprop.getEachDimColumnValueSize();
+      int dictionaryKeyOffset = 0;
+      byte[] dimCols1 = key1.getDictionaryKey();
+      byte[] dimCols2 = key2.getDictionaryKey();
+      int noDicIndex = 0;
+      for (int eachColumnValueSize : columnValueSizes) {
+        // case of dictionary cols
+        if (eachColumnValueSize > 0) {
+
+          compareResult = ByteUtil.UnsafeComparer.INSTANCE
+              .compareTo(dimCols1, dictionaryKeyOffset, eachColumnValueSize, dimCols2,
+                  dictionaryKeyOffset, eachColumnValueSize);
+          dictionaryKeyOffset += eachColumnValueSize;
+        } else { // case of no dictionary
+
+          byte[] noDictionaryDim1 = key1.getNoDictionaryKeyByIndex(noDicIndex);
+          byte[] noDictionaryDim2 = key2.getNoDictionaryKeyByIndex(noDicIndex);
+          compareResult =
+              ByteUtil.UnsafeComparer.INSTANCE.compareTo(noDictionaryDim1, noDictionaryDim2);
+          noDicIndex++;
+
+        }
+        if (0 != compareResult) {
+          return compareResult;
+        }
+      }
+      return 0;
+    }
+  }
+
+}