You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:37 UTC
[39/47] incubator-carbondata git commit: [CARBONDATA-121]Need to
identify the segments for merging each time after one compaction (#884)
[CARBONDATA-121]Need to identify the segments for merging each time after one compaction (#884)
as the user can delete some loads in between the compaction process.
* major compaction should not run continuously; it should run only one pass,
* running the major compaction on all the segments which are present at the time of triggering the compaction.
then stopping the major compaction.
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/2d50d5c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/2d50d5c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/2d50d5c5
Branch: refs/heads/master
Commit: 2d50d5c5ac7a79d397e19baaaf1d1bfa9ea18006
Parents: f495b6b
Author: ravikiran23 <ra...@gmail.com>
Authored: Fri Jul 29 14:49:03 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Fri Jul 29 14:49:03 2016 +0530
----------------------------------------------------------------------
.../spark/merger/CarbonDataMergerUtil.java | 65 ++++++----
.../spark/rdd/CarbonDataRDDFactory.scala | 81 ++++++------
.../MajorCompactionStopsAfterCompaction.scala | 125 +++++++++++++++++++
3 files changed, 208 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2d50d5c5/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
index b290dd5..c2722a4 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -233,20 +233,8 @@ public final class CarbonDataMergerUtil {
List<LoadMetadataDetails> segments, CompactionType compactionType) {
List sortedSegments = new ArrayList(segments);
- // sort the segment details.
- Collections.sort(sortedSegments, new Comparator<LoadMetadataDetails>() {
- @Override public int compare(LoadMetadataDetails seg1, LoadMetadataDetails seg2) {
- double seg1Id = Double.parseDouble(seg1.getLoadName());
- double seg2Id = Double.parseDouble(seg2.getLoadName());
- if (seg1Id - seg2Id < 0) {
- return -1;
- }
- if (seg1Id - seg2Id > 0) {
- return 1;
- }
- return 0;
- }
- });
+
+ sortSegments(sortedSegments);
// check preserve property and preserve the configured number of latest loads.
@@ -273,6 +261,27 @@ public final class CarbonDataMergerUtil {
}
/**
+ * Sorting of the segments.
+ * @param segments
+ */
+ public static void sortSegments(List segments) {
+ // sort the segment details.
+ Collections.sort(segments, new Comparator<LoadMetadataDetails>() {
+ @Override public int compare(LoadMetadataDetails seg1, LoadMetadataDetails seg2) {
+ double seg1Id = Double.parseDouble(seg1.getLoadName());
+ double seg2Id = Double.parseDouble(seg2.getLoadName());
+ if (seg1Id - seg2Id < 0) {
+ return -1;
+ }
+ if (seg1Id - seg2Id > 0) {
+ return 1;
+ }
+ return 0;
+ }
+ });
+ }
+
+ /**
* This method will return the list of loads which are loaded at the same interval.
* This property is configurable.
*
@@ -691,16 +700,30 @@ public final class CarbonDataMergerUtil {
return combinedMap;
}
- public static List<LoadMetadataDetails> filterOutAlreadyMergedSegments(
- List<LoadMetadataDetails> segments, List<LoadMetadataDetails> loadsToMerge) {
+ /**
+ * Removing the already merged segments from list.
+ * @param segments
+ * @param loadsToMerge
+ * @return
+ */
+ public static List<LoadMetadataDetails> filterOutNewlyAddedSegments(
+ List<LoadMetadataDetails> segments, List<LoadMetadataDetails> loadsToMerge,
+ LoadMetadataDetails lastSeg) {
+
+ // take complete list of segments.
+ List<LoadMetadataDetails> list = new ArrayList<>(segments);
- ArrayList<LoadMetadataDetails> list = new ArrayList<>(segments);
+ List<LoadMetadataDetails> trimmedList =
+ new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
- for (LoadMetadataDetails mergedSegs : loadsToMerge) {
- list.remove(mergedSegs);
- }
+ // sort list
+ CarbonDataMergerUtil.sortSegments(list);
- return list;
+ // first filter out newly added segments.
+ trimmedList = list.subList(0, list.indexOf(lastSeg) + 1);
+
+ return trimmedList;
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2d50d5c5/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index ab77ea9..49e6702 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -397,6 +397,12 @@ object CarbonDataRDDFactory extends Logging {
if (loadsToMerge.size() > 1) {
+ val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
+ segList
+ )
+ CarbonDataMergerUtil.sortSegments(sortedSegments)
+ val lastSegment = sortedSegments.get(sortedSegments.size()-1)
+
new Thread {
override def run(): Unit = {
@@ -408,16 +414,24 @@ object CarbonDataRDDFactory extends Logging {
.DEFAULT_COLLECTION_SIZE
)
- scanSegmentsAndSubmitJob(futureList)
+ scanSegmentsAndSubmitJob(futureList, loadsToMerge)
futureList.asScala.foreach(future => {
future.get
}
)
- // scan again and deterrmine if anything is there to merge again.
+
+ // scan again and determine if anything is there to merge again.
readLoadMetadataDetails(carbonLoadModel, hdfsStoreLocation)
segList = carbonLoadModel.getLoadMetadataDetails
+ // in case of major compaction we will scan only once and come out as it will keep
+ // on doing major for the new loads also.
+ // excluding the newly added segments.
+ if (compactionModel.compactionType == CompactionType.MAJOR_COMPACTION) {
+ segList = CarbonDataMergerUtil
+ .filterOutNewlyAddedSegments(segList, loadsToMerge, lastSegment)
+ }
loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
hdfsStoreLocation,
carbonLoadModel,
@@ -446,52 +460,35 @@ object CarbonDataRDDFactory extends Logging {
}
/**
- * This will scan all the segments and submit the loads to be merged into the executor.
+ * This will submit the loads to be merged into the executor.
*
* @param futureList
*/
- def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]]): Unit = {
- breakable {
- while (true) {
+ def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]], loadsToMerge: util
+ .List[LoadMetadataDetails]): Unit = {
- val loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
- hdfsStoreLocation,
- carbonLoadModel,
- partitioner.partitionCount,
- compactionModel.compactionSize,
- segList,
- compactionModel.compactionType
- )
- if (loadsToMerge.size() > 1) {
- loadsToMerge.asScala.foreach(seg => {
- logger.info("load identified for merge is " + seg.getLoadName)
- }
- )
+ loadsToMerge.asScala.foreach(seg => {
+ logger.info("loads identified for merge is " + seg.getLoadName)
+ }
+ )
- val compactionCallableModel = CompactionCallableModel(hdfsStoreLocation,
- carbonLoadModel,
- partitioner,
- storeLocation,
- compactionModel.carbonTable,
- kettleHomePath,
- compactionModel.cubeCreationTime,
- loadsToMerge,
- sqlContext,
- compactionModel.compactionType)
+ val compactionCallableModel = CompactionCallableModel(hdfsStoreLocation,
+ carbonLoadModel,
+ partitioner,
+ storeLocation,
+ compactionModel.carbonTable,
+ kettleHomePath,
+ compactionModel.cubeCreationTime,
+ loadsToMerge,
+ sqlContext,
+ compactionModel.compactionType
+ )
- val future: Future[Void] = executor
- .submit(new CompactionCallable(compactionCallableModel
- )
- )
- futureList.add(future)
- segList = CarbonDataMergerUtil
- .filterOutAlreadyMergedSegments(segList, loadsToMerge)
- }
- else {
- break
- }
- }
- }
+ val future: Future[Void] = executor
+ .submit(new CompactionCallable(compactionCallableModel
+ )
+ )
+ futureList.add(future)
}
}
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2d50d5c5/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
new file mode 100644
index 0000000..3cbf5dd
--- /dev/null
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
@@ -0,0 +1,125 @@
+package org.carbondata.spark.testsuite.datacompaction
+
+import java.io.File
+
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.carbondata.core.constants.CarbonCommonConstants
+import org.carbondata.core.util.CarbonProperties
+import org.carbondata.lcm.status.SegmentStatusManager
+import org.scalatest.BeforeAndAfterAll
+
+import scala.collection.JavaConverters._
+
+/**
+ * FT for compaction scenario where major compaction will only compact the segments which are
+ * present at the time of triggering the compaction.
+ */
+class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll {
+ sql("drop table if exists stopmajor")
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+ sql(
+ "CREATE TABLE IF NOT EXISTS stopmajor (country String, ID Int, date Timestamp, name " +
+ "String, " +
+ "phonetype String, serialname String, salary Int) STORED BY 'org.apache.carbondata" +
+ ".format'"
+ )
+
+
+ val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
+ .getCanonicalPath
+ val csvFilePath1 = currentDirectory + "/src/test/resources/compaction/compaction1.csv"
+
+ val csvFilePath2 = currentDirectory + "/src/test/resources/compaction/compaction2.csv"
+ val csvFilePath3 = currentDirectory + "/src/test/resources/compaction/compaction3.csv"
+
+ sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE stopmajor OPTIONS" +
+ "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+ )
+ sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE stopmajor OPTIONS" +
+ "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+ )
+ // compaction will happen here.
+ sql("alter table stopmajor compact 'major'"
+ )
+ Thread.sleep(2000)
+ sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE stopmajor OPTIONS" +
+ "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+ )
+ sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE stopmajor OPTIONS" +
+ "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+ )
+ if (checkCompactionCompletedOrNot("0.1")) {
+ }
+
+ }
+
+ /**
+ * Check if the compaction is completed or not.
+ *
+ * @param requiredSeg
+ * @return
+ */
+ def checkCompactionCompletedOrNot(requiredSeg: String): Boolean = {
+ var status = false
+ var noOfRetries = 0
+ while (!status && noOfRetries < 10) {
+
+ val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
+ AbsoluteTableIdentifier(
+ CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
+ new CarbonTableIdentifier("default", "stopmajor", noOfRetries + "")
+ )
+ )
+ val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList
+ segments.foreach(seg =>
+ System.out.println( "valid segment is =" + seg)
+ )
+
+ if (!segments.contains(requiredSeg)) {
+ // wait for 2 seconds for compaction to complete.
+ System.out.println("sleping for 2 seconds.")
+ Thread.sleep(2000)
+ noOfRetries += 1
+ }
+ else {
+ status = true
+ }
+ }
+ return status
+ }
+
+ /**
+ * Test whether major compaction is not included in minor compaction.
+ */
+ test("delete merged folder and check segments") {
+ // delete merged segments
+ sql("clean files for table stopmajor")
+
+ val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
+ AbsoluteTableIdentifier(
+ CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
+ new CarbonTableIdentifier("default", "stopmajor", "rrr")
+ )
+ )
+ // merged segment should not be there
+ val segments = segmentStatusManager.getValidSegments.listOfValidSegments.asScala.toList
+ assert(segments.contains("0.1"))
+ assert(!segments.contains("0.2"))
+ assert(!segments.contains("0"))
+ assert(!segments.contains("1"))
+ assert(segments.contains("2"))
+ assert(segments.contains("3"))
+
+ }
+
+ override def afterAll {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+ }
+
+}