You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/04/06 10:49:45 UTC

[3/7] incubator-carbondata git commit: Compaction lock should also be acquired during alter operation as alter and compaction on same table should not be allowed concurrently.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
deleted file mode 100644
index 29aa7e7..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/RowResultMerger.java
+++ /dev/null
@@ -1,367 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.merger;
-
-import java.io.File;
-import java.util.AbstractQueue;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.PriorityQueue;
-
-import org.apache.carbondata.common.logging.LogService;
-import org.apache.carbondata.common.logging.LogServiceFactory;
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.keygenerator.KeyGenException;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
-import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
-import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
-import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
-import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
-import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
-import org.apache.carbondata.core.util.ByteUtil;
-import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.core.util.DataTypeUtil;
-import org.apache.carbondata.core.util.path.CarbonStorePath;
-import org.apache.carbondata.core.util.path.CarbonTablePath;
-import org.apache.carbondata.processing.datatypes.GenericDataType;
-import org.apache.carbondata.processing.model.CarbonLoadModel;
-import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerColumnar;
-import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
-import org.apache.carbondata.processing.store.CarbonFactHandler;
-import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException;
-import org.apache.carbondata.spark.merger.exeception.SliceMergerException;
-
-/**
- * This is the Merger class responsible for the merging of the segments.
- */
-public class RowResultMerger {
-
-  private final String databaseName;
-  private final String tableName;
-  private final String tempStoreLocation;
-  private final String factStoreLocation;
-  private CarbonFactHandler dataHandler;
-  private List<RawResultIterator> rawResultIteratorList =
-      new ArrayList<RawResultIterator>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
-  private SegmentProperties segprop;
-  /**
-   * record holder heap
-   */
-  private AbstractQueue<RawResultIterator> recordHolderHeap;
-
-  private TupleConversionAdapter tupleConvertor;
-
-  private static final LogService LOGGER =
-      LogServiceFactory.getLogService(RowResultMerger.class.getName());
-
-  public RowResultMerger(List<RawResultIterator> iteratorList, String databaseName,
-      String tableName, SegmentProperties segProp, String tempStoreLocation,
-      CarbonLoadModel loadModel, CompactionType compactionType) {
-
-    CarbonDataFileAttributes carbonDataFileAttributes;
-
-    this.rawResultIteratorList = iteratorList;
-    // create the List of RawResultIterator.
-
-    recordHolderHeap = new PriorityQueue<RawResultIterator>(rawResultIteratorList.size(),
-        new RowResultMerger.CarbonMdkeyComparator());
-
-    this.segprop = segProp;
-    this.tempStoreLocation = tempStoreLocation;
-
-    this.factStoreLocation = loadModel.getStorePath();
-
-    if (!new File(tempStoreLocation).mkdirs()) {
-      LOGGER.error("Error while new File(tempStoreLocation).mkdirs() ");
-    }
-
-    this.databaseName = databaseName;
-    this.tableName = tableName;
-
-    int measureCount = segprop.getMeasures().size();
-    CarbonTable carbonTable = CarbonMetadata.getInstance()
-            .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
-    CarbonFactDataHandlerModel carbonFactDataHandlerModel =
-        getCarbonFactDataHandlerModel(loadModel);
-    carbonFactDataHandlerModel.setPrimitiveDimLens(segprop.getDimColumnsCardinality());
-
-    if (compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
-      int taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
-          CarbonStorePath.getCarbonTablePath(loadModel.getStorePath(),
-              carbonTable.getCarbonTableIdentifier()));
-
-      // Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will
-      // be written in same segment. So the TaskNo should be incremented by 1 from max val.
-      int index = taskNo + 1;
-      carbonDataFileAttributes = new CarbonDataFileAttributes(index, loadModel.getFactTimeStamp());
-    }
-    else {
-      carbonDataFileAttributes =
-          new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
-              loadModel.getFactTimeStamp());
-    }
-
-    carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
-    if (segProp.getNumberOfNoDictionaryDimension() > 0
-        || segProp.getComplexDimensions().size() > 0) {
-      carbonFactDataHandlerModel.setMdKeyIndex(measureCount + 1);
-    } else {
-      carbonFactDataHandlerModel.setMdKeyIndex(measureCount);
-    }
-    carbonFactDataHandlerModel.setBlockSizeInMB(carbonTable.getBlockSizeInMB());
-    dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel);
-
-    tupleConvertor = new TupleConversionAdapter(segProp);
-  }
-
-  /**
-   * Merge function
-   *
-   */
-  public boolean mergerSlice() {
-    boolean mergeStatus = false;
-    int index = 0;
-    boolean isDataPresent = false;
-    try {
-
-      // add all iterators to the queue
-      for (RawResultIterator leaftTupleIterator : this.rawResultIteratorList) {
-        this.recordHolderHeap.add(leaftTupleIterator);
-        index++;
-      }
-      RawResultIterator iterator = null;
-      while (index > 1) {
-        // iterator the top record
-        iterator = this.recordHolderHeap.poll();
-        Object[] convertedRow = iterator.next();
-        if (null == convertedRow) {
-          index--;
-          continue;
-        }
-        if (!isDataPresent) {
-          dataHandler.initialise();
-          isDataPresent = true;
-        }
-        // get the mdkey
-        addRow(convertedRow);
-        // if there is no record in the leaf and all then decrement the
-        // index
-        if (!iterator.hasNext()) {
-          index--;
-          continue;
-        }
-        // add record to heap
-        this.recordHolderHeap.add(iterator);
-      }
-      // if record holder is not empty then iterator the slice holder from
-      // heap
-      iterator = this.recordHolderHeap.poll();
-      while (true) {
-        Object[] convertedRow = iterator.next();
-        if (null == convertedRow) {
-          break;
-        }
-        // do it only once
-        if (!isDataPresent) {
-          dataHandler.initialise();
-          isDataPresent = true;
-        }
-        addRow(convertedRow);
-        // check if leaf contains no record
-        if (!iterator.hasNext()) {
-          break;
-        }
-      }
-      if (isDataPresent)
-      {
-        this.dataHandler.finish();
-      }
-      mergeStatus = true;
-    } catch (Exception e) {
-      LOGGER.error(e, e.getMessage());
-      LOGGER.error("Exception in compaction merger " + e.getMessage());
-      mergeStatus = false;
-    } finally {
-      try {
-        if (isDataPresent) {
-          this.dataHandler.closeHandler();
-        }
-      } catch (CarbonDataWriterException e) {
-        LOGGER.error("Exception while closing the handler in compaction merger " + e.getMessage());
-        mergeStatus = false;
-      }
-    }
-
-    return mergeStatus;
-  }
-
-  /**
-   * Below method will be used to add sorted row
-   *
-   * @throws SliceMergerException
-   */
-  private void addRow(Object[] carbonTuple) throws SliceMergerException {
-    Object[] rowInWritableFormat;
-
-    rowInWritableFormat = tupleConvertor.getObjectArray(carbonTuple);
-    try {
-      this.dataHandler.addDataToStore(rowInWritableFormat);
-    } catch (CarbonDataWriterException e) {
-      throw new SliceMergerException("Problem in merging the slice", e);
-    }
-  }
-
-  /**
-   * This method will create a model object for carbon fact data handler
-   *
-   * @param loadModel
-   * @return
-   */
-  private CarbonFactDataHandlerModel getCarbonFactDataHandlerModel(CarbonLoadModel loadModel) {
-    CarbonFactDataHandlerModel carbonFactDataHandlerModel = new CarbonFactDataHandlerModel();
-    carbonFactDataHandlerModel.setDatabaseName(databaseName);
-    carbonFactDataHandlerModel.setTableName(tableName);
-    carbonFactDataHandlerModel.setMeasureCount(segprop.getMeasures().size());
-    carbonFactDataHandlerModel.setCompactionFlow(true);
-    carbonFactDataHandlerModel
-        .setMdKeyLength(segprop.getDimensionKeyGenerator().getKeySizeInBytes());
-    carbonFactDataHandlerModel.setStoreLocation(tempStoreLocation);
-    carbonFactDataHandlerModel.setDimLens(segprop.getDimColumnsCardinality());
-    carbonFactDataHandlerModel.setSegmentProperties(segprop);
-    carbonFactDataHandlerModel.setNoDictionaryCount(segprop.getNumberOfNoDictionaryDimension());
-    carbonFactDataHandlerModel.setDimensionCount(
-        segprop.getDimensions().size() - carbonFactDataHandlerModel.getNoDictionaryCount());
-    CarbonTable carbonTable = CarbonMetadata.getInstance()
-        .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
-    List<ColumnSchema> wrapperColumnSchema = CarbonUtil
-        .getColumnSchemaList(carbonTable.getDimensionByTableName(tableName),
-            carbonTable.getMeasureByTableName(tableName));
-    carbonFactDataHandlerModel.setWrapperColumnSchema(wrapperColumnSchema);
-    // get the cardinality for all all the columns including no dictionary columns
-    int[] formattedCardinality =
-        CarbonUtil.getFormattedCardinality(segprop.getDimColumnsCardinality(), wrapperColumnSchema);
-    carbonFactDataHandlerModel.setColCardinality(formattedCardinality);
-    //TO-DO Need to handle complex types here .
-    Map<Integer, GenericDataType> complexIndexMap =
-        new HashMap<Integer, GenericDataType>(segprop.getComplexDimensions().size());
-    carbonFactDataHandlerModel.setComplexIndexMap(complexIndexMap);
-    carbonFactDataHandlerModel.setDataWritingRequest(true);
-
-    char[] aggType = new char[segprop.getMeasures().size()];
-    Arrays.fill(aggType, 'n');
-    int i = 0;
-    for (CarbonMeasure msr : segprop.getMeasures()) {
-      aggType[i++] = DataTypeUtil.getAggType(msr.getDataType());
-    }
-    carbonFactDataHandlerModel.setAggType(aggType);
-    carbonFactDataHandlerModel.setFactDimLens(segprop.getDimColumnsCardinality());
-
-    String carbonDataDirectoryPath =
-        checkAndCreateCarbonStoreLocation(this.factStoreLocation, databaseName, tableName,
-            loadModel.getPartitionId(), loadModel.getSegmentId());
-    carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath);
-
-    List<CarbonDimension> dimensionByTableName =
-        loadModel.getCarbonDataLoadSchema().getCarbonTable().getDimensionByTableName(tableName);
-    boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()];
-    int index = 0;
-    for (CarbonDimension dimension : dimensionByTableName) {
-      isUseInvertedIndexes[index++] = dimension.isUseInvertedIndex();
-    }
-    carbonFactDataHandlerModel.setIsUseInvertedIndex(isUseInvertedIndexes);
-    return carbonFactDataHandlerModel;
-  }
-
-  /**
-   * This method will get the store location for the given path, segment id and partition id
-   *
-   * @return data directory path
-   */
-  private String checkAndCreateCarbonStoreLocation(String factStoreLocation, String databaseName,
-      String tableName, String partitionId, String segmentId) {
-    CarbonTable carbonTable = CarbonMetadata.getInstance()
-        .getCarbonTable(databaseName + CarbonCommonConstants.UNDERSCORE + tableName);
-    CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier();
-    CarbonTablePath carbonTablePath =
-        CarbonStorePath.getCarbonTablePath(factStoreLocation, carbonTableIdentifier);
-    String carbonDataDirectoryPath =
-        carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId);
-    CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath);
-    return carbonDataDirectoryPath;
-  }
-
-  /**
-   * Comparator class for comparing 2 raw row result.
-   */
-  private class CarbonMdkeyComparator implements Comparator<RawResultIterator> {
-
-    @Override public int compare(RawResultIterator o1, RawResultIterator o2) {
-
-      Object[] row1 = new Object[0];
-      Object[] row2 = new Object[0];
-      try {
-        row1 = o1.fetchConverted();
-        row2 = o2.fetchConverted();
-      } catch (KeyGenException e) {
-        LOGGER.error(e.getMessage());
-      }
-      if (null == row1 || null == row2) {
-        return 0;
-      }
-      ByteArrayWrapper key1 = (ByteArrayWrapper) row1[0];
-      ByteArrayWrapper key2 = (ByteArrayWrapper) row2[0];
-      int compareResult = 0;
-      int[] columnValueSizes = segprop.getEachDimColumnValueSize();
-      int dictionaryKeyOffset = 0;
-      byte[] dimCols1 = key1.getDictionaryKey();
-      byte[] dimCols2 = key2.getDictionaryKey();
-      int noDicIndex = 0;
-      for (int eachColumnValueSize : columnValueSizes) {
-        // case of dictionary cols
-        if (eachColumnValueSize > 0) {
-
-          compareResult = ByteUtil.UnsafeComparer.INSTANCE
-              .compareTo(dimCols1, dictionaryKeyOffset, eachColumnValueSize, dimCols2,
-                  dictionaryKeyOffset, eachColumnValueSize);
-          dictionaryKeyOffset += eachColumnValueSize;
-        } else { // case of no dictionary
-
-          byte[] noDictionaryDim1 = key1.getNoDictionaryKeyByIndex(noDicIndex);
-          byte[] noDictionaryDim2 = key2.getNoDictionaryKeyByIndex(noDicIndex);
-          compareResult =
-              ByteUtil.UnsafeComparer.INSTANCE.compareTo(noDictionaryDim1, noDictionaryDim2);
-          noDicIndex++;
-
-        }
-        if (0 != compareResult) {
-          return compareResult;
-        }
-      }
-      return 0;
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java
deleted file mode 100644
index d1a3a8d..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TableMeta.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.merger;
-
-import java.io.Serializable;
-
-import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
-
-public class TableMeta implements Serializable {
-
-  private static final long serialVersionUID = -1749874611119829431L;
-
-  public CarbonTableIdentifier carbonTableIdentifier;
-  public String storePath;
-  public CarbonTable carbonTable;
-
-  public TableMeta(CarbonTableIdentifier carbonTableIdentifier, String storePath,
-      CarbonTable carbonTable) {
-    this.carbonTableIdentifier = carbonTableIdentifier;
-    this.storePath = storePath;
-    this.carbonTable = carbonTable;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
deleted file mode 100644
index 08b563f..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/TupleConversionAdapter.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.carbondata.spark.merger;
-
-import org.apache.carbondata.core.datastore.block.SegmentProperties;
-import org.apache.carbondata.core.scan.wrappers.ByteArrayWrapper;
-
-/**
- * This class will be used to convert the Result into the format used in data writer.
- */
-class TupleConversionAdapter {
-
-  private final SegmentProperties segmentproperties;
-
-  private int noDictionaryPresentIndex;
-
-  private int measureCount;
-
-  private boolean isNoDictionaryPresent;
-
-  public TupleConversionAdapter(SegmentProperties segmentProperties) {
-    this.measureCount = segmentProperties.getMeasures().size();
-    this.isNoDictionaryPresent = segmentProperties.getNumberOfNoDictionaryDimension() > 0;
-    if (isNoDictionaryPresent) {
-      noDictionaryPresentIndex++;
-    }
-    this.segmentproperties = segmentProperties;
-  }
-
-  /**
-   * Converting the raw result to the format understandable by the data writer.
-   * @param carbonTuple
-   * @return
-   */
-  public Object[] getObjectArray(Object[] carbonTuple) {
-    Object[] row = new Object[measureCount + noDictionaryPresentIndex + 1];
-    int index = 0;
-    // put measures.
-
-    for (int j = 1; j <= measureCount; j++) {
-      row[index++] = carbonTuple[j];
-    }
-
-    // put No dictionary byte []
-    if (isNoDictionaryPresent) {
-      row[index++] = ((ByteArrayWrapper) carbonTuple[0]).getNoDictionaryKeys();
-    }
-
-    // put No Dictionary Dims
-    row[index++] = ((ByteArrayWrapper) carbonTuple[0]).getDictionaryKey();
-    return row;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/exeception/SliceMergerException.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/exeception/SliceMergerException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/exeception/SliceMergerException.java
deleted file mode 100644
index fd6610c..0000000
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/merger/exeception/SliceMergerException.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.merger.exeception;
-
-import java.util.Locale;
-
-public class SliceMergerException extends Exception {
-
-  /**
-   * default serial version ID.
-   */
-  private static final long serialVersionUID = 1L;
-
-  /**
-   * The Error message.
-   */
-  private String msg = "";
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public SliceMergerException(String msg) {
-    super(msg);
-    this.msg = msg;
-  }
-
-  /**
-   * Constructor
-   *
-   * @param msg The error message for this exception.
-   */
-  public SliceMergerException(String msg, Throwable t) {
-    super(msg, t);
-    this.msg = msg;
-  }
-
-  /**
-   * This method is used to get the localized message.
-   *
-   * @param locale - A Locale object represents a specific geographical,
-   *               political, or cultural region.
-   * @return - Localized error message.
-   */
-  public String getLocalizedMessage(Locale locale) {
-    return "";
-  }
-
-  /**
-   * getLocalizedMessage
-   */
-  @Override public String getLocalizedMessage() {
-    return super.getLocalizedMessage();
-  }
-
-  /**
-   * getMessage
-   */
-  public String getMessage() {
-    return this.msg;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
index f04669c..277005b 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonIUDMergerRDD.scala
@@ -31,9 +31,9 @@ import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil
+import org.apache.carbondata.processing.merger.CarbonDataMergerUtil
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.MergeResult
-import org.apache.carbondata.spark.merger.CarbonDataMergerUtil
 
 /**
  * IUD carbon merger RDD

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
index 51f9022..350a2ec 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala
@@ -46,11 +46,11 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonMultiBlockSplit}
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, CarbonInputSplitTaskInfo}
+import org.apache.carbondata.processing.merger._
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil
 import org.apache.carbondata.spark.MergeResult
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.merger._
 import org.apache.carbondata.spark.splits.TableSplit
 
 class CarbonMergerRDD[K, V](
@@ -152,9 +152,14 @@ class CarbonMergerRDD[K, V](
           CarbonCompactionUtil.createDataFileFooterMappingForSegments(tableBlockInfoList)
 
         carbonLoadModel.setStorePath(hdfsStoreLocation)
-
+        // check for restructured block
+        // TODO: only in case of add and drop this variable should be true
+        val restructuredBlockExists: Boolean = CarbonCompactionUtil
+          .checkIfAnyRestructuredBlockExists(segmentMapping,
+            dataFileMetadataSegMapping,
+            carbonTable.getTableLastUpdatedTime)
         exec = new CarbonCompactionExecutor(segmentMapping, segmentProperties,
-          carbonTable, dataFileMetadataSegMapping)
+          carbonTable, dataFileMetadataSegMapping, restructuredBlockExists)
 
         // fire a query and get the results.
         var result2: java.util.List[RawResultIterator] = null
@@ -190,17 +195,25 @@ class CarbonMergerRDD[K, V](
 
         carbonLoadModel.setSegmentId(mergeNumber)
         carbonLoadModel.setPartitionId("0")
-        val merger =
-          new RowResultMerger(result2,
-            databaseName,
-            factTableName,
+        var processor: AbstractResultProcessor = null
+        if (restructuredBlockExists) {
+          processor = new CompactionResultSortProcessor(carbonLoadModel, carbonTable,
             segmentProperties,
-            tempStoreLoc,
-            carbonLoadModel,
-            carbonMergerMapping.campactionType
+            carbonMergerMapping.campactionType,
+            factTableName
           )
-        mergeStatus = merger.mergerSlice()
-
+        } else {
+          processor =
+            new RowResultMergerProcessor(
+              databaseName,
+              factTableName,
+              segmentProperties,
+              tempStoreLoc,
+              carbonLoadModel,
+              carbonMergerMapping.campactionType
+            )
+        }
+        mergeStatus = processor.execute(result2)
         mergeResult = tableBlockInfoList.get(0).getSegmentId + ',' + mergeNumber
 
       } catch {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index 3b38028..1a237f6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -24,9 +24,8 @@ import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCa
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.MergeResultImpl
-import org.apache.carbondata.spark.load.CarbonLoaderUtil
-import org.apache.carbondata.spark.merger.{CarbonDataMergerUtil, CompactionType}
 
 /**
  * Compactor class which handled the compaction cases.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index 0ba99a8..d6cc2e6 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -33,10 +33,11 @@ import org.apache.carbondata.core.metadata.{CarbonMetadata, CarbonTableIdentifie
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.model.{CarbonDataLoadSchema, CarbonLoadModel}
 import org.apache.carbondata.spark._
+import org.apache.carbondata.spark.compaction.CompactionCallable
 import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.merger.{CarbonDataMergerUtil, CompactionCallable, CompactionType}
 import org.apache.carbondata.spark.util.{CommonUtil, LoadMetadataUtil}
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index dadd03e..367bf46 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -39,10 +39,10 @@ import org.apache.carbondata.core.service.CarbonCommonFactory
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager}
 import org.apache.carbondata.core.util.DataTypeUtil
 import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.merger.CompactionType
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.CarbonSparkFactory
 import org.apache.carbondata.spark.load.FailureCauses
-import org.apache.carbondata.spark.merger.CompactionType
 import org.apache.carbondata.spark.rdd.AlterTableAddColumnRDD
 import org.apache.carbondata.spark.util.{DataTypeConverterUtil, GlobalDictionaryUtil}
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 60742ac..4cca0a3 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -50,11 +50,11 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.csvload.BlockDetails
 import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
 import org.apache.carbondata.spark._
 import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.splits.TableSplit
 import org.apache.carbondata.spark.util.{CarbonQueryUtil, CommonUtil}
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index e322fc8..0e6153f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -40,8 +40,8 @@ import org.apache.carbondata.core.scan.expression.logical.AndExpression
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.hadoop.{CarbonInputFormat, CarbonInputSplit, CarbonProjection}
 import org.apache.carbondata.hadoop.util.{CarbonInputFormatUtil, SchemaReader}
+import org.apache.carbondata.processing.merger.TableMeta
 import org.apache.carbondata.spark.{CarbonFilters, CarbonOption}
-import org.apache.carbondata.spark.merger.TableMeta
 import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
 
 private[sql] case class CarbonDatasourceHadoopRelation(

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
index 4c5e733..2ff21c8e 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceRelation.scala
@@ -34,8 +34,8 @@ import org.apache.carbondata.core.metadata.schema.table.column.{CarbonColumn, Ca
 import org.apache.carbondata.core.metadata.schema.table.column.CarbonImplicitDimension
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.processing.merger.TableMeta
 import org.apache.carbondata.spark.{CarbonOption, _}
-import org.apache.carbondata.spark.merger.TableMeta
 
 /**
  * Carbon relation provider compliant to data source api.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
index 62e5241..a439c30 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/IUDCommands.scala
@@ -43,10 +43,9 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl
 import org.apache.carbondata.processing.exception.MultipleMatchingException
+import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
 import org.apache.carbondata.spark.DeleteDelataResultImpl
 import org.apache.carbondata.spark.load.FailureCauses
-import org.apache.carbondata.spark.merger.{CarbonDataMergerUtil, CarbonDataMergerUtilResult, CompactionType}
-import org.apache.carbondata.spark.merger.CarbonDataMergerUtil._
 import org.apache.carbondata.spark.util.QueryPlanUtil
 
 
@@ -272,7 +271,7 @@ object IUDCommon {
       carbonRelation: CarbonRelation,
       isUpdateOperation: Boolean): Unit = {
 
-    var ishorizontalCompaction = isHorizontalCompactionEnabled()
+    var ishorizontalCompaction = CarbonDataMergerUtil.isHorizontalCompactionEnabled()
 
     if (ishorizontalCompaction == false) {
       return
@@ -288,7 +287,7 @@ object IUDCommon {
     val deleteTimeStamp = updateTimeStamp + 1
 
     // get the valid segments
-    var segLists = getValidSegmentList(absTableIdentifier)
+    var segLists = CarbonDataMergerUtil.getValidSegmentList(absTableIdentifier)
 
     if (segLists == null || segLists.size() == 0) {
       return
@@ -350,7 +349,7 @@ object IUDCommon {
     val db = carbonTable.getDatabaseName
     val table = carbonTable.getFactTableName
     // get the valid segments qualified for update compaction.
-    val validSegList = getSegListIUDCompactionQualified(segLists,
+    val validSegList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
       absTableIdentifier,
       segmentUpdateStatusManager,
       compactionTypeIUD)
@@ -406,7 +405,7 @@ object IUDCommon {
 
     val db = carbonTable.getDatabaseName
     val table = carbonTable.getFactTableName
-    val deletedBlocksList = getSegListIUDCompactionQualified(segLists,
+    val deletedBlocksList = CarbonDataMergerUtil.getSegListIUDCompactionQualified(segLists,
       absTableIdentifier,
       segmentUpdateStatusManager,
       compactionTypeIUD)
@@ -436,7 +435,7 @@ object IUDCommon {
             val blockName = segmentAndBlocks
               .substring(segmentAndBlocks.lastIndexOf("/") + 1, segmentAndBlocks.length)
 
-            val result = compactBlockDeleteDeltaFiles(segment, blockName,
+            val result = CarbonDataMergerUtil.compactBlockDeleteDeltaFiles(segment, blockName,
               absTableIdentifier,
               updateStatusDetails,
               timestamp)
@@ -453,7 +452,7 @@ object IUDCommon {
         })
       })
 
-      val updateStatus = updateStatusFile(resultList.toList.asJava,
+      val updateStatus = CarbonDataMergerUtil.updateStatusFile(resultList.toList.asJava,
         carbonTable,
         timestamp.toString,
         segmentUpdateStatusManager)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index d5cc540..e8d3907 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -47,7 +47,7 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFa
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.merger.TableMeta
+import org.apache.carbondata.processing.merger.TableMeta
 
 case class MetaData(var tablesMeta: ArrayBuffer[TableMeta])
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 7cb5ed4..4f33043 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -50,11 +50,11 @@ import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonStorePath
 import org.apache.carbondata.processing.csvload.BlockDetails
 import org.apache.carbondata.processing.etl.DataLoadingException
+import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
 import org.apache.carbondata.spark._
 import org.apache.carbondata.spark.load._
-import org.apache.carbondata.spark.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.splits.TableSplit
 import org.apache.carbondata.spark.util.{CarbonQueryUtil, CommonUtil}
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 4169ac3..b9e8682 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -32,8 +32,8 @@ import org.apache.carbondata.core.scan.expression.Expression
 import org.apache.carbondata.core.scan.expression.logical.AndExpression
 import org.apache.carbondata.hadoop.CarbonProjection
 import org.apache.carbondata.hadoop.util.SchemaReader
+import org.apache.carbondata.processing.merger.TableMeta
 import org.apache.carbondata.spark.CarbonFilters
-import org.apache.carbondata.spark.merger.TableMeta
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 import org.apache.carbondata.spark.util.CarbonSparkUtil
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
index 88ca4af..38fdb11 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/AlterTableCommands.scala
@@ -50,8 +50,9 @@ private[sql] case class AlterTableAddColumns(
     val dbName = alterTableAddColumnsModel.databaseName
       .getOrElse(sparkSession.catalog.currentDatabase)
     LOGGER.audit(s"Alter table add columns request has been received for $dbName.$tableName")
-    val carbonLock = AlterTableUtil
-      .validateTableAndAcquireLock(dbName, tableName, LOGGER)(sparkSession)
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+    val locks = AlterTableUtil
+      .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
     // get the latest carbon table and check for column existence
     val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
     var newCols = Seq[org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema]()
@@ -102,13 +103,7 @@ private[sql] case class AlterTableAddColumns(
         sys.error("Alter table add column operation failed. Please check the logs")
     } finally {
       // release lock after command execution completion
-      if (carbonLock != null) {
-        if (carbonLock.unlock()) {
-          LOGGER.info("Alter table add columns lock released successfully")
-        } else {
-          LOGGER.error("Unable to release lock during alter table add columns operation")
-        }
-      }
+      AlterTableUtil.releaseLocks(locks, LOGGER)
     }
     Seq.empty
   }
@@ -147,15 +142,15 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
                    s"Table $oldDatabaseName.$oldTableName does not exist")
       sys.error(s"Table $oldDatabaseName.$oldTableName does not exist")
     }
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK,
+      LockUsage.COMPACTION_LOCK,
+      LockUsage.DELETE_SEGMENT_LOCK,
+      LockUsage.CLEAN_FILES_LOCK,
+      LockUsage.DROP_TABLE_LOCK)
+    val locks = AlterTableUtil
+      .validateTableAndAcquireLock(oldDatabaseName, oldTableName, locksToBeAcquired, LOGGER)(
+        sparkSession)
     val carbonTable = relation.tableMeta.carbonTable
-    val carbonLock = CarbonLockFactory
-      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-        LockUsage.METADATA_LOCK)
-    if (carbonLock.lockWithRetries()) {
-      LOGGER.info("Successfully able to get the table metadata file lock")
-    } else {
-      sys.error("Table is locked for updation. Please try after some time")
-    }
     try {
       // get the latest carbon table and check for column existence
       val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getStorePath,
@@ -200,24 +195,15 @@ private[sql] case class AlterTableRenameTable(alterTableRenameModel: AlterTableR
         sys.error("Alter table rename table operation failed. Please check the logs")
     } finally {
       // release lock after command execution completion
-      if (carbonLock != null) {
-        if (carbonLock.unlock()) {
-          LOGGER.info("Lock released successfully")
-        } else {
-          val storeLocation = CarbonProperties.getInstance
-            .getProperty(CarbonCommonConstants.STORE_LOCATION,
-              System.getProperty("java.io.tmpdir"))
-          val lockFilePath = storeLocation + CarbonCommonConstants.FILE_SEPARATOR +
-                             oldDatabaseName + CarbonCommonConstants.FILE_SEPARATOR + newTableName +
-                             CarbonCommonConstants.FILE_SEPARATOR +
-                             LockUsage.METADATA_LOCK
-          if(carbonLock.releaseLockManually(lockFilePath)) {
-            LOGGER.info("Lock released successfully")
-          } else {
-            LOGGER.error("Unable to release lock during rename table")
-          }
-        }
-      }
+      AlterTableUtil.releaseLocks(locks, LOGGER)
+      // rename-specific: the old table path is gone after rename, so release via the new path
+      AlterTableUtil
+        .releaseLocksManually(locks,
+          locksToBeAcquired,
+          oldDatabaseName,
+          newTableName,
+          carbonTable.getStorePath,
+          LOGGER)
     }
     Seq.empty
   }
@@ -251,8 +237,9 @@ private[sql] case class AlterTableDropColumns(
     val dbName = alterTableDropColumnModel.databaseName
       .getOrElse(sparkSession.catalog.currentDatabase)
     LOGGER.audit(s"Alter table drop columns request has been received for $dbName.$tableName")
-    val carbonLock = AlterTableUtil
-      .validateTableAndAcquireLock(dbName, tableName, LOGGER)(sparkSession)
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+    val locks = AlterTableUtil
+      .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
     try {
       // get the latest carbon table and check for column existence
       val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
@@ -333,13 +320,7 @@ private[sql] case class AlterTableDropColumns(
         sys.error("Alter table drop column operation failed. Please check the logs")
     } finally {
       // release lock after command execution completion
-      if (carbonLock != null) {
-        if (carbonLock.unlock()) {
-          LOGGER.info("Alter table drop columns lock released successfully")
-        } else {
-          LOGGER.error("Unable to release lock during alter table drop columns operation")
-        }
-      }
+      AlterTableUtil.releaseLocks(locks, LOGGER)
     }
     Seq.empty
   }
@@ -355,8 +336,9 @@ private[sql] case class AlterTableDataTypeChange(
     val dbName = alterTableDataTypeChangeModel.databaseName
       .getOrElse(sparkSession.catalog.currentDatabase)
     LOGGER.audit(s"Alter table change data type request has been received for $dbName.$tableName")
-    val carbonLock = AlterTableUtil
-      .validateTableAndAcquireLock(dbName, tableName, LOGGER)(sparkSession)
+    val locksToBeAcquired = List(LockUsage.METADATA_LOCK, LockUsage.COMPACTION_LOCK)
+    val locks = AlterTableUtil
+      .validateTableAndAcquireLock(dbName, tableName, locksToBeAcquired, LOGGER)(sparkSession)
     try {
       // get the latest carbon table and check for column existence
       val carbonTable = CarbonMetadata.getInstance.getCarbonTable(dbName + "_" + tableName)
@@ -416,13 +398,7 @@ private[sql] case class AlterTableDataTypeChange(
         sys.error("Alter table data type change operation failed. Please check the logs")
     } finally {
       // release lock after command execution completion
-      if (carbonLock != null) {
-        if (carbonLock.unlock()) {
-          LOGGER.info("Alter table change data type lock released successfully")
-        } else {
-          LOGGER.error("Unable to release lock during alter table change data type operation")
-        }
-      }
+      AlterTableUtil.releaseLocks(locks, LOGGER)
     }
     Seq.empty
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
index f7ea344..6460490 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonMetastore.scala
@@ -53,7 +53,7 @@ import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFa
 import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.writer.ThriftWriter
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
-import org.apache.carbondata.spark.merger.TableMeta
+import org.apache.carbondata.processing.merger.TableMeta
 import org.apache.carbondata.spark.util.CarbonSparkUtil
 
 case class MetaData(var tablesMeta: ArrayBuffer[TableMeta]) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index 243eeb6..2e7eebf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.util
 
+import scala.collection.mutable.ListBuffer
+
 import org.apache.spark.SparkConf
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -27,12 +29,15 @@ import org.apache.carbondata.common.logging.LogService
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.format.{SchemaEvolutionEntry, TableInfo}
 
 object AlterTableUtil {
-
-  def validateTableAndAcquireLock(dbName: String, tableName: String, LOGGER: LogService)
-    (sparkSession: SparkSession): ICarbonLock = {
+  def validateTableAndAcquireLock(dbName: String,
+      tableName: String,
+      locksToBeAcquired: List[String],
+      LOGGER: LogService)
+    (sparkSession: SparkSession): List[ICarbonLock] = {
     val relation =
       CarbonEnv.get.carbonMetastore
         .lookupRelation(Option(dbName), tableName)(sparkSession)
@@ -44,17 +49,82 @@ object AlterTableUtil {
     }
     // acquire the lock first
     val table = relation.tableMeta.carbonTable
+    var acquiredLocks = ListBuffer[ICarbonLock]()
+    locksToBeAcquired.foreach { lock =>
+      acquiredLocks += getLockObject(table, lock, LOGGER)
+    }
+    acquiredLocks.toList
+  }
+
+  /**
+   * Creates the lock object of the given type for the table and acquires it with retries;
+   * raises an error if the lock could not be acquired
+   *
+   * @param carbonTable
+   * @param lockType
+   * @param LOGGER
+   * @return
+   */
+  private def getLockObject(carbonTable: CarbonTable,
+      lockType: String,
+      LOGGER: LogService): ICarbonLock = {
     val carbonLock = CarbonLockFactory
-      .getCarbonLockObj(table.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
-        LockUsage.METADATA_LOCK)
+      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier.getCarbonTableIdentifier,
+        lockType)
     if (carbonLock.lockWithRetries()) {
-      LOGGER.info("Successfully able to get the table metadata file lock")
+      LOGGER.info(s"Successfully acquired the lock $lockType")
     } else {
       sys.error("Table is locked for updation. Please try after some time")
     }
     carbonLock
   }
 
+  /**
+   * This method will release the locks acquired for an operation
+   *
+   * @param locks
+   * @param LOGGER
+   */
+  def releaseLocks(locks: List[ICarbonLock], LOGGER: LogService): Unit = {
+    locks.foreach { carbonLock =>
+      if (carbonLock.unlock()) {
+        LOGGER.info("Alter table lock released successfully")
+      } else {
+        LOGGER.error("Unable to release lock during alter table operation")
+      }
+    }
+  }
+
+  /**
+   * Releases the locks through lock file paths formed manually from the store location,
+   * database name, table name and lock type. Used specifically by rename table
+   *
+   * @param locks
+   * @param locksAcquired
+   * @param dbName
+   * @param tableName
+   * @param storeLocation
+   * @param LOGGER
+   */
+  def releaseLocksManually(locks: List[ICarbonLock],
+      locksAcquired: List[String],
+      dbName: String,
+      tableName: String,
+      storeLocation: String,
+      LOGGER: LogService): Unit = {
+    val lockLocation = storeLocation + CarbonCommonConstants.FILE_SEPARATOR +
+                       dbName + CarbonCommonConstants.FILE_SEPARATOR + tableName
+    locks.zip(locksAcquired).foreach { case (carbonLock, lockType) =>
+      val lockFilePath = lockLocation + CarbonCommonConstants.FILE_SEPARATOR +
+                         lockType
+      if (carbonLock.releaseLockManually(lockFilePath)) {
+        LOGGER.info(s"Alter table lock released successfully: ${ lockType }")
+      } else {
+        LOGGER.error("Unable to release lock during alter table operation")
+      }
+    }
+  }
+
   def updateSchemaInfo(carbonTable: CarbonTable,
       schemaEvolutionEntry: SchemaEvolutionEntry,
       thriftTable: TableInfo)(sparkSession: SparkSession, catalog: HiveExternalCatalog): Unit = {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
index 6ca8449..0d85062 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/Compaction.scala
@@ -20,7 +20,7 @@ import org.apache.spark.sql.{CarbonEnv, SparkSession}
 import org.apache.spark.sql.execution.command.{AlterTableCompaction, AlterTableModel}
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.spark.merger.CompactionType
+import org.apache.carbondata.processing.merger.CompactionType
 
 /**
  * table compaction api

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
index 914136c..91dd8b3 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableValidationTestCase.scala
@@ -382,7 +382,7 @@ class AlterTableValidationTestCase extends QueryTest with BeforeAndAfterAll {
   }
 
   test("test to check if the lock file is successfully deleted") {
-    sql("create table lock_check(id int, name string) stored by 'carbondata'")
+      sql("create table lock_check(id int, name string) stored by 'carbondata'")
     sql("alter table lock_check rename to lock_rename")
     assert(!new File(s"${ CarbonCommonConstants.STORE_LOCATION } + /default/lock_rename/meta.lock")
       .exists())

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
new file mode 100644
index 0000000..f76c66f
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.processing.merger;
+
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
+import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
+import org.apache.carbondata.core.util.path.CarbonStorePath;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.store.CarbonDataFileAttributes;
+import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel;
+
+/**
+ * This class contains the common methods required for result processing during compaction based on
+ * restructure and normal scenarios
+ */
+public abstract class AbstractResultProcessor {
+
+  /**
+   * This method will perform the desired tasks of merging the selected slices
+   *
+   * @param resultIteratorList
+   * @return
+   */
+  public abstract boolean execute(List<RawResultIterator> resultIteratorList);
+
+  protected void setDataFileAttributesInModel(CarbonLoadModel loadModel,
+      CompactionType compactionType, CarbonTable carbonTable,
+      CarbonFactDataHandlerModel carbonFactDataHandlerModel) {
+    CarbonDataFileAttributes carbonDataFileAttributes;
+    if (compactionType == CompactionType.IUD_UPDDEL_DELTA_COMPACTION) {
+      int taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(),
+          CarbonStorePath.getCarbonTablePath(loadModel.getStorePath(),
+              carbonTable.getCarbonTableIdentifier()));
+      // In IUD_UPDDEL_DELTA_COMPACTION the new file is written into the same segment, so the
+      // task number must be one higher than the latest task id found for that segment.
+      int index = taskNo + 1;
+      carbonDataFileAttributes = new CarbonDataFileAttributes(index, loadModel.getFactTimeStamp());
+    } else {
+      carbonDataFileAttributes =
+          new CarbonDataFileAttributes(Integer.parseInt(loadModel.getTaskNo()),
+              loadModel.getFactTimeStamp());
+    }
+    carbonFactDataHandlerModel.setCarbonDataFileAttributes(carbonDataFileAttributes);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
new file mode 100644
index 0000000..c00fe2e
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionExecutor.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.merger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.executor.QueryExecutor;
+import org.apache.carbondata.core.scan.executor.QueryExecutorFactory;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.model.QueryDimension;
+import org.apache.carbondata.core.scan.model.QueryMeasure;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.BatchResult;
+import org.apache.carbondata.core.scan.result.iterator.RawResultIterator;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+/**
+ * Executor class for executing the query on the selected segments to be merged.
+ * This will fire a select * query and get the raw result.
+ */
+public class CarbonCompactionExecutor {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonCompactionExecutor.class.getName());
+  private final Map<String, List<DataFileFooter>> dataFileMetadataSegMapping;
+  private final SegmentProperties destinationSegProperties;
+  private final Map<String, TaskBlockInfo> segmentMapping;
+
+  /**
+   * All query executors created by this instance. One executor is created per task, so every
+   * one of them has to be remembered in order to be finished in {@link #finish()}.
+   */
+  private final List<QueryExecutor> queryExecutorList =
+      new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+  private CarbonTable carbonTable;
+  private QueryModel queryModel;
+
+  /**
+   * flag to check whether any restructured block exists in the blocks selected for compaction.
+   * Based on this decision will be taken whether complete data has to be sorted again
+   */
+  private boolean restructuredBlockExists;
+
+  /**
+   * Constructor
+   *
+   * @param segmentMapping             segment id to task/block grouping of the blocks to merge
+   * @param segmentProperties          segment properties of the destination (merged) segment
+   * @param carbonTable                table being compacted
+   * @param dataFileMetadataSegMapping segment id to the data file footers read for that segment
+   * @param restructuredBlockExists    whether any selected block was written with an older schema
+   */
+  public CarbonCompactionExecutor(Map<String, TaskBlockInfo> segmentMapping,
+      SegmentProperties segmentProperties, CarbonTable carbonTable,
+      Map<String, List<DataFileFooter>> dataFileMetadataSegMapping,
+      boolean restructuredBlockExists) {
+    this.segmentMapping = segmentMapping;
+    this.destinationSegProperties = segmentProperties;
+    this.carbonTable = carbonTable;
+    this.dataFileMetadataSegMapping = dataFileMetadataSegMapping;
+    this.restructuredBlockExists = restructuredBlockExists;
+  }
+
+  /**
+   * For processing of the table blocks. One raw result iterator is created for every task of
+   * every segment.
+   *
+   * NOTE(review): the same query model instance is reused for every task and only its block
+   * list is swapped - confirm that executors do not read the block list lazily.
+   *
+   * @return List of Carbon iterators
+   * @throws QueryExecutionException if executing the query for a task fails
+   * @throws IOException             if executing the query for a task fails with an I/O problem
+   */
+  public List<RawResultIterator> processTableBlocks() throws QueryExecutionException, IOException {
+    List<RawResultIterator> resultList =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    // the block list is not known yet, it is set per task right before the execution
+    queryModel = prepareQueryModel(null);
+    // iterate each seg ID
+    for (Map.Entry<String, TaskBlockInfo> taskMap : segmentMapping.entrySet()) {
+      String segmentId = taskMap.getKey();
+      List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
+      SegmentProperties sourceSegProperties = getSourceSegmentProperties(listMetadata);
+      // for each segment get taskblock info
+      TaskBlockInfo taskBlockInfo = taskMap.getValue();
+      Set<String> taskBlockListMapping = taskBlockInfo.getTaskSet();
+      for (String task : taskBlockListMapping) {
+        List<TableBlockInfo> list = taskBlockInfo.getTableBlockInfoList(task);
+        Collections.sort(list);
+        LOGGER.info("for task -" + task + "-block size is -" + list.size());
+        // executeBlockList sets the block list on the query model before executing
+        resultList.add(new RawResultIterator(executeBlockList(list), sourceSegProperties,
+            destinationSegProperties));
+      }
+    }
+    return resultList;
+  }
+
+  /**
+   * This method will create the source segment properties based on restructured block existence.
+   * The first footer of the segment is used as the source of schema and cardinality.
+   *
+   * @param listMetadata data file footers of one segment
+   * @return segment properties describing the source segment
+   */
+  private SegmentProperties getSourceSegmentProperties(List<DataFileFooter> listMetadata) {
+    DataFileFooter firstFooter = listMetadata.get(0);
+    List<ColumnSchema> sourceColumns = firstFooter.getColumnInTable();
+    int[] sourceCardinality = firstFooter.getSegmentInfo().getColumnCardinality();
+    if (!restructuredBlockExists) {
+      return new SegmentProperties(sourceColumns, sourceCardinality);
+    }
+    // update cardinality of source segment according to new schema
+    Map<String, Integer> columnToCardinalityMap =
+        new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    CarbonCompactionUtil
+        .addColumnCardinalityToMap(columnToCardinalityMap, sourceColumns, sourceCardinality);
+    List<ColumnSchema> updatedColumnSchemaList = new ArrayList<>(sourceColumns.size());
+    int[] updatedColumnCardinalities = CarbonCompactionUtil
+        .updateColumnSchemaAndGetCardinality(columnToCardinalityMap, carbonTable,
+            updatedColumnSchemaList);
+    return new SegmentProperties(updatedColumnSchemaList, updatedColumnCardinalities);
+  }
+
+  /**
+   * get executor and execute the query model. The created executor is remembered so that it
+   * can be released in {@link #finish()}.
+   *
+   * @param blockList blocks of one task
+   * @return iterator over the raw result batches of the given blocks
+   * @throws QueryExecutionException if the execution fails
+   * @throws IOException             if the execution fails with an I/O problem
+   */
+  private CarbonIterator<BatchResult> executeBlockList(List<TableBlockInfo> blockList)
+      throws QueryExecutionException, IOException {
+    queryModel.setTableBlockInfos(blockList);
+    QueryExecutor queryExecutor = QueryExecutorFactory.getQueryExecutor(queryModel);
+    // register before executing so that the executor is cleaned up even if execute() fails
+    queryExecutorList.add(queryExecutor);
+    return queryExecutor.execute(queryModel);
+  }
+
+  /**
+   * Below method will be used
+   * for cleanup. Every executor created so far is finished; a failure of one executor is
+   * logged and does not prevent the remaining ones from being finished. Safe to call when no
+   * executor was created.
+   */
+  public void finish() {
+    for (QueryExecutor queryExecutor : queryExecutorList) {
+      try {
+        queryExecutor.finish();
+      } catch (QueryExecutionException e) {
+        LOGGER.error(e, "Problem while finish: ");
+      }
+    }
+    // avoid finishing the same executor again on a repeated call
+    queryExecutorList.clear();
+    clearDictionaryFromQueryModel();
+  }
+
+  /**
+   * This method will clear the dictionary access count after its usage is complete so
+   * that column can be deleted form LRU cache whenever memory reaches threshold
+   */
+  private void clearDictionaryFromQueryModel() {
+    if (null == queryModel) {
+      return;
+    }
+    Map<String, Dictionary> columnToDictionaryMapping = queryModel.getColumnToDictionaryMapping();
+    if (null == columnToDictionaryMapping) {
+      return;
+    }
+    for (Dictionary dictionary : columnToDictionaryMapping.values()) {
+      CarbonUtil.clearDictionaryCache(dictionary);
+    }
+  }
+
+  /**
+   * Preparing of the query model. All dimensions and measures of the fact table are selected
+   * and no filter is applied.
+   *
+   * @param blockList initial block list of the model, may be null
+   * @return the prepared query model
+   */
+  private QueryModel prepareQueryModel(List<TableBlockInfo> blockList) {
+    QueryModel model = new QueryModel();
+    model.setTableBlockInfos(blockList);
+    model.setForcedDetailRawQuery(true);
+    model.setFilterExpressionResolverTree(null);
+
+    String factTableName = carbonTable.getFactTableName();
+
+    List<QueryDimension> dims = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    List<CarbonDimension> dimensions = carbonTable.getDimensionByTableName(factTableName);
+    for (CarbonDimension dim : dimensions) {
+      QueryDimension queryDimension = new QueryDimension(dim.getColName());
+      queryDimension.setDimension(dim);
+      dims.add(queryDimension);
+    }
+    model.setQueryDimension(dims);
+
+    List<QueryMeasure> msrs = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    List<CarbonMeasure> measures = carbonTable.getMeasureByTableName(factTableName);
+    for (CarbonMeasure carbonMeasure : measures) {
+      QueryMeasure queryMeasure = new QueryMeasure(carbonMeasure.getColName());
+      queryMeasure.setMeasure(carbonMeasure);
+      msrs.add(queryMeasure);
+    }
+    model.setQueryMeasures(msrs);
+    model.setQueryId(System.nanoTime() + "");
+    model.setAbsoluteTableIdentifier(carbonTable.getAbsoluteTableIdentifier());
+    model.setTable(carbonTable);
+    return model;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/cc59b247/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
new file mode 100644
index 0000000..2ad83a4
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.processing.merger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.datastore.block.TaskBlockInfo;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.commons.lang3.ArrayUtils;
+
+/**
+ * Utility Class for the Compaction Flow.
+ */
+public class CarbonCompactionUtil {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(CarbonCompactionUtil.class.getName());
+
+  /**
+   * To create a mapping of Segment Id and TableBlockInfo.
+   * Within every segment the blocks are grouped by the task number taken from the file path.
+   *
+   * @param tableBlockInfoList blocks selected for compaction
+   * @return segment id mapped to the task wise grouping of its blocks
+   */
+  public static Map<String, TaskBlockInfo> createMappingForSegments(
+      List<TableBlockInfo> tableBlockInfoList) {
+
+    // stores taskBlockInfo of each segment
+    Map<String, TaskBlockInfo> segmentBlockInfoMapping =
+        new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    for (TableBlockInfo info : tableBlockInfoList) {
+      String segId = info.getSegmentId();
+      // extract task ID from file Path.
+      String taskNo = CarbonTablePath.DataFileUtil.getTaskNo(info.getFilePath());
+      TaskBlockInfo taskBlockInfoMapping = segmentBlockInfoMapping.get(segId);
+      // if taskBlockInfo is not there, then create and register it for the segment
+      if (null == taskBlockInfoMapping) {
+        taskBlockInfoMapping = new TaskBlockInfo();
+        segmentBlockInfoMapping.put(segId, taskBlockInfoMapping);
+      }
+      groupCorrespondingInfoBasedOnTask(info, taskBlockInfoMapping, taskNo);
+    }
+    return segmentBlockInfoMapping;
+
+  }
+
+  /**
+   * Grouping the taskNumber and list of TableBlockInfo.
+   *
+   * @param info             block to be added
+   * @param taskBlockMapping task wise grouping of one segment
+   * @param taskNo           task number the block belongs to
+   */
+  private static void groupCorrespondingInfoBasedOnTask(TableBlockInfo info,
+      TaskBlockInfo taskBlockMapping, String taskNo) {
+    // get the corresponding list from task mapping.
+    List<TableBlockInfo> blockLists = taskBlockMapping.getTableBlockInfoList(taskNo);
+    if (null == blockLists) {
+      blockLists = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+      taskBlockMapping.addTableBlockInfoList(taskNo, blockLists);
+    }
+    blockLists.add(info);
+  }
+
+  /**
+   * To create a mapping of Segment Id and DataFileFooter.
+   *
+   * @param tableBlockInfoList blocks selected for compaction
+   * @return segment id mapped to the footers of all its blocks
+   * @throws IOException if reading the footer of a block fails
+   */
+  public static Map<String, List<DataFileFooter>> createDataFileFooterMappingForSegments(
+      List<TableBlockInfo> tableBlockInfoList) throws IOException {
+
+    Map<String, List<DataFileFooter>> segmentBlockInfoMapping = new HashMap<>();
+    for (TableBlockInfo blockInfo : tableBlockInfoList) {
+      String segId = blockInfo.getSegmentId();
+      DataFileFooter dataFileMetadata = CarbonUtil.readMetadatFile(blockInfo);
+      List<DataFileFooter> metadataList = segmentBlockInfoMapping.get(segId);
+      // create the footer list only for the first block of a segment
+      if (null == metadataList) {
+        metadataList = new ArrayList<>();
+        segmentBlockInfoMapping.put(segId, metadataList);
+      }
+      metadataList.add(dataFileMetadata);
+    }
+    return segmentBlockInfoMapping;
+
+  }
+
+  /**
+   * Builds the path of the compaction request file for the given compaction type. Minor
+   * compaction uses the minor request file, every other type uses the major request file.
+   *
+   * @param metaFolderPath metadata folder of the table
+   * @param compactionType type of the compaction
+   * @return path of the compaction request file
+   */
+  private static String getCompactionRequestFilePath(String metaFolderPath,
+      CompactionType compactionType) {
+    String requestFileName;
+    if (compactionType.equals(CompactionType.MINOR_COMPACTION)) {
+      requestFileName = CarbonCommonConstants.minorCompactionRequiredFile;
+    } else {
+      requestFileName = CarbonCommonConstants.majorCompactionRequiredFile;
+    }
+    return metaFolderPath + CarbonCommonConstants.FILE_SEPARATOR + requestFileName;
+  }
+
+  /**
+   * Checks the existence of a file using the file type derived from its path.
+   *
+   * @param filePath path to check
+   * @return true if the file exists
+   * @throws IOException if the existence check fails
+   */
+  private static boolean isFileExist(String filePath) throws IOException {
+    return FileFactory.isFileExist(filePath, FileFactory.getFileType(filePath));
+  }
+
+  /**
+   * Check whether the file to indicate the compaction is present or not.
+   *
+   * @param metaFolderPath metadata folder of the table
+   * @return true if a minor or a major compaction request file exists, false if none exists or
+   * the check failed
+   */
+  public static boolean isCompactionRequiredForTable(String metaFolderPath) {
+    String minorCompactionStatusFile =
+        getCompactionRequestFilePath(metaFolderPath, CompactionType.MINOR_COMPACTION);
+    String majorCompactionStatusFile =
+        getCompactionRequestFilePath(metaFolderPath, CompactionType.MAJOR_COMPACTION);
+    try {
+      return isFileExist(minorCompactionStatusFile) || isFileExist(majorCompactionStatusFile);
+    } catch (IOException e) {
+      // pass the exception itself so that the stack trace is not lost
+      LOGGER.error(e, "Exception in isFileExist compaction request file " + e.getMessage());
+    }
+    return false;
+  }
+
+  /**
+   * Determine the type of the compaction received.
+   *
+   * @param metaFolderPath metadata folder of the table
+   * @return minor compaction if the minor request file exists, major compaction if only the
+   * major request file exists; minor compaction is also the fallback when no request file is
+   * found or the check failed
+   */
+  public static CompactionType determineCompactionType(String metaFolderPath) {
+    String minorCompactionStatusFile =
+        getCompactionRequestFilePath(metaFolderPath, CompactionType.MINOR_COMPACTION);
+    String majorCompactionStatusFile =
+        getCompactionRequestFilePath(metaFolderPath, CompactionType.MAJOR_COMPACTION);
+    try {
+      if (isFileExist(minorCompactionStatusFile)) {
+        return CompactionType.MINOR_COMPACTION;
+      }
+      if (isFileExist(majorCompactionStatusFile)) {
+        return CompactionType.MAJOR_COMPACTION;
+      }
+    } catch (IOException e) {
+      LOGGER.error(e, "Exception in determining the compaction request file " + e.getMessage());
+    }
+    return CompactionType.MINOR_COMPACTION;
+  }
+
+  /**
+   * Delete the compaction request file once the compaction is done.
+   *
+   * @param metaFolderPath metadata folder of the table
+   * @param compactionType type of the compaction whose request file has to be deleted
+   * @return true only if the file existed and was deleted
+   */
+  public static boolean deleteCompactionRequiredFile(String metaFolderPath,
+      CompactionType compactionType) {
+    String compactionRequiredFile = getCompactionRequestFilePath(metaFolderPath, compactionType);
+    try {
+      if (!isFileExist(compactionRequiredFile)) {
+        LOGGER.info("Compaction request file is not present. file is : " + compactionRequiredFile);
+        return false;
+      }
+      if (FileFactory
+          .getCarbonFile(compactionRequiredFile, FileFactory.getFileType(compactionRequiredFile))
+          .delete()) {
+        LOGGER.info("Deleted the compaction request file " + compactionRequiredFile);
+        return true;
+      }
+      LOGGER.error("Unable to delete the compaction request file " + compactionRequiredFile);
+    } catch (IOException e) {
+      LOGGER.error(e, "Exception in deleting the compaction request file " + e.getMessage());
+    }
+    return false;
+  }
+
+  /**
+   * Creation of the compaction request if some other compaction is in progress.
+   *
+   * @param metaFolderPath metadata folder of the table
+   * @param compactionType type of the compaction to be requested
+   * @return true only if the request file was newly created; false if it already exists or
+   * could not be created
+   */
+  public static boolean createCompactionRequiredFile(String metaFolderPath,
+      CompactionType compactionType) {
+    String statusFile = getCompactionRequestFilePath(metaFolderPath, compactionType);
+    try {
+      if (isFileExist(statusFile)) {
+        LOGGER.info("Compaction request file : " + statusFile + " already exist.");
+        return false;
+      }
+      if (FileFactory.createNewFile(statusFile, FileFactory.getFileType(statusFile))) {
+        LOGGER.info("successfully created a compaction required file - " + statusFile);
+        return true;
+      }
+      LOGGER.error("Not able to create a compaction required file - " + statusFile);
+    } catch (IOException e) {
+      LOGGER.error(e, "Exception in creating the compaction request file " + e.getMessage());
+    }
+    return false;
+  }
+
+  /**
+   * This will check if any compaction request has been received for any table.
+   *
+   * @param tableMetas tables to be checked
+   * @param skipList   identifiers of the tables which must not be returned
+   * @return the first table having a compaction request file and not present in the skip list,
+   * null if there is no such table
+   */
+  public static TableMeta getNextTableToCompact(TableMeta[] tableMetas,
+      List<CarbonTableIdentifier> skipList) {
+    for (TableMeta table : tableMetas) {
+      CarbonTable ctable = table.carbonTable;
+      String metadataPath = ctable.getMetaDataFilepath();
+      // check for the compaction required file and at the same time exclude the tables which are
+      // present in the skip list.
+      if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList
+          .contains(table.carbonTableIdentifier)) {
+        return table;
+      }
+    }
+    return null;
+  }
+
+  /**
+   * This method will prepare the max column cardinality map. For every column of the current
+   * block the cardinality is stored if the column is not yet present in the map or if it is
+   * greater than the value already stored.
+   *
+   * @param columnCardinalityMap    column unique id to max cardinality, updated in place
+   * @param currentBlockSchema      column schemas of the current block
+   * @param currentBlockCardinality cardinalities of the current block, index aligned with
+   *                                currentBlockSchema
+   */
+  public static void addColumnCardinalityToMap(Map<String, Integer> columnCardinalityMap,
+      List<ColumnSchema> currentBlockSchema, int[] currentBlockCardinality) {
+    for (int i = 0; i < currentBlockCardinality.length; i++) {
+      String columnUniqueId = currentBlockSchema.get(i).getColumnUniqueId();
+      Integer value = columnCardinalityMap.get(columnUniqueId);
+      // add value to map only if does not exist or new cardinality is > existing value
+      if (null == value || currentBlockCardinality[i] > value) {
+        columnCardinalityMap.put(columnUniqueId, currentBlockCardinality[i]);
+      }
+    }
+  }
+
+  /**
+   * This method will return the updated cardinality according to the master schema.
+   * Dimensions missing in the cardinality map get a default cardinality based on their encoding.
+   *
+   * @param columnCardinalityMap    column id to cardinality of the source blocks
+   * @param carbonTable             table holding the master schema
+   * @param updatedColumnSchemaList output list, filled with the column schemas of all
+   *                                dimensions followed by all measures of the master schema
+   * @return cardinality of every dimension of the master schema, in master schema order
+   */
+  public static int[] updateColumnSchemaAndGetCardinality(Map<String, Integer> columnCardinalityMap,
+      CarbonTable carbonTable, List<ColumnSchema> updatedColumnSchemaList) {
+    String factTableName = carbonTable.getFactTableName();
+    List<CarbonDimension> masterDimensions = carbonTable.getDimensionByTableName(factTableName);
+    List<Integer> updatedCardinalityList = new ArrayList<>(columnCardinalityMap.size());
+    for (CarbonDimension dimension : masterDimensions) {
+      Integer value = columnCardinalityMap.get(dimension.getColumnId());
+      if (null == value) {
+        // dimension does not exist in the source blocks
+        updatedCardinalityList.add(getDimensionDefaultCardinality(dimension));
+      } else {
+        updatedCardinalityList.add(value);
+      }
+      updatedColumnSchemaList.add(dimension.getColumnSchema());
+    }
+    // add measures to the column schema list
+    List<CarbonMeasure> masterSchemaMeasures = carbonTable.getMeasureByTableName(factTableName);
+    for (CarbonMeasure measure : masterSchemaMeasures) {
+      updatedColumnSchemaList.add(measure.getColumnSchema());
+    }
+    return ArrayUtils
+        .toPrimitive(updatedCardinalityList.toArray(new Integer[updatedCardinalityList.size()]));
+  }
+
+  /**
+   * This method will return the default cardinality based on dimension type
+   *
+   * @param dimension dimension which is not present in the source blocks
+   * @return Integer.MAX_VALUE for direct dictionary dimensions, the dictionary default
+   * cardinality (plus one if the dimension has a default value) for dictionary dimensions and
+   * -1 for all other dimensions
+   */
+  private static int getDimensionDefaultCardinality(CarbonDimension dimension) {
+    if (dimension.hasEncoding(Encoding.DIRECT_DICTIONARY)) {
+      return Integer.MAX_VALUE;
+    }
+    if (dimension.hasEncoding(Encoding.DICTIONARY)) {
+      if (null != dimension.getDefaultValue()) {
+        return CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY + 1;
+      }
+      return CarbonCommonConstants.DICTIONARY_DEFAULT_CARDINALITY;
+    }
+    return -1;
+  }
+
+  /**
+   * This method will check for any restructured block in the blocks selected for compaction
+   *
+   * @param segmentMapping             segment id to task/block grouping; only its keys are used
+   * @param dataFileMetadataSegMapping segment id to the footers of the blocks of that segment
+   * @param tableLastUpdatedTime       timestamp of the last schema modification of the table
+   * @return true if at least one footer stores a schema timestamp older than the given one
+   */
+  public static boolean checkIfAnyRestructuredBlockExists(Map<String, TaskBlockInfo> segmentMapping,
+      Map<String, List<DataFileFooter>> dataFileMetadataSegMapping, long tableLastUpdatedTime) {
+    for (String segmentId : segmentMapping.keySet()) {
+      List<DataFileFooter> listMetadata = dataFileMetadataSegMapping.get(segmentId);
+      for (DataFileFooter dataFileFooter : listMetadata) {
+        // if schema modified timestamp is greater than footer stored schema timestamp,
+        // it indicates it is a restructured block
+        if (tableLastUpdatedTime > dataFileFooter.getSchemaUpdatedTimeStamp()) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+}