You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ch...@apache.org on 2016/06/30 17:41:54 UTC
[07/50] [abbrv] incubator-carbondata git commit: [Issue-578] Changing
the minor compaction behavior to work on segment numbers. (#737)
[Issue-578] Changing the minor compaction behavior to work on segment numbers. (#737)
* Minor compaction needs to be done based on the number of segments.
The user specifies the number of segments to be merged at each level.
* Add property carbon.compaction.level.threshold; remove carbon.minor.compaction.size and carbon.include.compacted.segments
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/be46423a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/be46423a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/be46423a
Branch: refs/heads/master
Commit: be46423a08effd5fb784ed7d1ff1736adf44b670
Parents: d5636db
Author: ravikiran23 <ra...@gmail.com>
Authored: Sat Jun 25 13:12:30 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Sat Jun 25 13:12:30 2016 +0530
----------------------------------------------------------------------
.../core/constants/CarbonCommonConstants.java | 33 +--
.../carbondata/core/util/CarbonProperties.java | 79 ++++--
.../spark/merger/CarbonDataMergerUtil.java | 247 ++++++++++++-------
.../spark/rdd/CarbonDataRDDFactory.scala | 123 ++++-----
4 files changed, 305 insertions(+), 177 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be46423a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
index eb10406..cd25b88 100644
--- a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
@@ -881,16 +881,6 @@ public final class CarbonCommonConstants {
public static final String INVALID_SEGMENT_ID = "-1";
/**
- * Size of Minor Compaction in MBs
- */
- public static final String MINOR_COMPACTION_SIZE = "carbon.minor.compaction.size";
-
- /**
- * By default size of minor compaction in MBs.
- */
- public static final String DEFAULT_MINOR_COMPACTION_SIZE = "256";
-
- /**
* Size of Major Compaction in MBs
*/
public static final String MAJOR_COMPACTION_SIZE = "carbon.major.compaction.size";
@@ -990,18 +980,6 @@ public final class CarbonCommonConstants {
public static final String SEGMENT_COMPACTED = "Compacted";
/**
- * whether to include the compacted segments again for compaction or not.
- */
- public static final String INCLUDE_ALREADY_COMPACTED_SEGMENTS =
- "carbon.include.compacted.segments";
-
- /**
- * whether to include the compacted segments again for compaction or not. default value is false.
- * compacted load will not be compacted again in minor compaction.
- */
- public static final String INCLUDE_ALREADY_COMPACTED_SEGMENTS_DEFAULT =
- "false";
- /**
* property for number of core to load the blocks in driver
*/
public static final String NUMBER_OF_CORE_TO_LOAD_DRIVER_SEGMENT =
@@ -1040,6 +1018,17 @@ public final class CarbonCommonConstants {
public static final String FILTER_INVALID_MEMBER = " Invalid Record(s) are present "
+ "while filter evaluation. ";
+ /**
+ * Number of unmerged segments to be merged.
+ */
+ public static final String COMPACTION_SEGMENT_LEVEL_THRESHOLD =
+ "carbon.compaction.level.threshold";
+
+ /**
+ * Default count for Number of segments to be merged in levels is 4,3
+ */
+ public static final String DEFAULT_SEGMENT_LEVEL_THRESHOLD = "4,3";
+
private CarbonCommonConstants() {
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be46423a/core/src/main/java/org/carbondata/core/util/CarbonProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/carbondata/core/util/CarbonProperties.java
index 60cc323..a337ac4 100644
--- a/core/src/main/java/org/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/carbondata/core/util/CarbonProperties.java
@@ -712,22 +712,6 @@ public final class CarbonProperties {
}
/**
- * returns minor compaction size value from carbon properties or default value if it is not valid
- *
- * @return
- */
- public long getMinorCompactionSize() {
- long compactionSize;
- try {
- compactionSize = Long.parseLong(getProperty(CarbonCommonConstants.MINOR_COMPACTION_SIZE,
- CarbonCommonConstants.DEFAULT_MINOR_COMPACTION_SIZE));
- } catch (NumberFormatException e) {
- compactionSize = Long.parseLong(CarbonCommonConstants.DEFAULT_MINOR_COMPACTION_SIZE);
- }
- return compactionSize;
- }
-
- /**
* returns the number of loads to be preserved.
*
* @return
@@ -758,4 +742,67 @@ public final class CarbonProperties {
LOGGER.info(carbonProperties.toString());
}
+ /**
+ * gettting the unmerged segment numbers to be merged.
+ * @return
+ */
+ public int[] getCompactionSegmentLevelCount() {
+ String commaSeparatedLevels;
+
+ commaSeparatedLevels = getProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+ CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD);
+ int[] compactionSize = getIntArray(commaSeparatedLevels);
+
+ if(null == compactionSize){
+ compactionSize = getIntArray(CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD);
+ }
+
+ return compactionSize;
+ }
+
+ /**
+ *
+ * @param commaSeparatedLevels
+ * @return
+ */
+ private int[] getIntArray(String commaSeparatedLevels) {
+ String[] levels = commaSeparatedLevels.split(",");
+ int[] compactionSize = new int[levels.length];
+ int i = 0;
+ for (String levelSize : levels) {
+ try {
+ int size = Integer.parseInt(levelSize.trim());
+ if(validate(size,100,0,-1) < 0 ){
+ // if given size is out of boundary then take default value for all levels.
+ return null;
+ }
+ compactionSize[i++] = size;
+ }
+ catch(NumberFormatException e){
+ LOGGER.error(
+ "Given value for property" + CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD
+ + " is not proper. Taking the default value "
+ + CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD);
+ return null;
+ }
+ }
+ return compactionSize;
+ }
+
+ /**
+ * Validate the restrictions
+ *
+ * @param actual
+ * @param max
+ * @param min
+ * @param defaultVal
+ * @return
+ */
+ public int validate(int actual, int max, int min, int defaultVal) {
+ if (actual <= max && actual >= min) {
+ return actual;
+ }
+ return defaultVal;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be46423a/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
index f536293..7accb91 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -62,36 +62,19 @@ public final class CarbonDataMergerUtil {
}
+ /**
+ * Returns the size of all the carbondata files present in the segment.
+ * @param carbonFile
+ * @return
+ */
private static long getSizeOfFactFileInLoad(CarbonFile carbonFile) {
long factSize = 0;
- // check if update fact is present.
-
- CarbonFile[] factFileUpdated = carbonFile.listFiles(new CarbonFileFilter() {
-
- @Override public boolean accept(CarbonFile file) {
- if (file.getName().endsWith(CarbonCommonConstants.FACT_UPDATE_EXTENSION)) {
- return true;
- }
- return false;
- }
- });
-
- if (factFileUpdated.length != 0) {
- for (CarbonFile fact : factFileUpdated) {
- factSize += fact.getSize();
- }
- return factSize;
- }
-
- // normal fact case.
+ // carbon data file case.
CarbonFile[] factFile = carbonFile.listFiles(new CarbonFileFilter() {
@Override public boolean accept(CarbonFile file) {
- if (file.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT)) {
- return true;
- }
- return false;
+ return CarbonTablePath.isCarbonDataFile(file.getName());
}
});
@@ -234,25 +217,44 @@ public final class CarbonDataMergerUtil {
CarbonLoadModel carbonLoadModel, int partitionCount, long compactionSize,
List<LoadMetadataDetails> segments, CompactionType compactionType) {
+ List sortedSegments = new ArrayList(segments);
+ // sort the segment details.
+ Collections.sort(sortedSegments, new Comparator<LoadMetadataDetails>() {
+ @Override public int compare(LoadMetadataDetails seg1, LoadMetadataDetails seg2) {
+ double seg1Id = Double.parseDouble(seg1.getLoadName());
+ double seg2Id = Double.parseDouble(seg2.getLoadName());
+ if (seg1Id - seg2Id < 0) {
+ return -1;
+ }
+ if (seg1Id - seg2Id > 0) {
+ return 1;
+ }
+ return 0;
+ }
+ });
+
// check preserve property and preserve the configured number of latest loads.
List<LoadMetadataDetails> listOfSegmentsAfterPreserve =
- checkPreserveSegmentsPropertyReturnRemaining(segments);
+ checkPreserveSegmentsPropertyReturnRemaining(sortedSegments);
// filter the segments if the compaction based on days is configured.
List<LoadMetadataDetails> listOfSegmentsLoadedInSameDateInterval =
identifySegmentsToBeMergedBasedOnLoadedDate(listOfSegmentsAfterPreserve);
-
+ List<LoadMetadataDetails> listOfSegmentsToBeMerged;
// identify the segments to merge based on the Size of the segments across partition.
+ if (compactionType.equals(CompactionType.MAJOR_COMPACTION)) {
- List<LoadMetadataDetails> listOfSegmentsBelowThresholdSize =
- identifySegmentsToBeMergedBasedOnSize(compactionSize,
- listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, partitionCount, storeLocation,
- compactionType);
+ listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize,
+ listOfSegmentsLoadedInSameDateInterval, carbonLoadModel, partitionCount, storeLocation);
+ } else {
+ listOfSegmentsToBeMerged =
+ identifySegmentsToBeMergedBasedOnSegCount(listOfSegmentsLoadedInSameDateInterval);
+ }
- return listOfSegmentsBelowThresholdSize;
+ return listOfSegmentsToBeMerged;
}
/**
@@ -373,7 +375,7 @@ public final class CarbonDataMergerUtil {
}
/**
- * Identify the segments to be merged based on the Size.
+ * Identify the segments to be merged based on the Size in case of Major compaction.
*
* @param compactionSize
* @param listOfSegmentsAfterPreserve
@@ -384,8 +386,7 @@ public final class CarbonDataMergerUtil {
*/
private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSize(
long compactionSize, List<LoadMetadataDetails> listOfSegmentsAfterPreserve,
- CarbonLoadModel carbonLoadModel, int partitionCount, String storeLocation,
- CompactionType compactionType) {
+ CarbonLoadModel carbonLoadModel, int partitionCount, String storeLocation) {
List<LoadMetadataDetails> segmentsToBeMerged =
new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
@@ -393,75 +394,157 @@ public final class CarbonDataMergerUtil {
CarbonTableIdentifier tableIdentifier =
carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier();
- // variable to store one segment size across partition.
- long sizeOfOneSegmentAcrossPartition = 0;
// total length
long totalLength = 0;
- String includeCompactedSegments = CarbonProperties.getInstance()
- .getProperty(CarbonCommonConstants.INCLUDE_ALREADY_COMPACTED_SEGMENTS,
- CarbonCommonConstants.INCLUDE_ALREADY_COMPACTED_SEGMENTS_DEFAULT);
-
// check size of each segment , sum it up across partitions
for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
String segId = segment.getLoadName();
-
- // in case of minor compaction . check the property whether to include the
- // compacted segment or not.
- // check if the segment is compacted or not.
- if (CompactionType.MINOR_COMPACTION.equals(compactionType) && includeCompactedSegments
- .equalsIgnoreCase("false") && segId.contains(".")) {
- continue;
+ // variable to store one segment size across partition.
+ long sizeOfOneSegmentAcrossPartition =
+ getSizeOfOneSegmentAcrossPartition(partitionCount, storeLocation, tableIdentifier, segId);
+
+ // if size of a segment is greater than the Major compaction size. then ignore it.
+ if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) {
+ // if already 2 segments have been found for merging then stop scan here and merge.
+ if (segmentsToBeMerged.size() > 1) {
+ break;
+ } else { // if only one segment is found then remove the earlier one in list.
+ // reset the total length to 0.
+ segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ totalLength = 0;
+ continue;
+ }
}
- // calculate size across partitions
- for (int partition = 0; partition < partitionCount; partition++) {
-
- String loadPath = CarbonLoaderUtil
- .getStoreLocation(storeLocation, tableIdentifier, segId, partition + "");
-
- CarbonFile segmentFolder =
- FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
-
- long sizeOfEachSegment = getSizeOfFactFileInLoad(segmentFolder);
-
- sizeOfOneSegmentAcrossPartition += sizeOfEachSegment;
- }
totalLength += sizeOfOneSegmentAcrossPartition;
- // in case of minor compaction the size of the segments should exceed the
- // minor compaction limit then only compaction will occur.
+
// in case of major compaction the size doesnt matter. all the segments will be merged.
if (totalLength < (compactionSize * 1024 * 1024)) {
segmentsToBeMerged.add(segment);
- }
- // in case if minor we will merge segments only when it exceeds limit
- // so check whether limit has been exceeded. if yes then break loop.
- if (CompactionType.MINOR_COMPACTION.equals(compactionType)) {
- if (totalLength > (compactionSize * 1024 * 1024)) {
- // if size of segments exceeds then take those segments and merge.
- // i.e if 1st segment is 200mb and 2nd segment is 100mb.
- // and compaction size is 256mb . we need to merge these 2 loads. so added this check.
- segmentsToBeMerged.add(segment);
+ } else { // if already 2 segments have been found for merging then stop scan here and merge.
+ if (segmentsToBeMerged.size() > 1) {
break;
+ } else { // if only one segment is found then remove the earlier one in list and put this.
+ // reset the total length to the current identified segment.
+ segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ segmentsToBeMerged.add(segment);
+ totalLength = sizeOfOneSegmentAcrossPartition;
}
}
- // after all partitions
- sizeOfOneSegmentAcrossPartition = 0;
}
- // if type is minor then we need to check the total size whether it has reached the limit of
- // compaction size.
- if (CompactionType.MINOR_COMPACTION.equals(compactionType)) {
- if (totalLength < compactionSize * 1024 * 1024) {
- // no need to do the compaction.
- segmentsToBeMerged.removeAll(segmentsToBeMerged);
+ return segmentsToBeMerged;
+ }
+
+ /**
+ * For calculating the size of a segment across all partition.
+ * @param partitionCount
+ * @param storeLocation
+ * @param tableIdentifier
+ * @param segId
+ * @return
+ */
+ private static long getSizeOfOneSegmentAcrossPartition(int partitionCount, String storeLocation,
+ CarbonTableIdentifier tableIdentifier, String segId) {
+ long sizeOfOneSegmentAcrossPartition = 0;
+ // calculate size across partitions
+ for (int partition = 0; partition < partitionCount; partition++) {
+
+ String loadPath = CarbonLoaderUtil
+ .getStoreLocation(storeLocation, tableIdentifier, segId, partition + "");
+
+ CarbonFile segmentFolder =
+ FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath));
+
+ long sizeOfEachSegment = getSizeOfFactFileInLoad(segmentFolder);
+
+ sizeOfOneSegmentAcrossPartition += sizeOfEachSegment;
+ }
+ return sizeOfOneSegmentAcrossPartition;
+ }
+
+ /**
+ * Identify the segments to be merged based on the segment count
+ *
+ * @param listOfSegmentsAfterPreserve
+ * @return
+ */
+ private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSegCount(
+ List<LoadMetadataDetails> listOfSegmentsAfterPreserve) {
+
+ List<LoadMetadataDetails> mergedSegments =
+ new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+ List<LoadMetadataDetails> unMergedSegments =
+ new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+ int[] noOfSegmentLevelsCount =
+ CarbonProperties.getInstance().getCompactionSegmentLevelCount();
+
+ int level1Size = 0;
+ int level2Size = 0;
+ boolean first = true;
+
+ for(int levelCount : noOfSegmentLevelsCount){
+ if(first){
+ level1Size = levelCount;
+ first = false;
}
+ else{
+ level2Size = levelCount;
+ break;
+ // breaking as we are doing only 2 levels
+ }
+
}
- return segmentsToBeMerged;
+ int unMergeCounter = 0;
+ int mergeCounter = 0;
+
+ // check size of each segment , sum it up across partitions
+ for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
+
+ String segName = segment.getLoadName();
+
+ // if a segment is already merged 2 levels then it s name will become .2
+ // need to exclude those segments from minor compaction.
+ if (segName.endsWith(".2")) {
+ continue;
+ }
+
+ // check if the segment is merged or not
+
+ if (!isMergedSegment(segName)) {
+ //if it is an unmerged segment then increment counter
+ unMergeCounter++;
+ unMergedSegments.add(segment);
+ if (unMergeCounter == (level1Size)) {
+ return unMergedSegments;
+ }
+ } else {
+ mergeCounter++;
+ mergedSegments.add(segment);
+ if (mergeCounter == (level2Size)) {
+ return mergedSegments;
+ }
+ }
+ }
+ return new ArrayList<>(0);
+ }
+
+ /**
+ * To check if the segment is merged or not.
+ * @param segName
+ * @return
+ */
+ private static boolean isMergedSegment(String segName) {
+ if(segName.contains(".")){
+ return true;
+ }
+ return false;
}
/**
@@ -550,10 +633,6 @@ public final class CarbonDataMergerUtil {
long compactionSize = 0;
switch (compactionType) {
- case MINOR_COMPACTION:
- compactionSize = CarbonProperties.getInstance().getMinorCompactionSize();
- break;
-
case MAJOR_COMPACTION:
compactionSize = CarbonProperties.getInstance().getMajorCompactionSize();
break;
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be46423a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 2575f0d..e534e3d 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -301,7 +301,6 @@ object CarbonDataRDDFactory extends Logging {
compactionType = CompactionType.MAJOR_COMPACTION
}
else {
- compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MINOR_COMPACTION)
compactionType = CompactionType.MINOR_COMPACTION
}
@@ -320,8 +319,6 @@ object CarbonDataRDDFactory extends Logging {
val loadStartTime = CarbonLoaderUtil.readCurrentTime()
carbonLoadModel.setFactTimeStamp(loadStartTime)
- val executor: ExecutorService = Executors.newFixedThreadPool(1)
-
val compactionModel = CompactionModel(compactionSize,
compactionType,
carbonTable,
@@ -342,7 +339,6 @@ object CarbonDataRDDFactory extends Logging {
hdfsStoreLocation,
kettleHomePath,
storeLocation,
- executor,
compactionModel,
lock
)
@@ -366,14 +362,14 @@ object CarbonDataRDDFactory extends Logging {
hdfsStoreLocation: String,
kettleHomePath: String,
storeLocation: String,
- executor: ExecutorService,
compactionModel: CompactionModel,
compactionLock: ICarbonLock): Unit = {
+ val executor: ExecutorService = Executors.newFixedThreadPool(1)
// update the updated table status.
readLoadMetadataDetails(carbonLoadModel, hdfsStoreLocation)
var segList: util.List[LoadMetadataDetails] = carbonLoadModel.getLoadMetadataDetails
- val loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+ var loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
hdfsStoreLocation,
carbonLoadModel,
partitioner.partitionCount,
@@ -387,49 +383,14 @@ object CarbonDataRDDFactory extends Logging {
new Thread {
override def run(): Unit = {
- val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
- CarbonCommonConstants
- .DEFAULT_COLLECTION_SIZE
- )
- breakable {
- while (true) {
-
- val loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
- hdfsStoreLocation,
- carbonLoadModel,
- partitioner.partitionCount,
- compactionModel.compactionSize,
- segList,
- compactionModel.compactionType
- )
- if (loadsToMerge.size() > 1) {
- loadsToMerge.asScala.foreach(seg => {
- logger.info("load identified for merge is " + seg.getLoadName)
- }
- )
+ while (loadsToMerge.size() > 1) {
+
+ val futureList: util.List[Future[Void]] = new util.ArrayList[Future[Void]](
+ CarbonCommonConstants
+ .DEFAULT_COLLECTION_SIZE
+ )
+ scanSegmentsAndSubmitJob(futureList)
- val future: Future[Void] = executor.submit(new CompactionCallable(hdfsStoreLocation,
- carbonLoadModel,
- partitioner,
- storeLocation,
- compactionModel.carbonTable,
- kettleHomePath,
- compactionModel.cubeCreationTime,
- loadsToMerge,
- sqlContext
- )
- )
- futureList.add(future)
- segList = CarbonDataMergerUtil
- .filterOutAlreadyMergedSegments(segList, loadsToMerge)
- }
- else {
- executor.shutdown()
- break
- }
- }
- }
- try {
futureList.asScala.foreach(future => {
try {
future.get
@@ -440,16 +401,71 @@ object CarbonDataRDDFactory extends Logging {
}
}
)
+ // scan again and deterrmine if anything is there to merge again.
+ readLoadMetadataDetails(carbonLoadModel, hdfsStoreLocation)
+ segList = carbonLoadModel.getLoadMetadataDetails
+
+ loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+ hdfsStoreLocation,
+ carbonLoadModel,
+ partitioner.partitionCount,
+ compactionModel.compactionSize,
+ segList,
+ compactionModel.compactionType
+ )
}
- finally {
- compactionLock.unlock
- }
+ executor.shutdown()
+ compactionLock.unlock()
}
}.start
}
else {
compactionLock.unlock()
}
+
+ /**
+ * This will scan all the segments and submit the loads to be merged into the executor.
+ * @param futureList
+ */
+ def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]]): Unit = {
+ breakable {
+ while (true) {
+
+ val loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
+ hdfsStoreLocation,
+ carbonLoadModel,
+ partitioner.partitionCount,
+ compactionModel.compactionSize,
+ segList,
+ compactionModel.compactionType
+ )
+ if (loadsToMerge.size() > 1) {
+ loadsToMerge.asScala.foreach(seg => {
+ logger.info("load identified for merge is " + seg.getLoadName)
+ }
+ )
+
+ val future: Future[Void] = executor.submit(new CompactionCallable(hdfsStoreLocation,
+ carbonLoadModel,
+ partitioner,
+ storeLocation,
+ compactionModel.carbonTable,
+ kettleHomePath,
+ compactionModel.cubeCreationTime,
+ loadsToMerge,
+ sqlContext
+ )
+ )
+ futureList.add(future)
+ segList = CarbonDataMergerUtil
+ .filterOutAlreadyMergedSegments(segList, loadsToMerge)
+ }
+ else {
+ break
+ }
+ }
+ }
+ }
}
def loadCarbonData(sc: SQLContext,
@@ -473,9 +489,7 @@ object CarbonDataRDDFactory extends Logging {
.audit("Compaction request received for table " + carbonLoadModel
.getDatabaseName + "." + carbonLoadModel.getTableName
)
- val compactionSize = CarbonDataMergerUtil.getCompactionSize(CompactionType.MINOR_COMPACTION)
-
- val executor: ExecutorService = Executors.newFixedThreadPool(1)
+ val compactionSize = 0
val compactionModel = CompactionModel(compactionSize,
CompactionType.MINOR_COMPACTION,
@@ -503,7 +517,6 @@ object CarbonDataRDDFactory extends Logging {
hdfsStoreLocation,
kettleHomePath,
storeLocation,
- executor,
compactionModel,
lock
)