You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/04/06 10:49:46 UTC

[4/7] incubator-carbondata git commit: Compaction lock should also be acquired during alter operation as alter and compaction on same table should not be allowed concurrently.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
deleted file mode 100644
index 14bed0a..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ /dev/null
@@ -1,1371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.merger;
-
-import java.io.File;
-import java.io.IOException;
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Calendar;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
-import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
-import org.apache.carbondata.core.datastore.impl.FileFactory;
-import org.apache.carbondata.core.locks.CarbonLockFactory;
-import org.apache.carbondata.core.locks.ICarbonLock;
-import org.apache.carbondata.core.locks.LockUsage;
-import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
-import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails;
-import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
-import org.apache.carbondata.core.reader.CarbonDeleteFilesDataReader;
-import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
-import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
-import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
-import org.apache.carbondata.core.util.CarbonProperties;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.spark.load.CarbonLoaderUtil;
-
-/**
- * utility class for load merging.
- */
-public final class CarbonDataMergerUtil {
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CarbonDataMergerUtil.class.getName());
-
-  private CarbonDataMergerUtil() {
-
-  }
-
-  /**
-   * Returns the size of all the carbondata files present in the segment.
-   * @param carbonFile
-   * @return
-   */
-  private static long getSizeOfFactFileInLoad(CarbonFile carbonFile) {
-    long factSize = 0;
-
-    // carbon data file case.
-    CarbonFile[] factFile = carbonFile.listFiles(new CarbonFileFilter() {
-
-      @Override public boolean accept(CarbonFile file) {
-        return CarbonTablePath.isCarbonDataFile(file.getName());
-      }
-    });
-
-    for (CarbonFile fact : factFile) {
-      factSize += fact.getSize();
-    }
-
-    return factSize;
-  }
-
-  /**
-   * To check whether the merge property is enabled or not.
-   *
-   * @return
-   */
-
-  public static boolean checkIfAutoLoadMergingRequired() {
-    // load merge is not supported as per new store format
-    // moving the load merge check in early to avoid unnecessary load listing causing IOException
-    // check whether carbons segment merging operation is enabled or not.
-    // default will be false.
-    String isLoadMergeEnabled = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
-            CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE);
-    if (isLoadMergeEnabled.equalsIgnoreCase("false")) {
-      return false;
-    }
-    return true;
-  }
-
-  /**
-   * Form the Name of the New Merge Folder
-   *
-   * @param segmentsToBeMergedList
-   * @return
-   */
-  public static String getMergedLoadName(List<LoadMetadataDetails> segmentsToBeMergedList) {
-    String firstSegmentName = segmentsToBeMergedList.get(0).getLoadName();
-    if (firstSegmentName.contains(".")) {
-      String beforeDecimal = firstSegmentName.substring(0, firstSegmentName.indexOf("."));
-      String afterDecimal = firstSegmentName.substring(firstSegmentName.indexOf(".") + 1);
-      int fraction = Integer.parseInt(afterDecimal) + 1;
-      String mergedSegmentName = beforeDecimal + "." + fraction;
-      return CarbonCommonConstants.LOAD_FOLDER + mergedSegmentName;
-    } else {
-      String mergeName = firstSegmentName + "." + 1;
-      return CarbonCommonConstants.LOAD_FOLDER + mergeName;
-    }
-
-  }
-
-
-  /**
-   * Form the Name of the New Merge Folder
-   *
-   * @param segmentToBeMerged
-   * @return
-   */
-  public static String getMergedLoadName(final String segmentToBeMerged) {
-    String firstSegmentName = segmentToBeMerged;
-    if (firstSegmentName.contains(".")) {
-      String beforeDecimal = firstSegmentName.substring(0, firstSegmentName.indexOf("."));
-      String afterDecimal = firstSegmentName.substring(firstSegmentName.indexOf(".") + 1);
-      int fraction = Integer.parseInt(afterDecimal) + 1;
-      String mergedSegmentName = beforeDecimal + "." + fraction;
-      return mergedSegmentName;
-    } else {
-      String mergeName = firstSegmentName + "." + 1;
-      return mergeName;
-    }
-
-  }
-
-  /**
-   * Update Both Segment Update Status and Table Status for the case of IUD Delete
-   * delta compaction.
-   *
-   * @param loadsToMerge
-   * @param metaDataFilepath
-   * @param carbonLoadModel
-   * @return
-   */
-  public static boolean updateLoadMetadataIUDUpdateDeltaMergeStatus(
-      List<LoadMetadataDetails> loadsToMerge, String metaDataFilepath,
-      CarbonLoadModel carbonLoadModel) {
-
-    boolean status = false;
-    boolean updateLockStatus = false;
-    boolean tableLockStatus = false;
-
-    String timestamp = carbonLoadModel.getFactTimeStamp();
-
-    List<String> updatedDeltaFilesList =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    // This routine updateLoadMetadataIUDCompactionMergeStatus is suppose to update
-    // two files as it is only called during IUD_UPDDEL_DELTA_COMPACTION. Along with
-    // Table Status Metadata file (For Update Block Compaction) it has to update the
-    // Table Update Status Metadata File (For corresponding Delete Delta File).
-    // As the IUD_UPDDEL_DELTA_COMPACTION going to write in the same segment therefore in
-    // A) Table Update Status Metadata File (Block Level)
-    //      * For each blocks which is being compacted Mark 'Compacted' as the Status.
-    // B) Table Status Metadata file (Segment Level)
-    //      * loadStatus won't be changed to "compacted'
-    //      * UpdateDeltaStartTime and UpdateDeltaEndTime will be both set to current
-    //        timestamp (which is being passed from driver)
-    // First the Table Update Status Metadata File should be updated as we need to get
-    // the updated blocks for the segment from Table Status Metadata Update Delta Start and
-    // End Timestamp.
-
-    // Table Update Status Metadata Update.
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier());
-
-    SegmentUpdateStatusManager segmentUpdateStatusManager =
-        new SegmentUpdateStatusManager(absoluteTableIdentifier);
-
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
-
-    ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock();
-    ICarbonLock statusLock = segmentStatusManager.getTableStatusLock();
-
-    // Update the Compacted Blocks with Compacted Status.
-    try {
-      updatedDeltaFilesList = segmentUpdateStatusManager
-          .getUpdateDeltaFiles(loadsToMerge.get(0).getLoadName().toString());
-    } catch (Exception e) {
-      LOGGER.error("Error while getting the Update Delta Blocks.");
-      status = false;
-      return status;
-    }
-
-    if (updatedDeltaFilesList.size() > 0) {
-      try {
-        updateLockStatus = updateLock.lockWithRetries();
-        tableLockStatus = statusLock.lockWithRetries();
-
-        List<String> blockNames = new ArrayList<>(updatedDeltaFilesList.size());
-
-        for (String compactedBlocks : updatedDeltaFilesList) {
-          // Try to BlockName
-          String fullBlock = compactedBlocks;
-          int endIndex = fullBlock.lastIndexOf(File.separator);
-          String blkNoExt = fullBlock.substring(endIndex + 1, fullBlock.lastIndexOf("-"));
-          blockNames.add(blkNoExt);
-        }
-
-        if (updateLockStatus && tableLockStatus) {
-
-          SegmentUpdateDetails[] updateLists = segmentUpdateStatusManager
-              .readLoadMetadata();
-
-          for (String compactedBlocks : blockNames) {
-            // Check is the compactedBlocks name matches with oldDetails
-            for (int i = 0; i < updateLists.length; i++) {
-              if (updateLists[i].getBlockName().equalsIgnoreCase(compactedBlocks)
-                  && !CarbonCommonConstants.COMPACTED.equalsIgnoreCase(updateLists[i].getStatus())
-                  && !CarbonCommonConstants.MARKED_FOR_DELETE
-                  .equalsIgnoreCase(updateLists[i].getStatus())) {
-                updateLists[i].setStatus(CarbonCommonConstants.COMPACTED);
-              }
-            }
-          }
-
-          LoadMetadataDetails[] loadDetails =
-              segmentStatusManager.readLoadMetadata(metaDataFilepath);
-
-          for (LoadMetadataDetails loadDetail : loadDetails) {
-            if (loadsToMerge.contains(loadDetail)) {
-              loadDetail.setUpdateDeltaStartTimestamp(timestamp);
-              loadDetail.setUpdateDeltaEndTimestamp(timestamp);
-              if (loadDetail.getLoadName().equalsIgnoreCase("0")) {
-                loadDetail
-                    .setUpdateStatusFileName(CarbonUpdateUtil.getUpdateStatusFileName(timestamp));
-              }
-            }
-          }
-
-          try {
-            segmentUpdateStatusManager
-                .writeLoadDetailsIntoFile(Arrays.asList(updateLists), timestamp);
-            segmentStatusManager
-                .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), loadDetails);
-            status = true;
-          } catch (IOException e) {
-            LOGGER.error(
-                "Error while writing metadata. The metadata file path is " + carbonTablePath
-                    .getMetadataDirectoryPath());
-            status = false;
-          }
-        } else {
-          LOGGER.error("Not able to acquire the lock.");
-          status = false;
-        }
-      } catch (Exception e) {
-        LOGGER.error("Error while updating metadata. The metadata file path is " + carbonTablePath
-            .getMetadataDirectoryPath());
-        status = false;
-
-      } finally {
-        if (updateLockStatus) {
-          if (updateLock.unlock()) {
-            LOGGER.info("Unlock the segment update lock successfully.");
-          } else {
-            LOGGER.error("Not able to unlock the segment update lock.");
-          }
-        }
-        if (tableLockStatus) {
-          if (statusLock.unlock()) {
-            LOGGER.info("Unlock the table status lock successfully.");
-          } else {
-            LOGGER.error("Not able to unlock the table status lock.");
-          }
-        }
-      }
-    }
-    return status;
-  }
-
-  /**
-   * method to update table status in case of IUD Update Delta Compaction.
-   * @param loadsToMerge
-   * @param metaDataFilepath
-   * @param MergedLoadName
-   * @param carbonLoadModel
-   * @param compactionType
-   * @return
-   */
-  public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge,
-      String metaDataFilepath, String MergedLoadName, CarbonLoadModel carbonLoadModel,
-      long mergeLoadStartTime, CompactionType compactionType) {
-
-    boolean tableStatusUpdationStatus = false;
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
-
-    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
-
-    try {
-      if (carbonLock.lockWithRetries()) {
-        LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "."
-            + carbonLoadModel.getTableName() + " for table status updation ");
-
-        CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                absoluteTableIdentifier.getCarbonTableIdentifier());
-
-        String statusFilePath = carbonTablePath.getTableStatusFilePath();
-
-        LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(metaDataFilepath);
-
-        String mergedLoadNumber = MergedLoadName.substring(
-            MergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER)
-                + CarbonCommonConstants.LOAD_FOLDER.length(), MergedLoadName.length());
-
-        long modificationOrDeletionTimeStamp = CarbonUpdateUtil.readCurrentTime();
-        for (LoadMetadataDetails loadDetail : loadDetails) {
-          // check if this segment is merged.
-          if (loadsToMerge.contains(loadDetail)) {
-            // if the compacted load is deleted after the start of the compaction process,
-            // then need to discard the compaction process and treat it as failed compaction.
-            if (loadDetail.getLoadStatus()
-                .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) {
-              LOGGER.error("Compaction is aborted as the segment " + loadDetail.getLoadName()
-                  + " is deleted after the compaction is started.");
-              return false;
-            }
-            loadDetail.setLoadStatus(CarbonCommonConstants.COMPACTED);
-            loadDetail.setModificationOrdeletionTimesStamp(modificationOrDeletionTimeStamp);
-            loadDetail.setMergedLoadName(mergedLoadNumber);
-          }
-        }
-
-        // create entry for merged one.
-        LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
-        loadMetadataDetails.setPartitionCount(carbonLoadModel.getPartitionId());
-        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS);
-        long loadEnddate = CarbonUpdateUtil.readCurrentTime();
-        loadMetadataDetails.setLoadEndTime(loadEnddate);
-        loadMetadataDetails.setLoadName(mergedLoadNumber);
-        loadMetadataDetails.setLoadStartTime(mergeLoadStartTime);
-        loadMetadataDetails.setPartitionCount("0");
-        // if this is a major compaction then set the segment as major compaction.
-        if (compactionType == CompactionType.MAJOR_COMPACTION) {
-          loadMetadataDetails.setMajorCompacted("true");
-        }
-
-        List<LoadMetadataDetails> updatedDetailsList = new ArrayList<>(Arrays.asList(loadDetails));
-
-        // put the merged folder entry
-        updatedDetailsList.add(loadMetadataDetails);
-
-        try {
-          SegmentStatusManager.writeLoadDetailsIntoFile(statusFilePath,
-              updatedDetailsList.toArray(new LoadMetadataDetails[updatedDetailsList.size()]));
-          tableStatusUpdationStatus = true;
-        } catch (IOException e) {
-          LOGGER.error("Error while writing metadata");
-        }
-      } else {
-        LOGGER.error(
-            "Could not able to obtain lock for table" + carbonLoadModel.getDatabaseName() + "."
-                + carbonLoadModel.getTableName() + "for table status updation");
-      }
-    } finally {
-      if (carbonLock.unlock()) {
-        LOGGER.info("Table unlocked successfully after table status updation" + carbonLoadModel
-            .getDatabaseName() + "." + carbonLoadModel.getTableName());
-      } else {
-        LOGGER.error(
-            "Unable to unlock Table lock for table" + carbonLoadModel.getDatabaseName() + "."
-                + carbonLoadModel.getTableName() + " during table status updation");
-      }
-    }
-    return tableStatusUpdationStatus;
-  }
-
-  /**
-   * To identify which all segments can be merged.
-   *
-   * @param storeLocation
-   * @param carbonLoadModel
-   * @param compactionSize
-   * @return
-   */
-  public static List<LoadMetadataDetails> identifySegmentsToBeMerged(String storeLocation,
-      CarbonLoadModel carbonLoadModel, long compactionSize,
-      List<LoadMetadataDetails> segments, CompactionType compactionType) {
-
-    List<LoadMetadataDetails> sortedSegments = new ArrayList<LoadMetadataDetails>(segments);
-
-    sortSegments(sortedSegments);
-
-    // Check for segments which are qualified for IUD compaction.
-    if (compactionType.equals(CompactionType.IUD_UPDDEL_DELTA_COMPACTION)) {
-
-      List<LoadMetadataDetails> listOfSegmentsToBeMerged =
-          identifySegmentsToBeMergedBasedOnIUD(sortedSegments, carbonLoadModel);
-
-      return listOfSegmentsToBeMerged;
-    }
-
-    // check preserve property and preserve the configured number of latest loads.
-
-    List<LoadMetadataDetails> listOfSegmentsAfterPreserve =
-        checkPreserveSegmentsPropertyReturnRemaining(sortedSegments);
-
-    // filter the segments if the compaction based on days is configured.
-
-    List<LoadMetadataDetails> listOfSegmentsLoadedInSameDateInterval =
-        identifySegmentsToBeMergedBasedOnLoadedDate(listOfSegmentsAfterPreserve);
-    List<LoadMetadataDetails> listOfSegmentsToBeMerged;
-    // identify the segments to merge based on the Size of the segments across partition.
-    if (compactionType.equals(CompactionType.MAJOR_COMPACTION)) {
-
-      listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize,
-          listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, storeLocation);
-    } else {
-
-      listOfSegmentsToBeMerged =
-          identifySegmentsToBeMergedBasedOnSegCount(listOfSegmentsLoadedInSameDateInterval);
-    }
-
-    return listOfSegmentsToBeMerged;
-  }
-
-  /**
-   * Sorting of the segments.
-   * @param segments
-   */
-  public static void sortSegments(List segments) {
-    // sort the segment details.
-    Collections.sort(segments, new Comparator<LoadMetadataDetails>() {
-      @Override public int compare(LoadMetadataDetails seg1, LoadMetadataDetails seg2) {
-        double seg1Id = Double.parseDouble(seg1.getLoadName());
-        double seg2Id = Double.parseDouble(seg2.getLoadName());
-        if (seg1Id - seg2Id < 0) {
-          return -1;
-        }
-        if (seg1Id - seg2Id > 0) {
-          return 1;
-        }
-        return 0;
-      }
-    });
-  }
-
-  /**
-   * This method will return the list of loads which are loaded at the same interval.
-   * This property is configurable.
-   *
-   * @param listOfSegmentsBelowThresholdSize
-   * @return
-   */
-  private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnLoadedDate(
-      List<LoadMetadataDetails> listOfSegmentsBelowThresholdSize) {
-
-    List<LoadMetadataDetails> loadsOfSameDate =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    long numberOfDaysAllowedToMerge = 0;
-    try {
-      numberOfDaysAllowedToMerge = Long.parseLong(CarbonProperties.getInstance()
-          .getProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
-              CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT));
-      if (numberOfDaysAllowedToMerge < 0 || numberOfDaysAllowedToMerge > 100) {
-        LOGGER.error(
-            "The specified value for property " + CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT
-                + " is incorrect."
-                + " Correct value should be in range of 0 -100. Taking the default value.");
-        numberOfDaysAllowedToMerge =
-            Long.parseLong(CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT);
-      }
-
-    } catch (NumberFormatException e) {
-      numberOfDaysAllowedToMerge =
-          Long.parseLong(CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT);
-    }
-    // if true then process loads according to the load date.
-    if (numberOfDaysAllowedToMerge > 0) {
-
-      // filter loads based on the loaded date
-      boolean first = true;
-      Date segDate1 = null;
-      SimpleDateFormat sdf = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
-      for (LoadMetadataDetails segment : listOfSegmentsBelowThresholdSize) {
-
-        if (first) {
-          segDate1 = initializeFirstSegment(loadsOfSameDate, segment, sdf);
-          first = false;
-          continue;
-        }
-        long segmentDate = segment.getLoadStartTime();
-        Date segDate2 = null;
-        try {
-          segDate2 = sdf.parse(sdf.format(segmentDate));
-        } catch (ParseException e) {
-          LOGGER.error("Error while parsing segment start time" + e.getMessage());
-        }
-
-        if (isTwoDatesPresentInRequiredRange(segDate1, segDate2, numberOfDaysAllowedToMerge)) {
-          loadsOfSameDate.add(segment);
-        }
-        // if the load is beyond merged date.
-        // then reset everything and continue search for loads.
-        else if (loadsOfSameDate.size() < 2) {
-          loadsOfSameDate.clear();
-          // need to add the next segment as first and  to check further
-          segDate1 = initializeFirstSegment(loadsOfSameDate, segment, sdf);
-        } else { // case where a load is beyond merge date and there is at least 2 loads to merge.
-          break;
-        }
-      }
-    } else {
-      return listOfSegmentsBelowThresholdSize;
-    }
-
-    return loadsOfSameDate;
-  }
-
-  /**
-   * @param loadsOfSameDate
-   * @param segment
-   * @return
-   */
-  private static Date initializeFirstSegment(List<LoadMetadataDetails> loadsOfSameDate,
-      LoadMetadataDetails segment, SimpleDateFormat sdf) {
-    long baselineLoadStartTime = segment.getLoadStartTime();
-    Date segDate1 = null;
-    try {
-      segDate1 = sdf.parse(sdf.format(baselineLoadStartTime));
-    } catch (ParseException e) {
-      LOGGER.error("Error while parsing segment start time" + e.getMessage());
-    }
-    loadsOfSameDate.add(segment);
-    return segDate1;
-  }
-
-  /**
-   * Method to check if the load dates are complied to the configured dates.
-   *
-   * @param segDate1
-   * @param segDate2
-   * @return
-   */
-  private static boolean isTwoDatesPresentInRequiredRange(Date segDate1, Date segDate2,
-      long numberOfDaysAllowedToMerge) {
-    if (segDate1 == null || segDate2 == null) {
-      return false;
-    }
-    // take 1 st date add the configured days .
-    Calendar cal1 = Calendar.getInstance();
-    cal1.set(segDate1.getYear(), segDate1.getMonth(), segDate1.getDate());
-    Calendar cal2 = Calendar.getInstance();
-    cal2.set(segDate2.getYear(), segDate2.getMonth(), segDate2.getDate());
-
-    long diff = cal2.getTimeInMillis() - cal1.getTimeInMillis();
-
-    if ((diff / (24 * 60 * 60 * 1000)) < numberOfDaysAllowedToMerge) {
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * Identify the segments to be merged based on the Size in case of Major compaction.
-   *
-   * @param compactionSize
-   * @param listOfSegmentsAfterPreserve
-   * @param carbonLoadModel
-   * @param storeLocation
-   * @return
-   */
-  private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize(
-      long compactionSize, List<LoadMetadataDetails> listOfSegmentsAfterPreserve,
-      CarbonLoadModel carbonLoadModel, String storeLocation) {
-
-    List<LoadMetadataDetails> segmentsToBeMerged =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    CarbonTableIdentifier tableIdentifier =
-        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier();
-
-
-    // total length
-    long totalLength = 0;
-
-    // check size of each segment , sum it up across partitions
-    for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
-
-      String segId = segment.getLoadName();
-      // variable to store one  segment size across partition.
-      long sizeOfOneSegmentAcrossPartition =
-          getSizeOfSegment(storeLocation, tableIdentifier, segId);
-
-      // if size of a segment is greater than the Major compaction size. then ignore it.
-      if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) {
-        // if already 2 segments have been found for merging then stop scan here and merge.
-        if (segmentsToBeMerged.size() > 1) {
-          break;
-        } else { // if only one segment is found then remove the earlier one in list.
-          // reset the total length to 0.
-          segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-          totalLength = 0;
-          continue;
-        }
-      }
-
-      totalLength += sizeOfOneSegmentAcrossPartition;
-
-      // in case of major compaction the size doesnt matter. all the segments will be merged.
-      if (totalLength < (compactionSize * 1024 * 1024)) {
-        segmentsToBeMerged.add(segment);
-      } else { // if already 2 segments have been found for merging then stop scan here and merge.
-        if (segmentsToBeMerged.size() > 1) {
-          break;
-        } else { // if only one segment is found then remove the earlier one in list and put this.
-          // reset the total length to the current identified segment.
-          segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-          segmentsToBeMerged.add(segment);
-          totalLength = sizeOfOneSegmentAcrossPartition;
-        }
-      }
-
-    }
-
-    return segmentsToBeMerged;
-  }
-
-  /**
-   * For calculating the size of the specified segment
-   * @param storeLocation
-   * @param tableIdentifier
-   * @param segId
-   * @return
-   */
-  private static long getSizeOfSegment(String storeLocation,
-      CarbonTableIdentifier tableIdentifier, String segId) {
-    String loadPath = CarbonLoaderUtil
-        .getStoreLocation(storeLocation, tableIdentifier, segId);
-    CarbonFile segmentFolder =
-        FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
-    return getSizeOfFactFileInLoad(segmentFolder);
-  }
-
-  /**
-   * Identify the segments to be merged based on the segment count
-   *
-   * @param listOfSegmentsAfterPreserve
-   * @return
-   */
-  private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSegCount(
-      List<LoadMetadataDetails> listOfSegmentsAfterPreserve) {
-
-    List<LoadMetadataDetails> mergedSegments =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    List<LoadMetadataDetails> unMergedSegments =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-
-    int[] noOfSegmentLevelsCount =
-        CarbonProperties.getInstance().getCompactionSegmentLevelCount();
-
-    int level1Size = 0;
-    int level2Size = 0;
-    int size = noOfSegmentLevelsCount.length;
-
-    if (size >= 2) {
-      level1Size = noOfSegmentLevelsCount[0];
-      level2Size = noOfSegmentLevelsCount[1];
-    } else if (size == 1) {
-      level1Size = noOfSegmentLevelsCount[0];
-    }
-
-    int unMergeCounter = 0;
-    int mergeCounter = 0;
-
-    // check size of each segment , sum it up across partitions
-    for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
-
-      String segName = segment.getLoadName();
-
-      // if a segment is already merged 2 levels then it s name will become .2
-      // need to exclude those segments from minor compaction.
-      // if a segment is major compacted then should not be considered for minor.
-      if (segName.endsWith(CarbonCommonConstants.LEVEL2_COMPACTION_INDEX) || (
-          segment.isMajorCompacted() != null && segment.isMajorCompacted()
-              .equalsIgnoreCase("true"))) {
-        continue;
-      }
-
-      // check if the segment is merged or not
-
-      if (!isMergedSegment(segName)) {
-        //if it is an unmerged segment then increment counter
-        unMergeCounter++;
-        unMergedSegments.add(segment);
-        if (unMergeCounter == (level1Size)) {
-          return unMergedSegments;
-        }
-      } else {
-        mergeCounter++;
-        mergedSegments.add(segment);
-        if (mergeCounter == (level2Size)) {
-          return mergedSegments;
-        }
-      }
-    }
-    return new ArrayList<>(0);
-  }
-
-  /**
-   * To check if the segment is merged or not.
-   * @param segName
-   * @return
-   */
-  private static boolean isMergedSegment(String segName) {
-    if (segName.contains(".")) {
-      return true;
-    }
-    return false;
-  }
-
-  /**
-   * checks number of loads to be preserved and returns remaining valid segments
-   *
-   * @param segments
-   * @return
-   */
-  private static List<LoadMetadataDetails> checkPreserveSegmentsPropertyReturnRemaining(
-      List<LoadMetadataDetails> segments) {
-    // check whether the preserving of the segments from merging is enabled or not.
-    // get the number of loads to be preserved.
-    int numberOfSegmentsToBePreserved =
-        CarbonProperties.getInstance().getNumberOfSegmentsToBePreserved();
-    // get the number of valid segments and retain the latest loads from merging.
-    return CarbonDataMergerUtil
-        .getValidLoadDetailsWithRetaining(segments, numberOfSegmentsToBePreserved);
-  }
-
-  /**
-   * Retain the number of segments which are to be preserved and return the remaining list of
-   * segments.
-   *
-   * @param loadMetadataDetails
-   * @param numberOfSegToBeRetained
-   * @return
-   */
-  private static List<LoadMetadataDetails> getValidLoadDetailsWithRetaining(
-      List<LoadMetadataDetails> loadMetadataDetails, int numberOfSegToBeRetained) {
-
-    List<LoadMetadataDetails> validList =
-        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-    for (LoadMetadataDetails segment : loadMetadataDetails) {
-      if (isSegmentValid(segment)) {
-        validList.add(segment);
-      }
-    }
-
-    // check if valid list is big enough for removing the number of seg to be retained.
-    // last element
-    int removingIndex = validList.size() - 1;
-
-    for (int i = validList.size(); i > 0; i--) {
-      if (numberOfSegToBeRetained == 0) {
-        break;
-      }
-      // remove last segment
-      validList.remove(removingIndex--);
-      numberOfSegToBeRetained--;
-    }
-    return validList;
-
-  }
-
-  /**
-   * This will give the compaction sizes configured based on compaction type.
-   *
-   * @param compactionType
-   * @return
-   */
-  public static long getCompactionSize(CompactionType compactionType) {
-
-    long compactionSize = 0;
-    switch (compactionType) {
-      case MAJOR_COMPACTION:
-        compactionSize = CarbonProperties.getInstance().getMajorCompactionSize();
-        break;
-      default: // this case can not come.
-    }
-    return compactionSize;
-  }
-
-  /**
-   * For getting the comma separated valid segments for merging.
-   *
-   * @param loadMetadataDetails
-   * @return
-   */
-  public static String getValidSegments(List<LoadMetadataDetails> loadMetadataDetails) {
-    StringBuilder builder = new StringBuilder();
-    for (LoadMetadataDetails segment : loadMetadataDetails) {
-      //check if this load is an already merged load.
-      if (null != segment.getMergedLoadName()) {
-        builder.append(segment.getMergedLoadName()).append(",");
-      } else {
-        builder.append(segment.getLoadName()).append(",");
-      }
-    }
-    builder.deleteCharAt(builder.length() - 1);
-    return builder.toString();
-  }
-
-  /**
-   * This method returns the valid segments attached to the table Identifier.
-   *
-   * @param absoluteTableIdentifier
-   * @return
-   */
-  public static List<String> getValidSegmentList(AbsoluteTableIdentifier absoluteTableIdentifier)
-          throws IOException {
-
-    SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = null;
-    try {
-      validAndInvalidSegments =
-              new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments();
-    } catch (IOException e) {
-      LOGGER.error("Error while getting valid segment list for a table identifier");
-      throw new IOException();
-    }
-    return validAndInvalidSegments.getValidSegments();
-  }
-
-
-  /**
-   * Removing the already merged segments from list.
-   */
-  public static List<LoadMetadataDetails> filterOutNewlyAddedSegments(
-      List<LoadMetadataDetails> segments,
-      LoadMetadataDetails lastSeg) {
-
-    // take complete list of segments.
-    List<LoadMetadataDetails> list = new ArrayList<>(segments);
-    // sort list
-    CarbonDataMergerUtil.sortSegments(list);
-
-    // first filter out newly added segments.
-    return list.subList(0, list.indexOf(lastSeg) + 1);
-
-  }
-
-  /**
-   * method to identify the segments qualified for merging in case of IUD Compaction.
-   *
-   * @param carbonLoadModel
-   * @param compactionType
-   * @return
-   */
-  private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnIUD(
-      List<LoadMetadataDetails> segments, CarbonLoadModel carbonLoadModel) {
-
-    List<LoadMetadataDetails> validSegments = new ArrayList<>(segments.size());
-
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-
-    int numberUpdateDeltaFilesThreshold =
-        CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction();
-    for (LoadMetadataDetails seg : segments) {
-      if ((isSegmentValid(seg)) && checkUpdateDeltaFilesInSeg(seg.getLoadName(),
-          absoluteTableIdentifier, carbonLoadModel.getSegmentUpdateStatusManager(),
-          numberUpdateDeltaFilesThreshold)) {
-        validSegments.add(seg);
-      }
-    }
-    return validSegments;
-  }
-
-  private static boolean isSegmentValid(LoadMetadataDetails seg) {
-    return seg.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
-            || seg.getLoadStatus()
-            .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) || seg
-            .getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_UPDATE);
-  }
-
-  /**
-   * method gets the segments list which get qualified for IUD compaction.
-   * @param Segments
-   * @param absoluteTableIdentifier
-   * @param compactionTypeIUD
-   * @return
-   */
-  public static List<String> getSegListIUDCompactionQualified(List<String> Segments,
-      AbsoluteTableIdentifier absoluteTableIdentifier,
-      SegmentUpdateStatusManager segmentUpdateStatusManager, CompactionType compactionTypeIUD) {
-
-    List<String> validSegments = new ArrayList<>();
-
-    if (compactionTypeIUD.equals(CompactionType.IUD_DELETE_DELTA_COMPACTION)) {
-      int numberDeleteDeltaFilesThreshold =
-          CarbonProperties.getInstance().getNoDeleteDeltaFilesThresholdForIUDCompaction();
-      List<String> deleteSegments = new ArrayList<>();
-      for (String seg : Segments) {
-        if (checkDeleteDeltaFilesInSeg(seg, segmentUpdateStatusManager,
-            numberDeleteDeltaFilesThreshold)) {
-          deleteSegments.add(seg);
-        }
-      }
-      if (deleteSegments.size() > 0) {
-        // This Code Block Append the Segname along with the Blocks selected for Merge instead of
-        // only taking the segment name. This will help to parallelize better for each block
-        // in case of Delete Horizontal Compaction.
-        for (String segName : deleteSegments) {
-          List<String> tempSegments = getDeleteDeltaFilesInSeg(segName, segmentUpdateStatusManager,
-              numberDeleteDeltaFilesThreshold);
-          if (tempSegments != null) {
-            for (String tempSeg : tempSegments) {
-              validSegments.add(tempSeg);
-            }
-          }
-        }
-      }
-    } else if (compactionTypeIUD.equals(CompactionType.IUD_UPDDEL_DELTA_COMPACTION)) {
-      int numberUpdateDeltaFilesThreshold =
-          CarbonProperties.getInstance().getNoUpdateDeltaFilesThresholdForIUDCompaction();
-      for (String seg : Segments) {
-        if (checkUpdateDeltaFilesInSeg(seg, absoluteTableIdentifier, segmentUpdateStatusManager,
-            numberUpdateDeltaFilesThreshold)) {
-          validSegments.add(seg);
-        }
-      }
-    }
-    return validSegments;
-  }
-
-  /**
-   * Check if the blockname of the segment belongs to the Valid Update Delta List or not.
-   * @param seg
-   * @param blkName
-   * @param segmentUpdateStatusManager
-   * @return
-   */
-  public static Boolean checkUpdateDeltaMatchBlock(final String seg, final String blkName,
-      SegmentUpdateStatusManager segmentUpdateStatusManager) {
-
-    List<String> list = segmentUpdateStatusManager.getUpdateDeltaFiles(seg);
-
-    String fullBlock = blkName;
-    String[] FileParts = fullBlock.split(CarbonCommonConstants.FILE_SEPARATOR);
-    String blockName = FileParts[FileParts.length - 1];
-
-    for (String str : list) {
-      if (str.contains(blockName)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * This method traverses Update Delta Files inside the seg and return true
-   * if UpdateDelta Files are more than IUD Compaction threshold.
-   *
-   * @param seg
-   * @param absoluteTableIdentifier
-   * @param segmentUpdateStatusManager
-   * @param numberDeltaFilesThreshold
-   * @return
-   */
-  public static Boolean checkUpdateDeltaFilesInSeg(String seg,
-      AbsoluteTableIdentifier absoluteTableIdentifier,
-      SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
-
-    CarbonFile[] updateDeltaFiles = null;
-    Set<String> uniqueBlocks = new HashSet<String>();
-
-    CarbonTablePath carbonTablePath = CarbonStorePath
-        .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-            absoluteTableIdentifier.getCarbonTableIdentifier());
-
-    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", seg);
-    CarbonFile segDir =
-        FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath));
-    CarbonFile[] allSegmentFiles = segDir.listFiles();
-
-    updateDeltaFiles = segmentUpdateStatusManager
-        .getUpdateDeltaFilesForSegment(seg, true, CarbonCommonConstants.UPDATE_DELTA_FILE_EXT,
-            false, allSegmentFiles);
-
-    if (updateDeltaFiles == null) {
-      return false;
-    }
-
-    // The update Delta files may have Spill over blocks. Will consider multiple spill over
-    // blocks as one. Currently updateDeltaFiles array contains Update Delta Block name which
-    // lies within UpdateDelta Start TimeStamp and End TimeStamp. In order to eliminate
-    // Spill Over Blocks will choose files with unique taskID.
-    for (CarbonFile blocks : updateDeltaFiles) {
-      // Get Task ID and the Timestamp from the Block name for e.g.
-      // part-0-3-1481084721319.carbondata => "3-1481084721319"
-      String task = CarbonTablePath.DataFileUtil.getTaskNo(blocks.getName());
-      String timestamp =
-          CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName());
-      String taskAndTimeStamp = task + "-" + timestamp;
-      uniqueBlocks.add(taskAndTimeStamp);
-    }
-    if (uniqueBlocks.size() > numberDeltaFilesThreshold) {
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  /**
-   * Check is the segment passed qualifies for IUD delete delta compaction or not i.e.
-   * if the number of delete delta files present in the segment is more than
-   * numberDeltaFilesThreshold.
-   *
-   * @param seg
-   * @param segmentUpdateStatusManager
-   * @param numberDeltaFilesThreshold
-   * @return
-   */
-  private static boolean checkDeleteDeltaFilesInSeg(String seg,
-      SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
-
-    Set<String> uniqueBlocks = new HashSet<String>();
-    List<String> blockNameList = segmentUpdateStatusManager.getBlockNameFromSegment(seg);
-
-    for (final String blockName : blockNameList) {
-
-      CarbonFile[] deleteDeltaFiles =
-          segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
-
-      // The Delete Delta files may have Spill over blocks. Will consider multiple spill over
-      // blocks as one. Currently DeleteDeltaFiles array contains Delete Delta Block name which
-      // lies within Delete Delta Start TimeStamp and End TimeStamp. In order to eliminate
-      // Spill Over Blocks will choose files with unique taskID.
-      for (CarbonFile blocks : deleteDeltaFiles) {
-        // Get Task ID and the Timestamp from the Block name for e.g.
-        // part-0-3-1481084721319.carbondata => "3-1481084721319"
-        String task = CarbonTablePath.DataFileUtil.getTaskNo(blocks.getName());
-        String timestamp =
-            CarbonTablePath.DataFileUtil.getTimeStampFromDeleteDeltaFile(blocks.getName());
-        String taskAndTimeStamp = task + "-" + timestamp;
-        uniqueBlocks.add(taskAndTimeStamp);
-      }
-
-      if (uniqueBlocks.size() > numberDeltaFilesThreshold) {
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Check is the segment passed qualifies for IUD delete delta compaction or not i.e.
-   * if the number of delete delta files present in the segment is more than
-   * numberDeltaFilesThreshold.
-   * @param seg
-   * @param segmentUpdateStatusManager
-   * @param numberDeltaFilesThreshold
-   * @return
-   */
-
-  private static List<String> getDeleteDeltaFilesInSeg(String seg,
-      SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) {
-
-    List<String> blockLists = new ArrayList<>();
-    List<String> blockNameList = segmentUpdateStatusManager.getBlockNameFromSegment(seg);
-
-    for (final String blockName : blockNameList) {
-
-      CarbonFile[] deleteDeltaFiles =
-          segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
-
-      if (deleteDeltaFiles.length > numberDeltaFilesThreshold) {
-        blockLists.add(seg + "/" + blockName);
-      }
-    }
-    return blockLists;
-  }
-
-  /**
-   * Returns true is horizontal compaction is enabled.
-   * @return
-   */
-  public static boolean isHorizontalCompactionEnabled() {
-    if ((CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.isHorizontalCompactionEnabled,
-            CarbonCommonConstants.defaultIsHorizontalCompactionEnabled)).equalsIgnoreCase("true")) {
-      return true;
-    } else {
-      return false;
-    }
-  }
-
-  /**
-   * method to compact Delete Delta files in case of IUD Compaction.
-   *
-   * @param seg
-   * @param blockName
-   * @param absoluteTableIdentifier
-   * @param segmentUpdateDetails
-   * @param timestamp
-   * @return
-   * @throws IOException
-   */
-  public static List<CarbonDataMergerUtilResult> compactBlockDeleteDeltaFiles(String seg,
-      String blockName, AbsoluteTableIdentifier absoluteTableIdentifier,
-      SegmentUpdateDetails[] segmentUpdateDetails, Long timestamp) throws IOException {
-
-    SegmentUpdateStatusManager segmentUpdateStatusManager =
-        new SegmentUpdateStatusManager(absoluteTableIdentifier);
-
-    List<CarbonDataMergerUtilResult> resultList = new ArrayList<CarbonDataMergerUtilResult>(1);
-
-    // set the update status.
-    segmentUpdateStatusManager.setUpdateStatusDetails(segmentUpdateDetails);
-
-    CarbonFile[] deleteDeltaFiles =
-        segmentUpdateStatusManager.getDeleteDeltaFilesList(seg, blockName);
-
-    String destFileName =
-        blockName + "-" + timestamp.toString() + CarbonCommonConstants.DELETE_DELTA_FILE_EXT;
-    String fullBlockFilePath = deleteDeltaFiles[0].getParentFile().getCanonicalPath()
-        + CarbonCommonConstants.FILE_SEPARATOR + destFileName;
-
-    List<String> deleteFilePathList = new ArrayList<String>();
-    for (CarbonFile cFile : deleteDeltaFiles) {
-      deleteFilePathList.add(cFile.getCanonicalPath());
-    }
-
-    CarbonDataMergerUtilResult blockDetails = new CarbonDataMergerUtilResult();
-    blockDetails.setBlockName(blockName);
-    blockDetails.setSegmentName(seg);
-    blockDetails.setDeleteDeltaStartTimestamp(timestamp.toString());
-    blockDetails.setDeleteDeltaEndTimestamp(timestamp.toString());
-
-    try {
-      if (startCompactionDeleteDeltaFiles(deleteFilePathList, blockName, fullBlockFilePath)) {
-        blockDetails.setCompactionStatus(true);
-      } else {
-        blockDetails.setCompactionStatus(false);
-      }
-      resultList.add(blockDetails);
-    } catch (IOException e) {
-      LOGGER.error("Compaction of Delete Delta Files failed. The complete file path is "
-          + fullBlockFilePath);
-      throw new IOException();
-    }
-    return resultList;
-  }
-
-  /**
-   * this method compact the delete delta files.
-   * @param deleteDeltaFiles
-   * @param blockName
-   * @param fullBlockFilePath
-   * @return
-   */
-  public static Boolean startCompactionDeleteDeltaFiles(List<String> deleteDeltaFiles,
-      String blockName, String fullBlockFilePath) throws IOException {
-
-    DeleteDeltaBlockDetails deleteDeltaBlockDetails = null;
-    CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader();
-    try {
-      deleteDeltaBlockDetails =
-              dataReader.getCompactedDeleteDeltaFileFromBlock(deleteDeltaFiles, blockName);
-    } catch (Exception e) {
-      String blockFilePath = fullBlockFilePath
-              .substring(0, fullBlockFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR));
-      LOGGER.error("Error while getting the delete delta blocks in path " + blockFilePath);
-      throw new IOException();
-    }
-    CarbonDeleteDeltaWriterImpl carbonDeleteWriter =
-            new CarbonDeleteDeltaWriterImpl(fullBlockFilePath,
-                    FileFactory.getFileType(fullBlockFilePath));
-    try {
-      carbonDeleteWriter.write(deleteDeltaBlockDetails);
-    } catch (IOException e) {
-      LOGGER.error("Error while writing compacted delete delta file " + fullBlockFilePath);
-      throw new IOException();
-    }
-    return true;
-  }
-
-  public static Boolean updateStatusFile(
-          List<CarbonDataMergerUtilResult> updateDataMergerDetailsList, CarbonTable table,
-          String timestamp, SegmentUpdateStatusManager segmentUpdateStatusManager) {
-
-    List<SegmentUpdateDetails> segmentUpdateDetails =
-            new ArrayList<SegmentUpdateDetails>(updateDataMergerDetailsList.size());
-
-
-    // Check the list output.
-    for (CarbonDataMergerUtilResult carbonDataMergerUtilResult : updateDataMergerDetailsList) {
-      if (carbonDataMergerUtilResult.getCompactionStatus()) {
-        SegmentUpdateDetails tempSegmentUpdateDetails = new SegmentUpdateDetails();
-        tempSegmentUpdateDetails.setSegmentName(carbonDataMergerUtilResult.getSegmentName());
-        tempSegmentUpdateDetails.setBlockName(carbonDataMergerUtilResult.getBlockName());
-
-        for (SegmentUpdateDetails origDetails : segmentUpdateStatusManager
-                .getUpdateStatusDetails()) {
-          if (origDetails.getBlockName().equalsIgnoreCase(carbonDataMergerUtilResult.getBlockName())
-                  && origDetails.getSegmentName()
-                  .equalsIgnoreCase(carbonDataMergerUtilResult.getSegmentName())) {
-
-            tempSegmentUpdateDetails.setDeletedRowsInBlock(origDetails.getDeletedRowsInBlock());
-            tempSegmentUpdateDetails.setStatus(origDetails.getStatus());
-            break;
-          }
-        }
-
-        tempSegmentUpdateDetails.setDeleteDeltaStartTimestamp(
-                carbonDataMergerUtilResult.getDeleteDeltaStartTimestamp());
-        tempSegmentUpdateDetails
-              .setDeleteDeltaEndTimestamp(carbonDataMergerUtilResult.getDeleteDeltaEndTimestamp());
-
-        segmentUpdateDetails.add(tempSegmentUpdateDetails);
-      } else return false;
-    }
-
-    CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, table, timestamp, true);
-
-    // Update the Table Status.
-    String metaDataFilepath = table.getMetaDataFilepath();
-    AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier();
-
-    CarbonTablePath carbonTablePath = CarbonStorePath
-            .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                    absoluteTableIdentifier.getCarbonTableIdentifier());
-
-    String tableStatusPath = carbonTablePath.getTableStatusFilePath();
-
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
-
-    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
-
-    boolean lockStatus = false;
-
-    try {
-      lockStatus = carbonLock.lockWithRetries();
-      if (lockStatus) {
-        LOGGER.info(
-                "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName()
-                        + " for table status updation");
-
-        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-                segmentStatusManager.readLoadMetadata(metaDataFilepath);
-
-        for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) {
-          if (loadMetadata.getLoadName().equalsIgnoreCase("0")) {
-            loadMetadata.setUpdateStatusFileName(
-                    CarbonUpdateUtil.getUpdateStatusFileName(timestamp));
-          }
-        }
-        try {
-          segmentStatusManager
-                  .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
-        } catch (IOException e) {
-          return false;
-        }
-      } else {
-        LOGGER.error("Not able to acquire the lock for Table status updation for table " + table
-                .getDatabaseName() + "." + table.getFactTableName());
-      }
-    } finally {
-      if (lockStatus) {
-        if (carbonLock.unlock()) {
-          LOGGER.info(
-                 "Table unlocked successfully after table status updation" + table.getDatabaseName()
-                          + "." + table.getFactTableName());
-        } else {
-          LOGGER.error(
-                  "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table
-                          .getFactTableName() + " during table status updation");
-        }
-      }
-    }
-    return true;
-  }
-
-  /**
-   * This will update the property of segments as major compacted.
-   * @param model
-   * @param changedSegDetails
-   */
-  public static void updateMajorCompactionPropertyInSegment(CarbonLoadModel model,
-      List<LoadMetadataDetails> changedSegDetails,
-      List<LoadMetadataDetails> preservedSegment) throws Exception {
-
-    String metadataPath = model.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath();
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-            model.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
-    LoadMetadataDetails[] details = segmentStatusManager.readLoadMetadata(metadataPath);
-    List<LoadMetadataDetails> originalList = Arrays.asList(details);
-    for (LoadMetadataDetails segment : changedSegDetails) {
-      if (preservedSegment.contains(segment)) {
-        continue;
-      }
-      originalList.get(originalList.indexOf(segment)).setMajorCompacted("true");
-
-    }
-
-
-    ICarbonLock carbonTableStatusLock = CarbonLockFactory.getCarbonLockObj(
-            model.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier(),
-            LockUsage.TABLE_STATUS_LOCK);
-
-    try {
-      if (carbonTableStatusLock.lockWithRetries()) {
-        LOGGER.info(
-            "Acquired lock for the table " + model.getDatabaseName() + "." + model.getTableName()
-                        + " for table status updation ");
-        CarbonTablePath carbonTablePath = CarbonStorePath
-                .getCarbonTablePath(absoluteTableIdentifier.getStorePath(),
-                        absoluteTableIdentifier.getCarbonTableIdentifier());
-
-        segmentStatusManager.writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(),
-                originalList.toArray(new LoadMetadataDetails[originalList.size()]));
-      } else {
-        LOGGER.error(
-                "Could not able to obtain lock for table" + model.getDatabaseName() + "." + model
-                        .getTableName() + "for table status updation");
-        throw new Exception("Failed to update the MajorCompactionStatus.");
-      }
-    } catch (IOException e) {
-      LOGGER.error("Error while writing metadata");
-      throw new Exception("Failed to update the MajorCompactionStatus." + e.getMessage());
-    } finally {
-      if (carbonTableStatusLock.unlock()) {
-        LOGGER.info(
-                "Table unlocked successfully after table status updation" + model.getDatabaseName()
-                        + "." + model.getTableName());
-      } else {
-        LOGGER.error("Unable to unlock Table lock for table" + model.getDatabaseName() + "." + model
-                .getTableName() + " during table status updation");
-      }
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtilResult.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtilResult.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtilResult.java
deleted file mode 100644
index 214a231..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CarbonDataMergerUtilResult.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.merger;
-
-import org.apache.carbondata.core.mutate.SegmentUpdateDetails;
-
-public final class CarbonDataMergerUtilResult extends SegmentUpdateDetails {
-  private boolean compactionStatus;
-
-  public boolean getCompactionStatus() {
-    return compactionStatus;
-  }
-
-  public void setCompactionStatus(Boolean status) {
-    compactionStatus = status;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionCallable.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionCallable.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionCallable.java
deleted file mode 100644
index d1cb9d9..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionCallable.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.merger;
-
-import java.util.concurrent.Callable;
-
-import org.apache.carbondata.spark.rdd.Compactor;
-
-import org.apache.spark.sql.execution.command.CompactionCallableModel;
-
-/**
- * Callable class which is used to trigger the compaction in a separate callable.
- */
-public class CompactionCallable implements Callable<Void> {
-
-  private final CompactionCallableModel compactionCallableModel;
-
-  public CompactionCallable(CompactionCallableModel compactionCallableModel) {
-
-    this.compactionCallableModel = compactionCallableModel;
-  }
-
-  @Override public Void call() throws Exception {
-
-    Compactor.triggerCompaction(compactionCallableModel);
-    return null;
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionResultSortProcessor.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionResultSortProcessor.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionResultSortProcessor.java
deleted file mode 100644
index 6cfe8b5..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionResultSortProcessor.java
+++ /dev/null
@@ -1,563 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.carbondata.spark.merger;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.carbondata.common.CarbonIterator;
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.encoder.Encoding;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.scan.result.BatchResult;
-import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.schema.metadata.SortObserver;
-import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortDataRows;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortIntermediateFileMerger;
-import org.apache.carbondata.processing.sortandgroupby.sortdata.SortParameters;
-import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
-import org.apache.carbondata.processing.store.SingleThreadFinalSortFilesMerger;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
-import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
-
-import org.pentaho.di.core.exception.KettleException;
-
-/**
- * This class will process the query result and convert the data
- * into a format compatible for data load
- */
-public class CompactionResultSortProcessor {
-
-  /**
-   * LOGGER
-   */
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(CompactionResultSortProcessor.class.getName());
-  /**
-   * carbon load model that contains all the required information for load
-   */
-  private CarbonLoadModel carbonLoadModel;
-  /**
-   * sortDataRows instance for sorting each row read ad writing to sort temp file
-   */
-  private SortDataRows sortDataRows;
-  /**
-   * segment proeprties which contains required information for a segment
-   */
-  private SegmentProperties segmentProperties;
-  /**
-   * segment information of parent table
-   */
-  private SegmentProperties srcSegmentProperties;
-  /**
-   * final merger for merge sort
-   */
-  private SingleThreadFinalSortFilesMerger finalMerger;
-  /**
-   * data handler VO object
-   */
-  private CarbonFactDataHandlerColumnar dataHandler;
-  /**
-   * column cardinality
-   */
-  private int[] columnCardinality;
-  /**
-   * Fact Table To Index Table Column Mapping order
-   */
-  private int[] factToIndexColumnMapping;
-  /**
-   * Fact Table Dict Column to Index Table Dict Column Mapping
-   */
-  private int[] factToIndexDictColumnMapping;
-  /**
-   * boolean mapping for no dictionary columns in schema
-   */
-  private boolean[] noDictionaryColMapping;
-  /**
-   * agg type defined for measures
-   */
-  private char[] aggType;
-  /**
-   * segment id
-   */
-  private String segmentId;
-  /**
-   * index table name
-   */
-  private String indexTableName;
-  /**
-   * temp store location to be sued during data load
-   */
-  private String tempStoreLocation;
-  /**
-   * data base name
-   */
-  private String databaseName;
-  /**
-   * no dictionary column count in schema
-   */
-  private int noDictionaryCount;
-  /**
-   * implicit column count in schema
-   */
-  private int implicitColumnCount;
-  /**
-   * total count of measures in schema
-   */
-  private int measureCount;
-  /**
-   * dimension count excluding complex dimension and no dictionary column count
-   */
-  private int dimensionColumnCount;
-  /**
-   * complex dimension count in schema
-   */
-  private int complexDimensionCount;
-  /**
-   * carbon table
-   */
-  private CarbonTable carbonTable;
-  /**
-   * whether the allocated tasks has any record
-   */
-  private boolean isRecordFound;
-
-  /**
-   * @param carbonLoadModel
-   * @param columnCardinality
-   * @param segmentId
-   * @param indexTableName
-   */
-  public CompactionResultSortProcessor(CarbonLoadModel carbonLoadModel, int[] columnCardinality,
-      String segmentId, String indexTableName, int[] factToIndexColumnMapping,
-      int[] factToIndexDictColumnMapping) {
-    this.carbonLoadModel = carbonLoadModel;
-    this.columnCardinality = columnCardinality;
-    this.segmentId = segmentId;
-    this.indexTableName = indexTableName;
-    this.databaseName = carbonLoadModel.getDatabaseName();
-    this.factToIndexColumnMapping = factToIndexColumnMapping;
-    this.factToIndexDictColumnMapping = factToIndexDictColumnMapping;
-    initSegmentProperties();
-  }
-
-  /**
-   * This method will iterate over the query result and convert it into a format compatible
-   * for data loading
-   *
-   * @param detailQueryResultIteratorList
-   */
-  public void processQueryResult(List<CarbonIterator<BatchResult>> detailQueryResultIteratorList)
-      throws Exception {
-    try {
-      initTempStoreLocation();
-      initSortDataRows();
-      processResult(detailQueryResultIteratorList);
-      // After delete command, if no records are fetched from one split,
-      // below steps are not required to be initialized.
-      if (isRecordFound) {
-        initializeFinalThreadMergerForMergeSort();
-        initDataHandler();
-        readAndLoadDataFromSortTempFiles();
-      }
-    } finally {
-      // clear temp files and folders created during secondary index creation
-      deleteTempStoreLocation();
-    }
-  }
-
-  /**
-   * This method will clean up the local folders and files created for secondary index creation
-   */
-  private void deleteTempStoreLocation() {
-    if (null != tempStoreLocation) {
-      try {
-        CarbonUtil.deleteFoldersAndFiles(new File[] { new File(tempStoreLocation) });
-      } catch (IOException | InterruptedException e) {
-        LOGGER.error(
-            "Problem deleting local folders during secondary index creation: " + e.getMessage());
-      }
-    }
-  }
-
-  /**
-   * This method will iterate over the query result and perform row sorting operation
-   *
-   * @param detailQueryResultIteratorList
-   */
-  private void processResult(List<CarbonIterator<BatchResult>> detailQueryResultIteratorList)
-      throws Exception {
-    for (CarbonIterator<BatchResult> detailQueryIterator : detailQueryResultIteratorList) {
-      while (detailQueryIterator.hasNext()) {
-        BatchResult batchResult = detailQueryIterator.next();
-        while (batchResult.hasNext()) {
-          addRowForSorting(prepareRowObjectForSorting(batchResult.next()));
-          isRecordFound = true;
-        }
-      }
-    }
-    try {
-      sortDataRows.startSorting();
-    } catch (CarbonSortKeyAndGroupByException e) {
-      LOGGER.error(e);
-      throw new Exception("Problem loading data while creating secondary index: " + e.getMessage());
-    }
-  }
-
-  /**
-   * This method will prepare the data from raw object that will take part in sorting
-   *
-   * @param row
-   * @return
-   */
-  private Object[] prepareRowObjectForSorting(Object[] row) {
-    ByteArrayWrapper wrapper = (ByteArrayWrapper) row[0];
-    // ByteBuffer[] noDictionaryBuffer = new ByteBuffer[noDictionaryCount];
-
-    List<CarbonDimension> dimensions = segmentProperties.getDimensions();
-    Object[] preparedRow = new Object[dimensions.size() + measureCount];
-
-    // convert the dictionary from MDKey to surrogate key
-    byte[] dictionaryKey = wrapper.getDictionaryKey();
-    long[] keyArray = srcSegmentProperties.getDimensionKeyGenerator().getKeyArray(dictionaryKey);
-    Object[] dictionaryValues = new Object[dimensionColumnCount + measureCount];
-    // Re-ordering is required as per index table column dictionary order,
-    // as output dictionary Byte Array is as per parent table schema order
-    for (int i = 0; i < keyArray.length; i++) {
-      dictionaryValues[factToIndexDictColumnMapping[i]] = Long.valueOf(keyArray[i]).intValue();
-    }
-
-    int noDictionaryIndex = 0;
-    int dictionaryIndex = 0;
-    int i = 0;
-    // loop excluding last dimension as last one is implicit column.
-    for (; i < dimensions.size() - 1; i++) {
-      CarbonDimension dims = dimensions.get(i);
-      if (dims.hasEncoding(Encoding.DICTIONARY)) {
-        // dictionary
-        preparedRow[i] = dictionaryValues[dictionaryIndex++];
-      } else {
-        // no dictionary dims
-        preparedRow[i] = wrapper.getNoDictionaryKeyByIndex(noDictionaryIndex++);
-      }
-    }
-
-    // at last add implicit column position reference(PID)
-
-    preparedRow[i] = wrapper.getImplicitColumnByteArray();
-    return preparedRow;
-  }
-
-  /**
-   * This method will read sort temp files, perform merge sort and add it to store for data loading
-   */
-  private void readAndLoadDataFromSortTempFiles() throws Exception {
-    try {
-      Object[] previousRow = null;
-      finalMerger.startFinalMerge();
-      while (finalMerger.hasNext()) {
-        Object[] rowRead = finalMerger.next();
-        // convert the row from surrogate key to MDKey
-        //        Object[] outputRow = CarbonDataProcessorUtil
-        //            .processNoKettle(rowRead, segmentProperties, aggType, measureCount, noDictionaryCount,
-        //                complexDimensionCount);
-        Object[] outputRow = null;
-        dataHandler.addDataToStore(outputRow);
-      }
-      dataHandler.finish();
-    } catch (CarbonDataWriterException e) {
-      LOGGER.error(e);
-      throw new Exception("Problem loading data during compaction: " + e.getMessage());
-    } catch (Exception e) {
-      LOGGER.error(e);
-      throw new Exception("Problem loading data during compaction: " + e.getMessage());
-    } finally {
-      if (null != dataHandler) {
-        try {
-          dataHandler.closeHandler();
-        } catch (CarbonDataWriterException e) {
-          LOGGER.error(e);
-          throw new Exception("Problem loading data during compaction: " + e.getMessage());
-        }
-      }
-    }
-  }
-
-  /**
-   * This method is used to process the row with out kettle.
-   *
-   * @param row               input row
-   * @param segmentProperties
-   * @param aggType
-   * @param measureCount
-   * @param noDictionaryCount
-   * @param complexDimCount
-   * @return
-   * @throws KettleException
-   */
-  public static Object[] processNoKettle(Object[] row, SegmentProperties segmentProperties,
-      char[] aggType, int measureCount, int noDictionaryCount, int complexDimCount)
-      throws KettleException {
-
-    //    int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
-    //
-    //    int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
-    //
-    //    int dimsArrayIndex = IgnoreDictionary.DIMENSION_INDEX_IN_ROW.getIndex();
-
-    int measureIndex = 0;
-
-    int noDimByteArrayIndex = 0;
-
-    int dimsArrayIndex = 0;
-
-    Object[] outputRow;
-    // adding one for the high cardinality dims byte array.
-    if (noDictionaryCount > 0 || complexDimCount > 0) {
-      outputRow = new Object[measureCount + 1 + 1];
-    } else {
-      outputRow = new Object[measureCount + 1];
-    }
-
-    int l = 0;
-    int index = 0;
-    Object[] measures = (Object[]) row[measureIndex];
-    for (int i = 0; i < measureCount; i++) {
-      outputRow[l++] = measures[index++];
-    }
-    outputRow[l] = row[noDimByteArrayIndex];
-
-    int[] highCardExcludedRows = new int[segmentProperties.getDimColumnsCardinality().length];
-    int[] dimsArray = (int[]) row[dimsArrayIndex];
-    for (int i = 0; i < highCardExcludedRows.length; i++) {
-      highCardExcludedRows[i] = dimsArray[i];
-    }
-    try {
-      outputRow[outputRow.length - 1] =
-          segmentProperties.getDimensionKeyGenerator().generateKey(highCardExcludedRows);
-    } catch (KeyGenException e) {
-      throw new KettleException("unable to generate the mdkey", e);
-    }
-    return outputRow;
-  }
-
-  /**
-   * initialise segment properties
-   */
-  private void initSegmentProperties() {
-    CarbonTable carbonTable = CarbonMetadata.getInstance()
-        .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + indexTableName);
-    List<ColumnSchema> columnSchemaList = CarbonUtil
-        .getColumnSchemaList(carbonTable.getDimensionByTableName(indexTableName),
-            carbonTable.getMeasureByTableName(indexTableName));
-    segmentProperties = new SegmentProperties(columnSchemaList, columnCardinality);
-    srcSegmentProperties =
-        new SegmentProperties(getParentColumnOrder(columnSchemaList), getParentOrderCardinality());
-  }
-
-  /**
-   * Convert index table column order into parent table column order
-   */
-  private List<ColumnSchema> getParentColumnOrder(List<ColumnSchema> columnSchemaList) {
-    List<ColumnSchema> parentColumnList = new ArrayList<ColumnSchema>(columnSchemaList.size());
-    for (int i = 0; i < columnSchemaList.size(); i++) {
-      // Extra cols are dummy_measure & positionId implicit column
-      if (i >= columnCardinality.length) {
-        parentColumnList.add(columnSchemaList.get(i));
-      } else {
-        parentColumnList.add(columnSchemaList.get(factToIndexColumnMapping[i]));
-      }
-    }
-    return parentColumnList;
-  }
-
-  /**
-   * Convert index table column cardinality order into parent table column order
-   */
-  private int[] getParentOrderCardinality() {
-    int[] parentColumnCardinality = new int[columnCardinality.length];
-    for (int i = 0; i < columnCardinality.length; i++) {
-      parentColumnCardinality[i] = columnCardinality[factToIndexColumnMapping[i]];
-    }
-    return parentColumnCardinality;
-  }
-
-  /**
-   * add row to a temp array which will we written to a sort temp file after sorting
-   *
-   * @param row
-   */
-  private void addRowForSorting(Object[] row) throws Exception {
-    try {
-      // prepare row array using RemoveDictionaryUtil class
-      sortDataRows.addRow(row);
-    } catch (CarbonSortKeyAndGroupByException e) {
-      LOGGER.error(e);
-      throw new Exception(
-          "Row addition for sorting failed while creating secondary index: " + e.getMessage());
-    }
-  }
-
-  /**
-   * create an instance of sort data rows
-   */
-  private void initSortDataRows() throws Exception {
-    CarbonTable indexTable = CarbonMetadata.getInstance().getCarbonTable(
-        carbonLoadModel.getDatabaseName() + CarbonCommonConstants.UNDERSCORE + indexTableName);
-    measureCount = indexTable.getMeasureByTableName(indexTableName).size();
-    implicitColumnCount = indexTable.getImplicitDimensionByTableName(indexTableName).size();
-    SortObserver observer = new SortObserver();
-    List<CarbonDimension> dimensions = indexTable.getDimensionByTableName(indexTableName);
-    noDictionaryColMapping = new boolean[dimensions.size()];
-    int i = 0;
-    for (CarbonDimension dimension : dimensions) {
-      if (CarbonUtil.hasEncoding(dimension.getEncoder(), Encoding.DICTIONARY)) {
-        i++;
-        continue;
-      }
-      noDictionaryColMapping[i++] = true;
-      noDictionaryCount++;
-    }
-    dimensionColumnCount = dimensions.size();
-    SortParameters parameters = createSortParameters();
-    SortIntermediateFileMerger intermediateFileMerger = new SortIntermediateFileMerger(parameters);
-    this.sortDataRows = new SortDataRows(parameters, intermediateFileMerger);
-    try {
-      this.sortDataRows.initialize();
-    } catch (CarbonSortKeyAndGroupByException e) {
-      LOGGER.error(e);
-      throw new Exception(
-          "Error initializing sort data rows object while creating secondary index: " + e
-              .getMessage());
-    }
-  }
-
-  /**
-   * This method will create the sort parameters VO object
-   *
-   * @return
-   */
-  private SortParameters createSortParameters() {
-    boolean useKettle = false;
-    SortParameters parameters = SortParameters
-        .createSortParameters(databaseName, indexTableName, dimensionColumnCount,
-            complexDimensionCount, measureCount, noDictionaryCount,
-            carbonLoadModel.getPartitionId(), segmentId, carbonLoadModel.getTaskNo(),
-            noDictionaryColMapping);
-    return parameters;
-  }
-
-  /**
-   * create an instance of finalThread merger which will perform merge sort on all the
-   * sort temp files
-   */
-  private void initializeFinalThreadMergerForMergeSort() {
-    String sortTempFileLocation = tempStoreLocation + CarbonCommonConstants.FILE_SEPARATOR
-        + CarbonCommonConstants.SORT_TEMP_FILE_LOCATION;
-    initAggType();
-    // kettle will not be used
-    boolean useKettle = false;
-    finalMerger = new SingleThreadFinalSortFilesMerger(sortTempFileLocation, indexTableName,
-        dimensionColumnCount, complexDimensionCount, measureCount, noDictionaryCount, aggType,
-        noDictionaryColMapping, useKettle);
-  }
-
-  /**
-   * initialise carbon data writer instance
-   */
-  private void initDataHandler() throws Exception {
-    CarbonFactDataHandlerModel carbonFactDataHandlerModel = getCarbonFactDataHandlerModel();
-    carbonFactDataHandlerModel.setPrimitiveDimLens(segmentProperties.getDimColumnsCardinality());
-    CarbonDataFileAttributes carbonDataFileAttributes =
-        new CarbonDataFileAttributes(Integer.parseInt(carbonLoadModel.getTaskNo()),
-            carbonLoadModel.getFactTimeStamp());
-    carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
-    if (segmentProperties.getNumberOfNoDictionaryDimension() > 0
-        || segmentProperties.getComplexDimensions().size() > 0) {
-      carbonFactDataHandlerModel.setMdKeyIndex(measureCount + 1);
-    } else {
-      carbonFactDataHandlerModel.setMdKeyIndex(measureCount);
-    }
-    carbonFactDataHandlerModel.setColCardinality(columnCardinality);
-    carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
-    // NO-Kettle.
-    carbonFactDataHandlerModel.setUseKettle(false);
-    dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
-    try {
-      dataHandler.initialise();
-    } catch (CarbonDataWriterException e) {
-      LOGGER.error(e);
-      throw new Exception(
-          "Problem initialising data handler while creating secondary index: " + e.getMessage());
-    }
-  }
-
-  /**
-   * This method will create a model object for carbon fact data handler
-   *
-   * @return
-   */
-  private CarbonFactDataHandlerModel getCarbonFactDataHandlerModel() {
-    CarbonFactDataHandlerModel carbonFactDataHandlerModel = null;
-    //    CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonLoaderUtil
-    //        .getCarbonFactDataHandlerModel(carbonLoadModel, segmentProperties, databaseName,
-    //            indexTableName, tempStoreLocation, carbonLoadModel.getStorePath());
-    return carbonFactDataHandlerModel;
-  }
-
-  /**
-   * initialise temporary store location
-   */
-  private void initTempStoreLocation() {
-    tempStoreLocation = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(databaseName, indexTableName, carbonLoadModel.getTaskNo(),
-            carbonLoadModel.getPartitionId(), segmentId, false);
-  }
-
-  /**
-   * initialise aggregation type for measures for their storage format
-   */
-  private void initAggType() {
-    aggType = new char[measureCount];
-    Arrays.fill(aggType, 'n');
-    carbonTable = CarbonMetadata.getInstance()
-        .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + indexTableName);
-    List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(indexTableName);
-    for (int i = 0; i < measureCount; i++) {
-      aggType[i] = DataTypeUtil.getAggType(measures.get(i).getDataType());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java
deleted file mode 100644
index c6b8dda..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/CompactionType.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.merger;
-
-/**
- * This enum is used to define the types of Compaction.
- * We have 3 types. one is Minor another is Major and
- * finally a compaction done after UPDATE-DELETE operation
- * called IUD compaction.
- */
-public enum CompactionType {
-    MINOR_COMPACTION,
-    MAJOR_COMPACTION,
-    IUD_UPDDEL_DELTA_COMPACTION,
-    IUD_DELETE_DELTA_COMPACTION,
-    NONE
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java
deleted file mode 100644
index 8647526..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeBlockRelation.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.merger;
-
-import org.apache.carbondata.core.datastore.block.Distributable;
-
-/**
- * Block to Node mapping
- */
-public class NodeBlockRelation implements Comparable<NodeBlockRelation> {
-
-  private final Distributable block;
-  private final String node;
-
-  public NodeBlockRelation(Distributable block, String node) {
-    this.block = block;
-    this.node = node;
-
-  }
-
-  public Distributable getBlock() {
-    return block;
-  }
-
-  public String getNode() {
-    return node;
-  }
-
-  @Override public int compareTo(NodeBlockRelation obj) {
-    return this.getNode().compareTo(obj.getNode());
-  }
-
-  @Override public boolean equals(Object obj) {
-    if (!(obj instanceof NodeBlockRelation)) {
-      return false;
-    }
-    NodeBlockRelation o = (NodeBlockRelation) obj;
-    return node.equals(o.node);
-  }
-
-  @Override public int hashCode() {
-    return node.hashCode();
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java
deleted file mode 100644
index 1bf69af..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/NodeMultiBlockRelation.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.merger;
-
-import java.util.List;
-
-import org.apache.carbondata.core.datastore.block.Distributable;
-
-public class NodeMultiBlockRelation implements Comparable<NodeMultiBlockRelation> {
-
-  private final List<Distributable> blocks;
-  private final String node;
-
-  public NodeMultiBlockRelation(String node, List<Distributable> blocks) {
-    this.node = node;
-    this.blocks = blocks;
-
-  }
-
-  public List<Distributable> getBlocks() {
-    return blocks;
-  }
-
-  public String getNode() {
-    return node;
-  }
-
-  @Override public int compareTo(NodeMultiBlockRelation obj) {
-    return this.blocks.size() - obj.getBlocks().size();
-  }
-
-  @Override public boolean equals(Object obj) {
-    if (!(obj instanceof NodeMultiBlockRelation)) {
-      return false;
-    }
-    NodeMultiBlockRelation o = (NodeMultiBlockRelation) obj;
-    return blocks.equals(o.blocks) && node.equals(o.node);
-  }
-
-  @Override public int hashCode() {
-    return blocks.hashCode() + node.hashCode();
-  }
-}