Posted to commits@carbondata.apache.org by ra...@apache.org on 2016/08/01 10:05:37 UTC

[39/47] incubator-carbondata git commit: [CARBONDATA-121]Need to identify the segments for merging each time after one compaction (#884)

[CARBONDATA-121]Need to identify the segments for merging each time after one compaction (#884)


* The segments to be merged must be identified afresh after each compaction pass,
  as the user can delete some loads while the compaction process is running.
* Major compaction should not run continuously: it runs only once, over all the
  segments present at the time the compaction is triggered, and then stops
  (a sketch of this trimming logic follows below).
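
The fix in a nutshell: snapshot the last segment that exists at the moment compaction is
triggered, and after every merge pass trim the refreshed segment list back to that snapshot.
A minimal, self-contained sketch of that trimming idea, assuming numeric load names like
"0", "0.1", "2" (class and method names here are illustrative, not CarbonData's API):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    // Hypothetical sketch: load names compare numerically, and anything after
    // the trigger-time snapshot is dropped from the merge candidates.
    public class SegmentTrimSketch {

      // Sort load names numerically, as the comparator in CarbonDataMergerUtil does.
      static void sortByLoadName(List<String> loadNames) {
        loadNames.sort(Comparator.comparingDouble(Double::parseDouble));
      }

      // Keep segments up to and including the last segment that existed when
      // compaction was triggered; loads added later sort after it and are cut.
      static List<String> trimToTriggerSnapshot(List<String> current, String lastSegAtTrigger) {
        List<String> sorted = new ArrayList<>(current);
        sortByLoadName(sorted);
        return new ArrayList<>(sorted.subList(0, sorted.indexOf(lastSegAtTrigger) + 1));
      }

      public static void main(String[] args) {
        List<String> segs = new ArrayList<>(List.of("3", "0", "1", "2")); // "3" arrived mid-compaction
        System.out.println(trimToTriggerSnapshot(segs, "2"));             // prints [0, 1, 2]
      }
    }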

Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/2d50d5c5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/2d50d5c5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/2d50d5c5

Branch: refs/heads/master
Commit: 2d50d5c5ac7a79d397e19baaaf1d1bfa9ea18006
Parents: f495b6b
Author: ravikiran23 <ra...@gmail.com>
Authored: Fri Jul 29 14:49:03 2016 +0530
Committer: Venkata Ramana G <g....@gmail.com>
Committed: Fri Jul 29 14:49:03 2016 +0530

----------------------------------------------------------------------
 .../spark/merger/CarbonDataMergerUtil.java      |  65 ++++++----
 .../spark/rdd/CarbonDataRDDFactory.scala        |  81 ++++++------
 .../MajorCompactionStopsAfterCompaction.scala   | 125 +++++++++++++++++++
 3 files changed, 208 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2d50d5c5/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java b/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
index b290dd5..c2722a4 100644
--- a/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
+++ b/integration/spark/src/main/java/org/carbondata/spark/merger/CarbonDataMergerUtil.java
@@ -233,20 +233,8 @@ public final class CarbonDataMergerUtil {
       List<LoadMetadataDetails> segments, CompactionType compactionType) {
 
     List sortedSegments = new ArrayList(segments);
-    // sort the segment details.
-    Collections.sort(sortedSegments, new Comparator<LoadMetadataDetails>() {
-      @Override public int compare(LoadMetadataDetails seg1, LoadMetadataDetails seg2) {
-        double seg1Id = Double.parseDouble(seg1.getLoadName());
-        double seg2Id = Double.parseDouble(seg2.getLoadName());
-        if (seg1Id - seg2Id < 0) {
-          return -1;
-        }
-        if (seg1Id - seg2Id > 0) {
-          return 1;
-        }
-        return 0;
-      }
-    });
+
+    sortSegments(sortedSegments);
 
     // check preserve property and preserve the configured number of latest loads.
 
@@ -273,6 +261,27 @@ public final class CarbonDataMergerUtil {
   }
 
   /**
+   * Sorts the segments in place, numerically by load name.
+   * @param segments the segments to sort
+   */
+  public static void sortSegments(List segments) {
+    // sort the segment details.
+    Collections.sort(segments, new Comparator<LoadMetadataDetails>() {
+      @Override public int compare(LoadMetadataDetails seg1, LoadMetadataDetails seg2) {
+        double seg1Id = Double.parseDouble(seg1.getLoadName());
+        double seg2Id = Double.parseDouble(seg2.getLoadName());
+        if (seg1Id - seg2Id < 0) {
+          return -1;
+        }
+        if (seg1Id - seg2Id > 0) {
+          return 1;
+        }
+        return 0;
+      }
+    });
+  }
+
+  /**
    * This method will return the list of loads which are loaded at the same interval.
    * This property is configurable.
    *
@@ -691,16 +700,30 @@ public final class CarbonDataMergerUtil {
     return combinedMap;
   }
 
-  public static List<LoadMetadataDetails> filterOutAlreadyMergedSegments(
-      List<LoadMetadataDetails> segments, List<LoadMetadataDetails> loadsToMerge) {
+  /**
+   * Removes the newly added segments, keeping only loads present at trigger time.
+   * @param segments complete list of segments
+   * @param loadsToMerge segments identified for merging
+   * @return the trimmed list ending at the last trigger-time segment
+   */
+  public static List<LoadMetadataDetails> filterOutNewlyAddedSegments(
+      List<LoadMetadataDetails> segments, List<LoadMetadataDetails> loadsToMerge,
+      LoadMetadataDetails lastSeg) {
+
+    // take complete list of segments.
+    List<LoadMetadataDetails> list = new ArrayList<>(segments);
 
-    ArrayList<LoadMetadataDetails> list = new ArrayList<>(segments);
+    List<LoadMetadataDetails> trimmedList =
+        new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
 
-    for (LoadMetadataDetails mergedSegs : loadsToMerge) {
-      list.remove(mergedSegs);
-    }
+    // sort list
+    CarbonDataMergerUtil.sortSegments(list);
 
-    return list;
+    // first filter out newly added segments.
+    trimmedList = list.subList(0, list.indexOf(lastSeg) + 1);
+
+    return trimmedList;
 
   }
+
 }
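
A note on the extracted sortSegments: load names are parsed as doubles rather than compared
as strings, because merged segments take fractional names such as "0.1" and load numbers grow
past "9", where a lexicographic sort misorders them. A quick illustration with hypothetical
load names:

    import java.util.Arrays;
    import java.util.Comparator;
    import java.util.List;

    // Lexicographic vs. numeric ordering of load names (illustrative values).
    public class LoadNameOrdering {
      public static void main(String[] args) {
        List<String> names = Arrays.asList("10", "0.1", "2", "0");

        names.sort(Comparator.naturalOrder());
        System.out.println(names); // lexicographic: [0, 0.1, 10, 2] -- "10" lands before "2"

        names.sort(Comparator.comparingDouble(Double::parseDouble));
        System.out.println(names); // numeric: [0, 0.1, 2, 10]
      }
    }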

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2d50d5c5/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index ab77ea9..49e6702 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -397,6 +397,12 @@ object CarbonDataRDDFactory extends Logging {
 
     if (loadsToMerge.size() > 1) {
 
+      val sortedSegments: util.List[LoadMetadataDetails] = new util.ArrayList[LoadMetadataDetails](
+        segList
+      )
+      CarbonDataMergerUtil.sortSegments(sortedSegments)
+      val lastSegment = sortedSegments.get(sortedSegments.size()-1)
+
       new Thread {
         override def run(): Unit = {
 
@@ -408,16 +414,24 @@ object CarbonDataRDDFactory extends Logging {
                   .DEFAULT_COLLECTION_SIZE
               )
 
-              scanSegmentsAndSubmitJob(futureList)
+              scanSegmentsAndSubmitJob(futureList, loadsToMerge)
 
               futureList.asScala.foreach(future => {
                 future.get
               }
               )
-              // scan again and deterrmine if anything is there to merge again.
+
+              // scan again and determine if anything is there to merge again.
               readLoadMetadataDetails(carbonLoadModel, hdfsStoreLocation)
               segList = carbonLoadModel.getLoadMetadataDetails
+              // in case of major compaction we will scan only once and come out,
+              // as otherwise it would keep running major on the new loads as well.
+              // exclude the newly added segments.
+              if (compactionModel.compactionType == CompactionType.MAJOR_COMPACTION) {
 
+                segList = CarbonDataMergerUtil
+                  .filterOutNewlyAddedSegments(segList, loadsToMerge, lastSegment)
+              }
               loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
                 hdfsStoreLocation,
                 carbonLoadModel,
@@ -446,52 +460,35 @@ object CarbonDataRDDFactory extends Logging {
     }
 
     /**
-     * This will scan all the segments and submit the loads to be merged into the executor.
+     * This will submit the loads to be merged into the executor.
       *
       * @param futureList
      */
-    def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]]): Unit = {
-      breakable {
-        while (true) {
+    def scanSegmentsAndSubmitJob(futureList: util.List[Future[Void]], loadsToMerge: util
+    .List[LoadMetadataDetails]): Unit = {
 
-          val loadsToMerge = CarbonDataMergerUtil.identifySegmentsToBeMerged(
-            hdfsStoreLocation,
-            carbonLoadModel,
-            partitioner.partitionCount,
-            compactionModel.compactionSize,
-            segList,
-            compactionModel.compactionType
-          )
-          if (loadsToMerge.size() > 1) {
-            loadsToMerge.asScala.foreach(seg => {
-              logger.info("load identified for merge is " + seg.getLoadName)
-            }
-            )
+      loadsToMerge.asScala.foreach(seg => {
+        logger.info("loads identified for merge is " + seg.getLoadName)
+      }
+      )
 
-            val compactionCallableModel = CompactionCallableModel(hdfsStoreLocation,
-              carbonLoadModel,
-              partitioner,
-              storeLocation,
-              compactionModel.carbonTable,
-              kettleHomePath,
-              compactionModel.cubeCreationTime,
-              loadsToMerge,
-              sqlContext,
-              compactionModel.compactionType)
+      val compactionCallableModel = CompactionCallableModel(hdfsStoreLocation,
+        carbonLoadModel,
+        partitioner,
+        storeLocation,
+        compactionModel.carbonTable,
+        kettleHomePath,
+        compactionModel.cubeCreationTime,
+        loadsToMerge,
+        sqlContext,
+        compactionModel.compactionType
+      )
 
-            val future: Future[Void] = executor
-              .submit(new CompactionCallable(compactionCallableModel
-              )
-              )
-            futureList.add(future)
-            segList = CarbonDataMergerUtil
-              .filterOutAlreadyMergedSegments(segList, loadsToMerge)
-          }
-          else {
-            break
-          }
-        }
-      }
+      val future: Future[Void] = executor
+        .submit(new CompactionCallable(compactionCallableModel
+        )
+        )
+      futureList.add(future)
     }
   }
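
After this refactor, scanSegmentsAndSubmitJob no longer loops internally: it submits the one
batch of loads it was handed and returns, and the enclosing loop in CarbonDataRDDFactory waits
on the futures before rescanning for more work. A minimal sketch of that submit-then-await
shape, using plain java.util.concurrent types (the callable body is a stand-in for
CompactionCallable):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Sketch: one submission per identified batch; the caller owns the wait.
    public class SubmitThenAwaitSketch {
      public static void main(String[] args) throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(1);
        List<Future<Void>> futureList = new ArrayList<>();

        // submit exactly one batch, as the refactored scanSegmentsAndSubmitJob does
        futureList.add(executor.submit((Callable<Void>) () -> {
          System.out.println("merging one identified batch of loads");
          return null;
        }));

        // the caller, not the submitter, blocks before rescanning the segments
        for (Future<Void> future : futureList) {
          future.get();
        }
        executor.shutdown();
      }
    }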
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/2d50d5c5/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
new file mode 100644
index 0000000..3cbf5dd
--- /dev/null
+++ b/integration/spark/src/test/scala/org/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
@@ -0,0 +1,125 @@
+package org.carbondata.spark.testsuite.datacompaction
+
+import java.io.File
+
+import org.apache.spark.sql.common.util.CarbonHiveContext._
+import org.apache.spark.sql.common.util.QueryTest
+import org.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
+import org.carbondata.core.constants.CarbonCommonConstants
+import org.carbondata.core.util.CarbonProperties
+import org.carbondata.lcm.status.SegmentStatusManager
+import org.scalatest.BeforeAndAfterAll
+
+import scala.collection.JavaConverters._
+
+/**
+  * FT for compaction scenario where major compaction will only compact the segments which are
+  * present at the time of triggering the compaction.
+  */
+class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("drop table if exists  stopmajor")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    sql(
+      "CREATE TABLE IF NOT EXISTS stopmajor (country String, ID Int, date Timestamp, name " +
+        "String, " +
+        "phonetype String, serialname String, salary Int) STORED BY 'org.apache.carbondata" +
+        ".format'"
+    )
+
+
+    val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")
+      .getCanonicalPath
+    val csvFilePath1 = currentDirectory + "/src/test/resources/compaction/compaction1.csv"
+
+    val csvFilePath2 = currentDirectory + "/src/test/resources/compaction/compaction2.csv"
+    val csvFilePath3 = currentDirectory + "/src/test/resources/compaction/compaction3.csv"
+
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE stopmajor OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE stopmajor  OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    // compaction will happen here.
+    sql("alter table stopmajor compact 'major'"
+    )
+    Thread.sleep(2000)
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE stopmajor OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    sql("LOAD DATA LOCAL INPATH '" + csvFilePath2 + "' INTO TABLE stopmajor  OPTIONS" +
+      "('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+    )
+    if (checkCompactionCompletedOrNot("0.1")) {
+    }
+
+  }
+
+  /**
+    * Check if the compaction is completed or not.
+    *
+    * @param requiredSeg
+    * @return
+    */
+  def checkCompactionCompletedOrNot(requiredSeg: String): Boolean = {
+    var status = false
+    var noOfRetries = 0
+    while (!status && noOfRetries < 10) {
+
+      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
+          AbsoluteTableIdentifier(
+            CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
+            new CarbonTableIdentifier("default", "stopmajor", noOfRetries + "")
+          )
+      )
+      val segments = segmentStatusManager.getValidSegments().listOfValidSegments.asScala.toList
+      segments.foreach(seg =>
+        System.out.println( "valid segment is =" + seg)
+      )
+
+      if (!segments.contains(requiredSeg)) {
+        // wait for 2 seconds for compaction to complete.
+        System.out.println("sleping for 2 seconds.")
+        Thread.sleep(2000)
+        noOfRetries += 1
+      }
+      else {
+        status = true
+      }
+    }
+    return status
+  }
+
+  /**
+    * Test whether major compaction compacted only the segments present when it was triggered.
+    */
+  test("delete merged folder and check segments") {
+    // delete merged segments
+    sql("clean files for table stopmajor")
+
+    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
+        AbsoluteTableIdentifier(
+          CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
+          new CarbonTableIdentifier("default", "stopmajor", "rrr")
+        )
+    )
+    // merged segment should not be there
+    val segments = segmentStatusManager.getValidSegments.listOfValidSegments.asScala.toList
+    assert(segments.contains("0.1"))
+    assert(!segments.contains("0.2"))
+    assert(!segments.contains("0"))
+    assert(!segments.contains("1"))
+    assert(segments.contains("2"))
+    assert(segments.contains("3"))
+
+  }
+
+  override def afterAll {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+  }
+
+}
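
A side note on the test: checkCompactionCompletedOrNot polls the valid-segment list until the
merged segment ("0.1") appears, retrying up to ten times with a two-second sleep rather than
relying on one fixed wait. A generic restatement of that retry loop, with a supplier standing
in for the SegmentStatusManager lookup:

    import java.util.List;
    import java.util.function.Supplier;

    // Generic poll-until-present helper mirroring the test's retry loop.
    public class PollForSegment {

      static boolean waitForSegment(Supplier<List<String>> validSegments, String requiredSeg,
          int maxRetries, long sleepMillis) throws InterruptedException {
        for (int retry = 0; retry < maxRetries; retry++) {
          if (validSegments.get().contains(requiredSeg)) {
            return true;
          }
          Thread.sleep(sleepMillis); // compaction still running; wait and re-check
        }
        return false;
      }

      public static void main(String[] args) throws InterruptedException {
        // stand-in for the valid-segment lookup: already contains "0.1"
        boolean done = waitForSegment(() -> List.of("0.1", "2", "3"), "0.1", 10, 2000);
        System.out.println("compaction completed: " + done);
      }
    }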