Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/06/30 17:41:54 UTC

[07/50] [abbrv] incubator-carbondata git commit: [Issue-578] Changing the minor compaction behavior to work on segment numbers. (#737)

[Issue-578] Changing the minor compaction behavior to work on segment numbers. (#737)

* Minor compaction is now done based on the number of segments: the user
specifies how many segments should be merged at each level.
* Add the property carbon.compaction.level.threshold and remove carbon.minor.compaction.size.
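
For illustration, a minimal sketch of how the new threshold could be set programmatically (this assumes the usual CarbonProperties.addProperty setter; "6,5" is an arbitrary example value meaning: compact six unmerged segments at level 1, and five level-1 compacted segments at level 2):

import org.carbondata.core.constants.CarbonCommonConstants;
import org.carbondata.core.util.CarbonProperties;

public class CompactionThresholdExample {
  public static void main(String[] args) {
    // Override the default "4,3": merge 6 unmerged segments at level 1,
    // and 5 level-1 compacted segments at level 2 (example values).
    CarbonProperties.getInstance()
        .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "6,5");

    // Read the parsed thresholds back; invalid input falls back to "4,3".
    int[] levels = CarbonProperties.getInstance().getCompactionSegmentLevelCount();
    System.out.println("level 1: " + levels[0] + ", level 2: " + levels[1]);
  }
}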

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/be46423a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/be46423a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/be46423a

Branch: refs/heads/master
Commit: be46423a08effd5fb784ed7d1ff1736adf44b670
Parents: d5636db
Author: ravikiran23 <ra...@gmail.com>
Authored: Sat Jun 25 13:12:30 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Sat Jun 25 13:12:30 2016 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  33 +--
 .../carbondata/core/util/CarbonProperties.java  |  79 ++++--
 .../spark/merger/CarbonDataMergerUtil.java      | 247 ++++++++++++-------
 .../spark/rdd/CarbonDataRDDFactory.scala        | 123 ++++-----
 4 files changed, 305 insertions(+), 177 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be46423a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
index eb10406..cd25b88 100644
--- a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
@@ -881,16 +881,6 @@ public final class CarbonCommonConstants {
   public static final String INVALID_SEGMENT_ID = "-1";
 
   /**
-   * Size of Minor Compaction in MBs
-   */
-  public static final String MINOR_COMPACTION_SIZE = "carbon.minor.compaction.size";
-
-  /**
-   * By default size of minor compaction in MBs.
-   */
-  public static final String DEFAULT_MINOR_COMPACTION_SIZE = "256";
-
-  /**
    * Size of Major Compaction in MBs
    */
   public static final String MAJOR_COMPACTION_SIZE = "carbon.major.compaction.size";
@@ -990,18 +980,6 @@ public final class CarbonCommonConstants {
   public static final String SEGMENT_COMPACTED = "Compacted";
 
   /**
-   * whether to include the compacted segments again for compaction or not.
-   */
-  public static final String INCLUDE_ALREADY_COMPACTED_SEGMENTS =
-      "carbon.include.compacted.segments";
-
-  /**
-   * whether to include the compacted segments again for compaction or not. default value is false.
-   * compacted load will not be compacted again in minor compaction.
-   */
-  public static final String INCLUDE_ALREADY_COMPACTED_SEGMENTS_DEFAULT =
-      "false";
-  /**
    * property for number of core to load the blocks in driver
    */
   public static final String NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT =
@@ -1040,6 +1018,17 @@ public final class CarbonCommonConstants {
   public static final String FILTER_INVALID_MEMBER = " Invalid Record(s) are present "
                                                      + "while filter evaluation. ";
 
+  /**
+   * Number of segments to be merged at each compaction level.
+   */
+  public static final String COMPACTION_SEGMENT_LEVEL_THRESHOLD =
+      "carbon.compaction.level.threshold";
+
+  /**
+   * Default number of segments to be merged at each level: 4,3.
+   */
+  public static final String DEFAULT_SEGMENT_LEVEL_THRESHOLD = "4,3";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be46423a/core/src/main/java/org/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/carbondata/core/util/CarbonProperties.java
index 60cc323..a337ac4 100644
--- a/core/src/main/java/org/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/carbondata/core/util/CarbonProperties.java
@@ -712,22 +712,6 @@ public final class CarbonProperties {
   }
 
   /**
-   * returns minor compaction size value from carbon properties or default value if it is not valid
-   *
-   * @return
-   */
-  public long getMinorCompactionSize() {
-    long compactionSize;
-    try {
-      compactionSize = Long.parseLong(getProperty(CarbonCommonConstants.MINOR_COMPACTION_SIZE,
-          CarbonCommonConstants.DEFAULT_MINOR_COMPACTION_SIZE));
-    } catch (NumberFormatException e) {
-      compactionSize = Long.parseLong(CarbonCommonConstants.DEFAULT_MINOR_COMPACTION_SIZE);
-    }
-    return compactionSize;
-  }
-
-  /**
    * returns the number of loads to be preserved.
    *
    * @return
@@ -758,4 +742,67 @@ public final class CarbonProperties {
     LOGGER.info(carbonProperties.toString());
   }
 
+  /**
+   * Gets the segment count thresholds for each compaction level.
+   * @return the configured thresholds, or the default if the property is invalid
+   */
+  public int[] getCompactionSegmentLevelCount() {
+    String commaSeparatedLevels;
+
+    commaSeparatedLevels = getProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+        CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD);
+    int[] compactionSize = getIntArray(commaSeparatedLevels);
+
+    if (null == compactionSize) {
+      compactionSize = getIntArray(CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD);
+    }
+
+    return compactionSize;
+  }
+
+  /**
+   * Converts the comma separated level values into an int array.
+   * @param commaSeparatedLevels the configured thresholds, e.g. "4,3"
+   * @return the parsed values, or null if any value is invalid
+   */
+  private int[] getIntArray(String commaSeparatedLevels) {
+    String[] levels = commaSeparatedLevels.split(",");
+    int[] compactionSize = new int[levels.length];
+    int i = 0;
+    for (String levelSize : levels) {
+      try {
+        int size = Integer.parseInt(levelSize.trim());
+        if (validate(size, 100, 0, -1) < 0) {
+          // if given size is out of boundary then take default value for all levels.
+          return null;
+        }
+        compactionSize[i++] = size;
+      } catch (NumberFormatException e) {
+        LOGGER.error(
+            "Given value for property" + CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD
+                + " is not proper. Taking the default value "
+                + CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD);
+        return null;
+      }
+    }
+    return compactionSize;
+  }
+
+  /**
+   * Validates that the value lies within the given boundaries.
+   *
+   * @param actual the configured value
+   * @param max the maximum allowed value
+   * @param min the minimum allowed value
+   * @param defaultVal the value returned when actual is out of bounds
+   * @return actual if it lies within [min, max], otherwise defaultVal
+   */
+  public int validate(int actual, int max, int min, int defaultVal) {
+    if (actual <= max && actual >= min) {
+      return actual;
+    }
+    return defaultVal;
+  }
+
 }
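
The parsing and fallback rules above can be summarised in a standalone sketch (parseLevels is a hypothetical helper, not the CarbonProperties API): any non-numeric or out-of-range value makes the whole list fall back to the default "4,3".

import java.util.Arrays;

public class LevelParsingSketch {
  static final String DEFAULT = "4,3";

  // Hypothetical standalone helper mirroring getIntArray + validate above.
  static int[] parseLevels(String commaSeparated) {
    String[] parts = commaSeparated.split(",");
    int[] levels = new int[parts.length];
    for (int i = 0; i < parts.length; i++) {
      try {
        int size = Integer.parseInt(parts[i].trim());
        if (size < 0 || size > 100) {
          return parseLevels(DEFAULT); // out of bounds: defaults for all levels
        }
        levels[i] = size;
      } catch (NumberFormatException e) {
        return parseLevels(DEFAULT);   // malformed: defaults for all levels
      }
    }
    return levels;
  }

  public static void main(String[] args) {
    System.out.println(Arrays.toString(parseLevels("6,5")));   // [6, 5]
    System.out.println(Arrays.toString(parseLevels("6,abc"))); // [4, 3]
    System.out.println(Arrays.toString(parseLevels("500,5"))); // [4, 3]
  }
}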

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be46423a/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
index f536293..7accb91 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -62,36 +62,19 @@ public final class CarbonDataMergerUtil {
 
   }
 
+  /**
+   * Returns the total size of all the carbondata files present in the segment.
+   * @param carbonFile the segment folder
+   * @return the summed file size in bytes
+   */
   private static long getSizeOfFactFileInLoad(CarbonFile carbonFile) {
     long factSize = 0;
 
-    // check if update fact is present.
-
-    CarbonFile[] factFileUpdated = carbonFile.listFiles(new CarbonFileFilter() {
-
-      @Override public boolean accept(CarbonFile file) {
-        if (file.getName().endsWith(CarbonCommonConstants.FACT_UPDATE_EXTENSION)) {
-          return true;
-        }
-        return false;
-      }
-    });
-
-    if (factFileUpdated.length != 0) {
-      for (CarbonFile fact : factFileUpdated) {
-        factSize += fact.getSize();
-      }
-      return factSize;
-    }
-
-    // normal fact case.
+    // carbon data file case.
     CarbonFile[] factFile = carbonFile.listFiles(new CarbonFileFilter() {
 
       @Override public boolean accept(CarbonFile file) {
-        if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
-          return true;
-        }
-        return false;
+        return CarbonTablePath.isCarbonDataFile(file.getName());
       }
     });
 
@@ -234,25 +217,44 @@ public final class CarbonDataMergerUtil {
       CarbonLoadModel carbonLoadModel, int partitionCount, long compactionSize,
       List<LoadMetadataDetails> segments, CompactionType compactionType) {
 
+    List<LoadMetadataDetails> sortedSegments = new ArrayList<LoadMetadataDetails>(segments);
+    // sort the segment details.
+    Collections.sort(sortedSegments, new Comparator<LoadMetadataDetails>() {
+      @Override public int compare(LoadMetadataDetails seg1, LoadMetadataDetails seg2) {
+        double seg1Id = Double.parseDouble(seg1.getLoadName());
+        double seg2Id = Double.parseDouble(seg2.getLoadName());
+        if (seg1Id - seg2Id < 0) {
+          return -1;
+        }
+        if (seg1Id - seg2Id > 0) {
+          return 1;
+        }
+        return 0;
+      }
+    });
+
     // check preserve property and preserve the configured number of latest loads.
 
     List<LoadMetadataDetails> listOfSegmentsAfterPreserve =
-        checkPreserveSegmentsPropertyReturnRemaining(segments);
+        checkPreserveSegmentsPropertyReturnRemaining(sortedSegments);
 
     // filter the segments if the compaction based on days is configured.
 
     List<LoadMetadataDetails> listOfSegmentsLoadedInSameDateInterval =
         identifySegmentsToBeMergedBasedOnLoadedDate(listOfSegmentsAfterPreserve);
-
+    List<LoadMetadataDetails> listOfSegmentsToBeMerged;
     // identify the segments to merge based on the Size of the segments across partition.
+    if (compactionType.equals(CompactionType.MAJOR_COMPACTION)) {
 
-    List<LoadMetadataDetails> listOfSegmentsBelowThresholdSize =
-        identifySegmentsToBeMergedBasedOnSize(compactionSize,
-            listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, partitionCount, storeLocation,
-            compactionType);
+      listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize,
+          listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, partitionCount, storeLocation);
+    } else {
 
+      listOfSegmentsToBeMerged =
+          identifySegmentsToBeMergedBasedOnSegCount(listOfSegmentsLoadedInSameDateInterval);
+    }
 
-    return listOfSegmentsBelowThresholdSize;
+    return listOfSegmentsToBeMerged;
   }
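
The comparator above sorts segments numerically by load name, which matters because compacted segments carry fractional names such as "0.1" and "0.2", and plain lexicographic ordering would misplace "10" relative to "2". A minimal sketch of the same ordering on bare strings:

import java.util.Arrays;
import java.util.Comparator;

public class SegmentSortSketch {
  public static void main(String[] args) {
    // bare load names stand in for LoadMetadataDetails
    String[] loadNames = {"2", "0.1", "10", "0", "1"};
    // numeric ordering, as in the comparator above; a lexicographic
    // sort would wrongly place "10" before "2"
    Arrays.sort(loadNames, Comparator.comparingDouble(Double::parseDouble));
    System.out.println(Arrays.toString(loadNames)); // [0, 0.1, 1, 2, 10]
  }
}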
 
   /**
@@ -373,7 +375,7 @@ public final class CarbonDataMergerUtil {
   }
 
   /**
-   * Identify the segments to be merged based on the Size.
+   * Identify the segments to be merged based on size, in case of major compaction.
    *
    * @param compactionSize
    * @param listOfSegmentsAfterPreserve
@@ -384,8 +386,7 @@ public final class CarbonDataMergerUtil {
    */
   private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize(
       long compactionSize, List<LoadMetadataDetails> listOfSegmentsAfterPreserve,
-      CarbonLoadModel carbonLoadModel, int partitionCount, String storeLocation,
-      CompactionType compactionType) {
+      CarbonLoadModel carbonLoadModel, int partitionCount, String storeLocation) {
 
     List<LoadMetadataDetails> segmentsToBeMerged =
         new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
@@ -393,75 +394,157 @@ public final class CarbonDataMergerUtil {
     CarbonTableIdentifier tableIdentifier =
         carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier();
 
-    // variable to store one  segment size across partition.
-    long sizeOfOneSegmentAcrossPartition = 0;
 
     // total length
     long totalLength = 0;
 
-    String includeCompactedSegments = CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.INCLUDE_ALREADY_COMPACTED_SEGMENTS,
-            CarbonCommonConstants.INCLUDE_ALREADY_COMPACTED_SEGMENTS_DEFAULT);
-
     // check the size of each segment and sum it up across partitions
     for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
 
       String segId = segment.getLoadName();
-
-      // in case of minor compaction . check the property whether to include the
-      // compacted segment or not.
-      // check if the segment is compacted or not.
-      if (CompactionType.MINOR_COMPACTION.equals(compactionType) && includeCompactedSegments
-          .equalsIgnoreCase("false") && segId.contains(".")) {
-        continue;
+      // size of one segment summed across all partitions
+      long sizeOfOneSegmentAcrossPartition =
+          getSizeOfOneSegmentAcrossPartition(partitionCount, storeLocation, tableIdentifier, segId);
+
+      // if the size of a segment is greater than the major compaction size then ignore it.
+      if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) {
+        // if already 2 segments have been found for merging then stop scan here and merge.
+        if (segmentsToBeMerged.size() > 1) {
+          break;
+        } else { // if only one segment is found then remove the earlier one in list.
+          // reset the total length to 0.
+          segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+          totalLength = 0;
+          continue;
+        }
       }
 
-      // calculate size across partitions
-      for (int partition = 0; partition < partitionCount; partition++) {
-
-        String loadPath = CarbonLoaderUtil
-            .getStoreLocation(storeLocation, tableIdentifier, segId, partition + "");
-
-        CarbonFile segmentFolder =
-            FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
-
-        long sizeOfEachSegment = getSizeOfFactFileInLoad(segmentFolder);
-
-        sizeOfOneSegmentAcrossPartition += sizeOfEachSegment;
-      }
       totalLength += sizeOfOneSegmentAcrossPartition;
-      // in case of minor compaction the size of the segments should exceed the
-      // minor compaction limit then only compaction will occur.
+
       // in case of major compaction the size doesn't matter; all the segments will be merged.
       if (totalLength < (compactionSize * 1024 * 1024)) {
         segmentsToBeMerged.add(segment);
-      }
-      // in case if minor we will merge segments only when it exceeds limit
-      // so check whether limit has been exceeded. if yes then break loop.
-      if (CompactionType.MINOR_COMPACTION.equals(compactionType)) {
-        if (totalLength > (compactionSize * 1024 * 1024)) {
-          // if size of segments exceeds then take those segments and merge.
-          // i.e if 1st segment is 200mb and 2nd segment is 100mb.
-          //  and compaction size is 256mb . we need to merge these 2 loads. so added this check.
-          segmentsToBeMerged.add(segment);
+      } else { // if already 2 segments have been found for merging then stop scan here and merge.
+        if (segmentsToBeMerged.size() > 1) {
           break;
+        } else { // if only one segment is found then remove the earlier one in list and put this.
+          // reset the total length to the current identified segment.
+          segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+          segmentsToBeMerged.add(segment);
+          totalLength = sizeOfOneSegmentAcrossPartition;
         }
       }
 
-      // after all partitions
-      sizeOfOneSegmentAcrossPartition = 0;
     }
 
-    // if type is minor then we need to check the total size whether it has reached the limit of
-    // compaction size.
-    if (CompactionType.MINOR_COMPACTION.equals(compactionType)) {
-      if (totalLength < compactionSize * 1024 * 1024) {
-        // no need to do the compaction.
-        segmentsToBeMerged.removeAll(segmentsToBeMerged);
+    return segmentsToBeMerged;
+  }
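
A rough simulation of the selection rule above, using hypothetical segment sizes in MB and a 256 MB major compaction size: candidates accumulate while the running total stays below the threshold, an oversized or overflowing segment restarts the window, and scanning stops once at least two candidates have been collected.

import java.util.ArrayList;
import java.util.List;

public class MajorSelectionSketch {
  public static void main(String[] args) {
    final long thresholdMb = 256;                // hypothetical major compaction size
    long[] segmentSizesMb = {300, 100, 120, 50}; // hypothetical sizes, sorted by segment id
    List<Integer> toMerge = new ArrayList<>();
    long total = 0;

    for (int i = 0; i < segmentSizesMb.length; i++) {
      long size = segmentSizesMb[i];
      if (size > thresholdMb) {        // oversized segments are never merged
        if (toMerge.size() > 1) {
          break;                       // a mergeable pair already exists: stop scanning
        }
        toMerge.clear();               // otherwise restart the window after the big segment
        total = 0;
        continue;
      }
      total += size;
      if (total < thresholdMb) {
        toMerge.add(i);
      } else if (toMerge.size() > 1) {
        break;                         // enough candidates collected
      } else {
        toMerge.clear();               // restart the window at the current segment
        toMerge.add(i);
        total = size;
      }
    }
    // prints [1, 2]: 100 MB + 120 MB fit under the 256 MB threshold
    System.out.println("segments selected: " + toMerge);
  }
}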
+
+  /**
+   * Calculates the size of a segment across all partitions.
+   * @param partitionCount
+   * @param storeLocation
+   * @param tableIdentifier
+   * @param segId
+   * @return the summed size in bytes
+   */
+  private static long getSizeOfOneSegmentAcrossPartition(int partitionCount, String storeLocation,
+      CarbonTableIdentifier tableIdentifier, String segId) {
+    long sizeOfOneSegmentAcrossPartition = 0;
+    // calculate size across partitions
+    for (int partition = 0; partition < partitionCount; partition++) {
+
+      String loadPath = CarbonLoaderUtil
+          .getStoreLocation(storeLocation, tableIdentifier, segId, partition + "");
+
+      CarbonFile segmentFolder =
+          FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
+
+      long sizeOfEachSegment = getSizeOfFactFileInLoad(segmentFolder);
+
+      sizeOfOneSegmentAcrossPartition += sizeOfEachSegment;
+    }
+    return sizeOfOneSegmentAcrossPartition;
+  }
+
+  /**
+   * Identify the segments to be merged based on the segment count
+   *
+   * @param listOfSegmentsAfterPreserve
+   * @return
+   */
+  private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSegCount(
+      List<LoadMetadataDetails> listOfSegmentsAfterPreserve) {
+
+    List<LoadMetadataDetails> mergedSegments =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    List<LoadMetadataDetails> unMergedSegments =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+    int[] noOfSegmentLevelsCount =
+        CarbonProperties.getInstance().getCompactionSegmentLevelCount();
+
+    int level1Size = 0;
+    int level2Size = 0;
+    boolean first = true;
+
+    for (int levelCount : noOfSegmentLevelsCount) {
+      if (first) {
+        level1Size = levelCount;
+        first = false;
       }
+      else {
+        level2Size = levelCount;
+        // break as only two levels are handled
+        break;
+      }
+
     }
 
-    return segmentsToBeMerged;
+    int unMergeCounter = 0;
+    int mergeCounter = 0;
+
+    // walk the segments and count unmerged and merged ones separately
+    for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
+
+      String segName = segment.getLoadName();
+
+      // if a segment has already been merged 2 levels, its name will end with .2;
+      // such segments need to be excluded from minor compaction.
+      if (segName.endsWith(".2")) {
+        continue;
+      }
+
+      // check if the segment is merged or not
+
+      if (!isMergedSegment(segName)) {
+        //if it is an unmerged segment then increment counter
+        unMergeCounter++;
+        unMergedSegments.add(segment);
+        if (unMergeCounter == (level1Size)) {
+          return unMergedSegments;
+        }
+      } else {
+        mergeCounter++;
+        mergedSegments.add(segment);
+        if (mergeCounter == (level2Size)) {
+          return mergedSegments;
+        }
+      }
+    }
+    return new ArrayList<>(0);
+  }
+
+  /**
+   * Checks whether the segment has already been merged, i.e. its load name contains a ".".
+   * @param segName the load name of the segment
+   * @return true if the segment is a merged segment
+   */
+  private static boolean isMergedSegment(String segName) {
+    return segName.contains(".");
   }
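
A rough walk-through of the counting rule above, with hypothetical load names (a "." in the name marks a merged segment, and names ending in ".2" are excluded): with the default threshold "4,3", four unmerged segments are returned if found, otherwise three first-level merged segments.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MinorSelectionSketch {
  // levels follow carbon.compaction.level.threshold, e.g. the default "4,3"
  static List<String> pick(List<String> loadNames, int level1Size, int level2Size) {
    List<String> unMerged = new ArrayList<>();
    List<String> merged = new ArrayList<>();
    for (String name : loadNames) {
      if (name.endsWith(".2")) {
        continue;                       // compacted twice already: excluded
      }
      if (!name.contains(".")) {        // unmerged segment
        unMerged.add(name);
        if (unMerged.size() == level1Size) {
          return unMerged;
        }
      } else {                          // first-level compacted segment
        merged.add(name);
        if (merged.size() == level2Size) {
          return merged;
        }
      }
    }
    return new ArrayList<>(0);          // not enough candidates at either level
  }

  public static void main(String[] args) {
    System.out.println(pick(Arrays.asList("0", "1", "2", "3", "4"), 4, 3));   // [0, 1, 2, 3]
    System.out.println(pick(Arrays.asList("0.1", "4.1", "8.1", "12"), 4, 3)); // [0.1, 4.1, 8.1]
    System.out.println(pick(Arrays.asList("0.2", "12", "13"), 4, 3));         // []
  }
}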
 
   /**
@@ -550,10 +633,6 @@ public final class CarbonDataMergerUtil {
 
     long compactionSize = 0;
     switch (compactionType) {
-      case MINOR_COMPACTION:
-        compactionSize = CarbonProperties.getInstance().getMinorCompactionSize();
-        break;
-
       case MAJOR_COMPACTION:
         compactionSize = CarbonProperties.getInstance().getMajorCompactionSize();
         break;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be46423a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 2575f0d..e534e3d 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -301,7 +301,6 @@ object CarbonDataRDDFactory extends Logging {
       compactionType = CompactionType.MAJOR_COMPACTION
     }
     else {
-      compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MINOR_COMPACTION)
       compactionType = CompactionType.MINOR_COMPACTION
     }
 
@@ -320,8 +319,6 @@ object CarbonDataRDDFactory extends Logging {
     val loadStartTime = CarbonLoaderUtil.readCurrentTime()
     carbonLoadModel.setFactTimeStamp(loadStartTime)
 
-    val executor: ExecutorService = Executors.newFixedThreadPool(1)
-
     val compactionModel = CompactionModel(compactionSize,
       compactionType,
       carbonTable,
@@ -342,7 +339,6 @@ object CarbonDataRDDFactory extends Logging {
         hdfsStoreLocation,
         kettleHomePath,
         storeLocation,
-        executor,
         compactionModel,
         lock
       )
@@ -366,14 +362,14 @@ object CarbonDataRDDFactory extends Logging {
     hdfsStoreLocation: String,
     kettleHomePath: String,
     storeLocation: String,
-    executor: ExecutorService,
     compactionModel: CompactionModel,
     compactionLock: ICarbonLock): Unit = {
+    val executor: ExecutorService = Executors.newFixedThreadPool(1)
+    // read the updated table status.
     readLoadMetadataDetails(carbonLoadModel, hdfsStoreLocation)
     var segList: util.List[LoadMetadataDetails] = carbonLoadModel.getLoadMetadataDetails
 
-    val loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+    var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
       hdfsStoreLocation,
       carbonLoadModel,
       partitioner.partitionCount,
@@ -387,49 +383,14 @@ object CarbonDataRDDFactory extends Logging {
       new Thread {
         override def run(): Unit = {
 
-          val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
-            CarbonCommonConstants
-              .DEFAULT_COLLECTION_SIZE
-          )
-          breakable {
-            while (true) {
-
-              val loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
-                hdfsStoreLocation,
-                carbonLoadModel,
-                partitioner.partitionCount,
-                compactionModel.compactionSize,
-                segList,
-                compactionModel.compactionType
-              )
-              if (loadsToMerge.size() > 1) {
-                loadsToMerge.asScala.foreach(seg => {
-                  logger.info("load identified for merge is " + seg.getLoadName)
-                }
-                )
+          while (loadsToMerge.size() > 1) {
+
+            val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
+              CarbonCommonConstants
+                .DEFAULT_COLLECTION_SIZE
+            )
+            scanSegmentsAndSubmitJob(futureList)
 
-                val future: Future[Void] = executor.submit(new CompactionCallable(hdfsStoreLocation,
-                  carbonLoadModel,
-                  partitioner,
-                  storeLocation,
-                  compactionModel.carbonTable,
-                  kettleHomePath,
-                  compactionModel.cubeCreationTime,
-                  loadsToMerge,
-                  sqlContext
-                )
-                )
-                futureList.add(future)
-                segList = CarbonDataMergerUtil
-                  .filterOutAlreadyMergedSegments(segList, loadsToMerge)
-              }
-              else {
-                executor.shutdown()
-                break
-              }
-            }
-          }
-          try {
             futureList.asScala.foreach(future => {
               try {
                 future.get
@@ -440,16 +401,71 @@ object CarbonDataRDDFactory extends Logging {
               }
             }
             )
+            // scan again and determine whether anything more is left to merge.
+            readLoadMetadataDetails(carbonLoadModel, hdfsStoreLocation)
+            segList = carbonLoadModel.getLoadMetadataDetails
+
+            loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+              hdfsStoreLocation,
+              carbonLoadModel,
+              partitioner.partitionCount,
+              compactionModel.compactionSize,
+              segList,
+              compactionModel.compactionType
+            )
           }
-          finally {
-            compactionLock.unlock
-          }
+          executor.shutdown()
+          compactionLock.unlock()
         }
       }.start
     }
     else {
       compactionLock.unlock()
     }
+
+    /**
+     * This scans the segments and submits the identified loads to the executor for merging.
+     * @param futureList
+     */
+    def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]]): Unit = {
+      breakable {
+        while (true) {
+
+          val loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+            hdfsStoreLocation,
+            carbonLoadModel,
+            partitioner.partitionCount,
+            compactionModel.compactionSize,
+            segList,
+            compactionModel.compactionType
+          )
+          if (loadsToMerge.size() > 1) {
+            loadsToMerge.asScala.foreach(seg => {
+              logger.info("load identified for merge is " + seg.getLoadName)
+            }
+            )
+
+            val future: Future[Void] = executor.submit(new CompactionCallable(hdfsStoreLocation,
+              carbonLoadModel,
+              partitioner,
+              storeLocation,
+              compactionModel.carbonTable,
+              kettleHomePath,
+              compactionModel.cubeCreationTime,
+              loadsToMerge,
+              sqlContext
+            )
+            )
+            futureList.add(future)
+            segList = CarbonDataMergerUtil
+              .filterOutAlreadyMergedSegments(segList, loadsToMerge)
+          }
+          else {
+            break
+          }
+        }
+      }
+    }
   }
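
The restructured driver above boils down to: while at least two loads qualify, submit compaction jobs, wait for all futures, re-read the table status, and look again. A rough Java sketch of that control flow, where identifyLoads is a hypothetical stand-in for CarbonDataMergerUtil.identifySegmentsToBeMerged and the submitted callable stands in for CompactionCallable:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CompactionLoopSketch {
  // Hypothetical stand-in for CarbonDataMergerUtil.identifySegmentsToBeMerged:
  // pick the first two loads whenever at least two are available.
  static List<String> identifyLoads(List<String> segments) {
    return segments.size() > 1
        ? new ArrayList<>(segments.subList(0, 2))
        : new ArrayList<>();
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    List<String> segments = new ArrayList<>(Arrays.asList("0", "1", "2", "3"));
    List<String> loadsToMerge = identifyLoads(segments);

    while (loadsToMerge.size() > 1) {
      List<Future<Void>> futureList = new ArrayList<>();
      final List<String> group = loadsToMerge;
      // stand-in for submitting a CompactionCallable per identified group
      futureList.add(executor.submit(() -> {
        System.out.println("merging loads " + group);
        return null;
      }));
      for (Future<Void> future : futureList) {
        future.get();                  // wait for all submitted merges to finish
      }
      // re-read the status and look for further candidates, mirroring
      // readLoadMetadataDetails + identifySegmentsToBeMerged above
      segments.removeAll(group);
      segments.add(group.get(0) + ".1");
      loadsToMerge = identifyLoads(segments);
    }
    executor.shutdown();
  }
}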
 
   def loadCarbonData(sc: SQLContext,
@@ -473,9 +489,7 @@ object CarbonDataRDDFactory extends Logging {
           .audit("Compaction request received for table " + carbonLoadModel
             .getDatabaseName + "." + carbonLoadModel.getTableName
           )
-        val compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MINOR_COMPACTION)
-
-        val executor: ExecutorService = Executors.newFixedThreadPool(1)
+        val compactionSize = 0
 
         val compactionModel = CompactionModel(compactionSize,
           CompactionType.MINOR_COMPACTION,
@@ -503,7 +517,6 @@ object CarbonDataRDDFactory extends Logging {
             hdfsStoreLocation,
             kettleHomePath,
             storeLocation,
-            executor,
             compactionModel,
             lock
           )