You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@carbondata.apache.org by GitBox <gi...@apache.org> on 2020/12/01 17:58:12 UTC

[GitHub] [carbondata] akashrn5 commented on a change in pull request #4020: [CARBONDATA-4054] Support data size control for minor compaction

akashrn5 commented on a change in pull request #4020:
URL: https://github.com/apache/carbondata/pull/4020#discussion_r533604921



##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##########
@@ -998,6 +998,34 @@ public long getMajorCompactionSize() {
     return compactionSize;
   }
 
+  /**
+   * returns minor compaction size value from carbon properties or -1 if it is not valid or
+   * not configured
+   *
+   * @return compactionSize
+   */
+  public long getMinorCompactionSize() {
+    long compactionSize = -1;
+    // if not configured, just use default -1
+    if (null != getProperty(CarbonCommonConstants.CARBON_MINOR_COMPACTION_SIZE)) {
+      try {
+        compactionSize = Long.parseLong(getProperty(
+                CarbonCommonConstants.CARBON_MINOR_COMPACTION_SIZE));
+      } catch (NumberFormatException e) {
+        LOGGER.warn("Invalid value is configured for property "
+                + CarbonCommonConstants.CARBON_MINOR_COMPACTION_SIZE + " considering the default"
+                + " value -1 and not considering segment Size during minor compaction.");
+      }
+      if (compactionSize <= 0) {
+        LOGGER.warn("Invalid value is configured for property "
+                + CarbonCommonConstants.CARBON_MINOR_COMPACTION_SIZE + " considering the default"

Review comment:
       same as above

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##########
@@ -998,6 +998,34 @@ public long getMajorCompactionSize() {
     return compactionSize;
   }
 
+  /**
+   * returns minor compaction size value from carbon properties or -1 if it is not valid or
+   * not configured
+   *
+   * @return compactionSize
+   */
+  public long getMinorCompactionSize() {
+    long compactionSize = -1;
+    // if not configured, just use default -1
+    if (null != getProperty(CarbonCommonConstants.CARBON_MINOR_COMPACTION_SIZE)) {
+      try {
+        compactionSize = Long.parseLong(getProperty(
+                CarbonCommonConstants.CARBON_MINOR_COMPACTION_SIZE));
+      } catch (NumberFormatException e) {
+        LOGGER.warn("Invalid value is configured for property "
+                + CarbonCommonConstants.CARBON_MINOR_COMPACTION_SIZE + " considering the default"

Review comment:
       ```suggestion
                   + CarbonCommonConstants.CARBON_MINOR_COMPACTION_SIZE + ", considering the default"
   ```

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
##########
@@ -186,6 +187,141 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
 
   }
 
+  def generateData(numOrders: Int = 100000): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numOrders, 4)
+      .map { x => ("country" + x, x, "07/23/2015", "name" + x, "phonetype" + x % 10,
+        "serialname" + x, x + 10000)
+      }.toDF("country", "ID", "date", "name", "phonetype", "serialname", "salary")
+  }
+
+  test("test skip segment whose data size exceed threshold in minor compaction " +
+    "in system level control and table level") {
+    CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    // set threshold to 1MB in system level
+    CarbonProperties.getInstance().addProperty("carbon.minor.compaction.size", "1")
+
+    sql("drop table if exists  minor_threshold")
+    sql("drop table if exists  tmp")
+    sql(
+      "CREATE TABLE IF NOT EXISTS minor_threshold (country String, ID Int, date" +
+        " Timestamp, name String, phonetype String, serialname String, salary Int) " +
+        "STORED AS carbondata"
+    )
+    sql(
+      "CREATE TABLE IF NOT EXISTS tmp (country String, ID Int, date Timestamp," +
+        " name String, phonetype String, serialname String, salary Int) STORED AS carbondata"
+    )
+    val initframe = generateData(100000)
+    initframe.write
+      .format("carbondata")
+      .option("tablename", "tmp")
+      .mode(SaveMode.Overwrite)
+      .save()
+    // load 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // insert a new segment(id is 3) data size exceed 1 MB
+    sql("insert into minor_threshold select * from tmp")
+    // load another 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    sql("show segments for table minor_threshold").show(100, false)
+    // do minor compaction
+    sql("alter table minor_threshold compact 'minor'"
+    )
+    // check segment 3 whose size exceed the limit should not be compacted
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME, "minor_threshold")
+    val carbonTablePath = carbonTable.getMetadataPath
+    val segments = SegmentStatusManager.readLoadMetadata(carbonTablePath);
+    assertResult(SegmentStatus.SUCCESS)(segments(3).getSegmentStatus)
+    assertResult(100030)(sql("select count(*) from minor_threshold").collect().head.get(0))
+
+    // change the threshold to 5MB by dynamic table properties setting, then the segment whose id is
+    // 3 should be included in minor compaction
+    sql("alter table minor_threshold set TBLPROPERTIES('minor_compaction_size'='5')")
+    // reload some segments
+    for (i <- 0 to 2) {
+      sql("insert into minor_threshold select * from tmp")
+    }
+    // do minor compaction
+    sql("alter table minor_threshold compact 'minor'"
+    )
+    sql("show segments for table minor_threshold").show(100, false)
+    // check segment 3 whose size not exceed the limit should be compacted now
+    val segments2 = SegmentStatusManager.readLoadMetadata(carbonTablePath);
+    assertResult(SegmentStatus.COMPACTED)(segments2(3).getSegmentStatus)
+    assertResult(400030)(sql("select count(*) from minor_threshold").collect().head.get(0))
+
+    // reset to 0
+    CarbonProperties.getInstance().addProperty("carbon.minor.compaction.size", "-1")
+  }
+
+  test("test skip segment whose data size exceed threshold in minor compaction " +
+    "for partition table") {
+    CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+
+    sql("drop table if exists  tmp")
+    sql("drop table if exists  minor_threshold_partition")
+    sql(
+      "CREATE TABLE IF NOT EXISTS tmp (country String, ID Int, date Timestamp," +
+        " name String, phonetype String, serialname String, salary Int) STORED AS carbondata"
+    )
+    val initframe = generateData(100000)
+    initframe.write
+      .format("carbondata")
+      .option("tablename", "tmp")
+      .mode(SaveMode.Overwrite)
+      .save()
+    // set threshold to 1MB for partition table
+    sql(
+      "CREATE TABLE IF NOT EXISTS minor_threshold_partition (country String, ID Int," +
+        " date Timestamp, name String, serialname String, salary Int) PARTITIONED BY " +
+        "(phonetype string) STORED AS carbondata TBLPROPERTIES('minor_compaction_size'='1')"
+    )
+    sql("desc formatted minor_threshold_partition").show(100, false)
+
+    // load 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold_partition" +
+        " OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // insert a new segment(id is 3) data size exceed 1 MB
+    sql("insert into minor_threshold_partition select country, ID, date, name, serialname," +
+      " salary, phonetype from tmp")
+    // load another 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold_partition" +
+        " OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // do minor compaction for minor_threshold_partition
+    sql("alter table minor_threshold_partition compact 'minor'"
+    )

Review comment:
       move this above

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
##########
@@ -186,6 +187,141 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
 
   }
 
+  def generateData(numOrders: Int = 100000): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numOrders, 4)
+      .map { x => ("country" + x, x, "07/23/2015", "name" + x, "phonetype" + x % 10,
+        "serialname" + x, x + 10000)
+      }.toDF("country", "ID", "date", "name", "phonetype", "serialname", "salary")
+  }
+
+  test("test skip segment whose data size exceed threshold in minor compaction " +
+    "in system level control and table level") {
+    CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    // set threshold to 1MB in system level
+    CarbonProperties.getInstance().addProperty("carbon.minor.compaction.size", "1")
+
+    sql("drop table if exists  minor_threshold")
+    sql("drop table if exists  tmp")
+    sql(
+      "CREATE TABLE IF NOT EXISTS minor_threshold (country String, ID Int, date" +
+        " Timestamp, name String, phonetype String, serialname String, salary Int) " +
+        "STORED AS carbondata"
+    )
+    sql(
+      "CREATE TABLE IF NOT EXISTS tmp (country String, ID Int, date Timestamp," +
+        " name String, phonetype String, serialname String, salary Int) STORED AS carbondata"
+    )
+    val initframe = generateData(100000)
+    initframe.write
+      .format("carbondata")
+      .option("tablename", "tmp")
+      .mode(SaveMode.Overwrite)
+      .save()
+    // load 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // insert a new segment(id is 3) data size exceed 1 MB
+    sql("insert into minor_threshold select * from tmp")
+    // load another 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    sql("show segments for table minor_threshold").show(100, false)
+    // do minor compaction
+    sql("alter table minor_threshold compact 'minor'"
+    )
+    // check segment 3 whose size exceed the limit should not be compacted
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME, "minor_threshold")
+    val carbonTablePath = carbonTable.getMetadataPath
+    val segments = SegmentStatusManager.readLoadMetadata(carbonTablePath);
+    assertResult(SegmentStatus.SUCCESS)(segments(3).getSegmentStatus)
+    assertResult(100030)(sql("select count(*) from minor_threshold").collect().head.get(0))
+
+    // change the threshold to 5MB by dynamic table properties setting, then the segment whose id is
+    // 3 should be included in minor compaction
+    sql("alter table minor_threshold set TBLPROPERTIES('minor_compaction_size'='5')")
+    // reload some segments
+    for (i <- 0 to 2) {
+      sql("insert into minor_threshold select * from tmp")
+    }
+    // do minor compaction
+    sql("alter table minor_threshold compact 'minor'"
+    )
+    sql("show segments for table minor_threshold").show(100, false)
+    // check segment 3 whose size not exceed the limit should be compacted now
+    val segments2 = SegmentStatusManager.readLoadMetadata(carbonTablePath);
+    assertResult(SegmentStatus.COMPACTED)(segments2(3).getSegmentStatus)
+    assertResult(400030)(sql("select count(*) from minor_threshold").collect().head.get(0))
+
+    // reset to 0
+    CarbonProperties.getInstance().addProperty("carbon.minor.compaction.size", "-1")
+  }
+
+  test("test skip segment whose data size exceed threshold in minor compaction " +
+    "for partition table") {
+    CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+
+    sql("drop table if exists  tmp")
+    sql("drop table if exists  minor_threshold_partition")
+    sql(
+      "CREATE TABLE IF NOT EXISTS tmp (country String, ID Int, date Timestamp," +
+        " name String, phonetype String, serialname String, salary Int) STORED AS carbondata"
+    )
+    val initframe = generateData(100000)
+    initframe.write
+      .format("carbondata")
+      .option("tablename", "tmp")
+      .mode(SaveMode.Overwrite)
+      .save()
+    // set threshold to 1MB for partition table
+    sql(
+      "CREATE TABLE IF NOT EXISTS minor_threshold_partition (country String, ID Int," +
+        " date Timestamp, name String, serialname String, salary Int) PARTITIONED BY " +
+        "(phonetype string) STORED AS carbondata TBLPROPERTIES('minor_compaction_size'='1')"
+    )
+    sql("desc formatted minor_threshold_partition").show(100, false)
+
+    // load 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold_partition" +
+        " OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // insert a new segment(id is 3) data size exceed 1 MB
+    sql("insert into minor_threshold_partition select country, ID, date, name, serialname," +
+      " salary, phonetype from tmp")
+    // load another 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold_partition" +
+        " OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // do minor compaction for minor_threshold_partition
+    sql("alter table minor_threshold_partition compact 'minor'"
+    )
+    sql("show segments for table minor_threshold_partition"
+    ).show(100, false)
+    // check segment 3 whose size exceed the limit should not be compacted
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME, "minor_threshold_partition")
+    val carbonTablePath2 = carbonTable.getMetadataPath
+    val segments = SegmentStatusManager.readLoadMetadata(carbonTablePath2);
+    assertResult(SegmentStatus.SUCCESS)(segments(3).getSegmentStatus)
+    assertResult(100030)(sql("select count(*) from " +
+      "minor_threshold_partition").collect().head.get(0))

Review comment:
       same comment for carbon property, like above

##########
File path: integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
##########
@@ -292,6 +293,29 @@ object CommonUtil {
     }
   }
 
+  def validateMinorCompactionSize(tableProperties: Map[String, String]): Unit = {
+    var minorCompactionSize: Integer = 0
+    val minorCompactionSizePropName = CarbonCommonConstants.TABLE_MINOR_COMPACTION_SIZE
+    if (tableProperties.get(minorCompactionSizePropName).isDefined) {

Review comment:
       replace with contains

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
##########
@@ -186,6 +187,141 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
 
   }
 
+  def generateData(numOrders: Int = 100000): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numOrders, 4)
+      .map { x => ("country" + x, x, "07/23/2015", "name" + x, "phonetype" + x % 10,
+        "serialname" + x, x + 10000)
+      }.toDF("country", "ID", "date", "name", "phonetype", "serialname", "salary")
+  }
+
+  test("test skip segment whose data size exceed threshold in minor compaction " +
+    "in system level control and table level") {
+    CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    // set threshold to 1MB in system level
+    CarbonProperties.getInstance().addProperty("carbon.minor.compaction.size", "1")
+
+    sql("drop table if exists  minor_threshold")
+    sql("drop table if exists  tmp")
+    sql(
+      "CREATE TABLE IF NOT EXISTS minor_threshold (country String, ID Int, date" +
+        " Timestamp, name String, phonetype String, serialname String, salary Int) " +
+        "STORED AS carbondata"
+    )
+    sql(
+      "CREATE TABLE IF NOT EXISTS tmp (country String, ID Int, date Timestamp," +
+        " name String, phonetype String, serialname String, salary Int) STORED AS carbondata"
+    )
+    val initframe = generateData(100000)
+    initframe.write
+      .format("carbondata")
+      .option("tablename", "tmp")
+      .mode(SaveMode.Overwrite)
+      .save()
+    // load 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // insert a new segment(id is 3) data size exceed 1 MB
+    sql("insert into minor_threshold select * from tmp")
+    // load another 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    sql("show segments for table minor_threshold").show(100, false)

Review comment:
       remove this

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
##########
@@ -186,6 +187,141 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
 
   }
 
+  def generateData(numOrders: Int = 100000): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numOrders, 4)
+      .map { x => ("country" + x, x, "07/23/2015", "name" + x, "phonetype" + x % 10,
+        "serialname" + x, x + 10000)
+      }.toDF("country", "ID", "date", "name", "phonetype", "serialname", "salary")
+  }
+
+  test("test skip segment whose data size exceed threshold in minor compaction " +
+    "in system level control and table level") {
+    CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    // set threshold to 1MB in system level
+    CarbonProperties.getInstance().addProperty("carbon.minor.compaction.size", "1")
+
+    sql("drop table if exists  minor_threshold")
+    sql("drop table if exists  tmp")
+    sql(
+      "CREATE TABLE IF NOT EXISTS minor_threshold (country String, ID Int, date" +
+        " Timestamp, name String, phonetype String, serialname String, salary Int) " +
+        "STORED AS carbondata"
+    )
+    sql(
+      "CREATE TABLE IF NOT EXISTS tmp (country String, ID Int, date Timestamp," +
+        " name String, phonetype String, serialname String, salary Int) STORED AS carbondata"
+    )
+    val initframe = generateData(100000)
+    initframe.write
+      .format("carbondata")
+      .option("tablename", "tmp")
+      .mode(SaveMode.Overwrite)
+      .save()
+    // load 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // insert a new segment(id is 3) data size exceed 1 MB
+    sql("insert into minor_threshold select * from tmp")
+    // load another 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    sql("show segments for table minor_threshold").show(100, false)
+    // do minor compaction
+    sql("alter table minor_threshold compact 'minor'"
+    )
+    // check segment 3 whose size exceed the limit should not be compacted
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME, "minor_threshold")
+    val carbonTablePath = carbonTable.getMetadataPath
+    val segments = SegmentStatusManager.readLoadMetadata(carbonTablePath);
+    assertResult(SegmentStatus.SUCCESS)(segments(3).getSegmentStatus)
+    assertResult(100030)(sql("select count(*) from minor_threshold").collect().head.get(0))
+
+    // change the threshold to 5MB by dynamic table properties setting, then the segment whose id is
+    // 3 should be included in minor compaction
+    sql("alter table minor_threshold set TBLPROPERTIES('minor_compaction_size'='5')")
+    // reload some segments
+    for (i <- 0 to 2) {
+      sql("insert into minor_threshold select * from tmp")
+    }
+    // do minor compaction
+    sql("alter table minor_threshold compact 'minor'"
+    )
+    sql("show segments for table minor_threshold").show(100, false)
+    // check segment 3 whose size not exceed the limit should be compacted now
+    val segments2 = SegmentStatusManager.readLoadMetadata(carbonTablePath);
+    assertResult(SegmentStatus.COMPACTED)(segments2(3).getSegmentStatus)
+    assertResult(400030)(sql("select count(*) from minor_threshold").collect().head.get(0))
+
+    // reset to 0
+    CarbonProperties.getInstance().addProperty("carbon.minor.compaction.size", "-1")

Review comment:
       after test case, please revert the set carbon property to default one to avoid the impact to other tests in case of failures

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
##########
@@ -186,6 +187,141 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
 
   }
 
+  def generateData(numOrders: Int = 100000): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numOrders, 4)
+      .map { x => ("country" + x, x, "07/23/2015", "name" + x, "phonetype" + x % 10,
+        "serialname" + x, x + 10000)
+      }.toDF("country", "ID", "date", "name", "phonetype", "serialname", "salary")
+  }
+
+  test("test skip segment whose data size exceed threshold in minor compaction " +
+    "in system level control and table level") {
+    CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    // set threshold to 1MB in system level
+    CarbonProperties.getInstance().addProperty("carbon.minor.compaction.size", "1")
+
+    sql("drop table if exists  minor_threshold")
+    sql("drop table if exists  tmp")
+    sql(
+      "CREATE TABLE IF NOT EXISTS minor_threshold (country String, ID Int, date" +
+        " Timestamp, name String, phonetype String, serialname String, salary Int) " +
+        "STORED AS carbondata"
+    )
+    sql(
+      "CREATE TABLE IF NOT EXISTS tmp (country String, ID Int, date Timestamp," +
+        " name String, phonetype String, serialname String, salary Int) STORED AS carbondata"
+    )
+    val initframe = generateData(100000)
+    initframe.write
+      .format("carbondata")
+      .option("tablename", "tmp")
+      .mode(SaveMode.Overwrite)
+      .save()
+    // load 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // insert a new segment(id is 3) data size exceed 1 MB
+    sql("insert into minor_threshold select * from tmp")
+    // load another 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    sql("show segments for table minor_threshold").show(100, false)
+    // do minor compaction
+    sql("alter table minor_threshold compact 'minor'"
+    )
+    // check segment 3 whose size exceed the limit should not be compacted
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME, "minor_threshold")
+    val carbonTablePath = carbonTable.getMetadataPath
+    val segments = SegmentStatusManager.readLoadMetadata(carbonTablePath);
+    assertResult(SegmentStatus.SUCCESS)(segments(3).getSegmentStatus)
+    assertResult(100030)(sql("select count(*) from minor_threshold").collect().head.get(0))
+
+    // change the threshold to 5MB by dynamic table properties setting, then the segment whose id is
+    // 3 should be included in minor compaction
+    sql("alter table minor_threshold set TBLPROPERTIES('minor_compaction_size'='5')")
+    // reload some segments
+    for (i <- 0 to 2) {
+      sql("insert into minor_threshold select * from tmp")
+    }
+    // do minor compaction
+    sql("alter table minor_threshold compact 'minor'"
+    )
+    sql("show segments for table minor_threshold").show(100, false)

Review comment:
       remove this

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
##########
@@ -186,6 +187,141 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
 
   }
 
+  def generateData(numOrders: Int = 100000): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numOrders, 4)
+      .map { x => ("country" + x, x, "07/23/2015", "name" + x, "phonetype" + x % 10,
+        "serialname" + x, x + 10000)
+      }.toDF("country", "ID", "date", "name", "phonetype", "serialname", "salary")
+  }
+
+  test("test skip segment whose data size exceed threshold in minor compaction " +
+    "in system level control and table level") {
+    CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    // set threshold to 1MB in system level
+    CarbonProperties.getInstance().addProperty("carbon.minor.compaction.size", "1")
+
+    sql("drop table if exists  minor_threshold")
+    sql("drop table if exists  tmp")
+    sql(
+      "CREATE TABLE IF NOT EXISTS minor_threshold (country String, ID Int, date" +
+        " Timestamp, name String, phonetype String, serialname String, salary Int) " +
+        "STORED AS carbondata"
+    )
+    sql(
+      "CREATE TABLE IF NOT EXISTS tmp (country String, ID Int, date Timestamp," +
+        " name String, phonetype String, serialname String, salary Int) STORED AS carbondata"
+    )
+    val initframe = generateData(100000)
+    initframe.write
+      .format("carbondata")
+      .option("tablename", "tmp")
+      .mode(SaveMode.Overwrite)
+      .save()
+    // load 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // insert a new segment(id is 3) data size exceed 1 MB
+    sql("insert into minor_threshold select * from tmp")
+    // load another 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    sql("show segments for table minor_threshold").show(100, false)
+    // do minor compaction
+    sql("alter table minor_threshold compact 'minor'"
+    )

Review comment:
       move this line above

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
##########
@@ -186,6 +187,141 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
 
   }
 
+  def generateData(numOrders: Int = 100000): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numOrders, 4)
+      .map { x => ("country" + x, x, "07/23/2015", "name" + x, "phonetype" + x % 10,
+        "serialname" + x, x + 10000)
+      }.toDF("country", "ID", "date", "name", "phonetype", "serialname", "salary")
+  }
+
+  test("test skip segment whose data size exceed threshold in minor compaction " +
+    "in system level control and table level") {
+    CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    // set threshold to 1MB in system level
+    CarbonProperties.getInstance().addProperty("carbon.minor.compaction.size", "1")
+
+    sql("drop table if exists  minor_threshold")
+    sql("drop table if exists  tmp")
+    sql(
+      "CREATE TABLE IF NOT EXISTS minor_threshold (country String, ID Int, date" +
+        " Timestamp, name String, phonetype String, serialname String, salary Int) " +
+        "STORED AS carbondata"
+    )
+    sql(
+      "CREATE TABLE IF NOT EXISTS tmp (country String, ID Int, date Timestamp," +
+        " name String, phonetype String, serialname String, salary Int) STORED AS carbondata"
+    )
+    val initframe = generateData(100000)
+    initframe.write
+      .format("carbondata")
+      .option("tablename", "tmp")
+      .mode(SaveMode.Overwrite)
+      .save()
+    // load 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // insert a new segment(id is 3) data size exceed 1 MB
+    sql("insert into minor_threshold select * from tmp")
+    // load another 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    sql("show segments for table minor_threshold").show(100, false)
+    // do minor compaction
+    sql("alter table minor_threshold compact 'minor'"
+    )
+    // check segment 3 whose size exceed the limit should not be compacted
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME, "minor_threshold")
+    val carbonTablePath = carbonTable.getMetadataPath
+    val segments = SegmentStatusManager.readLoadMetadata(carbonTablePath);
+    assertResult(SegmentStatus.SUCCESS)(segments(3).getSegmentStatus)
+    assertResult(100030)(sql("select count(*) from minor_threshold").collect().head.get(0))
+
+    // change the threshold to 5MB by dynamic table properties setting, then the segment whose id is
+    // 3 should be included in minor compaction
+    sql("alter table minor_threshold set TBLPROPERTIES('minor_compaction_size'='5')")
+    // reload some segments
+    for (i <- 0 to 2) {
+      sql("insert into minor_threshold select * from tmp")
+    }
+    // do minor compaction
+    sql("alter table minor_threshold compact 'minor'"
+    )
+    sql("show segments for table minor_threshold").show(100, false)
+    // check segment 3 whose size not exceed the limit should be compacted now
+    val segments2 = SegmentStatusManager.readLoadMetadata(carbonTablePath);
+    assertResult(SegmentStatus.COMPACTED)(segments2(3).getSegmentStatus)

Review comment:
       also check for success and compacted both, handle above also

##########
File path: integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
##########
@@ -186,6 +187,141 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
 
   }
 
+  def generateData(numOrders: Int = 100000): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numOrders, 4)
+      .map { x => ("country" + x, x, "07/23/2015", "name" + x, "phonetype" + x % 10,
+        "serialname" + x, x + 10000)
+      }.toDF("country", "ID", "date", "name", "phonetype", "serialname", "salary")
+  }
+
+  test("test skip segment whose data size exceed threshold in minor compaction " +
+    "in system level control and table level") {
+    CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    // set threshold to 1MB in system level
+    CarbonProperties.getInstance().addProperty("carbon.minor.compaction.size", "1")
+
+    sql("drop table if exists  minor_threshold")
+    sql("drop table if exists  tmp")
+    sql(
+      "CREATE TABLE IF NOT EXISTS minor_threshold (country String, ID Int, date" +
+        " Timestamp, name String, phonetype String, serialname String, salary Int) " +
+        "STORED AS carbondata"
+    )
+    sql(
+      "CREATE TABLE IF NOT EXISTS tmp (country String, ID Int, date Timestamp," +
+        " name String, phonetype String, serialname String, salary Int) STORED AS carbondata"
+    )
+    val initframe = generateData(100000)
+    initframe.write
+      .format("carbondata")
+      .option("tablename", "tmp")
+      .mode(SaveMode.Overwrite)
+      .save()
+    // load 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // insert a new segment(id is 3) data size exceed 1 MB
+    sql("insert into minor_threshold select * from tmp")
+    // load another 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    sql("show segments for table minor_threshold").show(100, false)
+    // do minor compaction
+    sql("alter table minor_threshold compact 'minor'"
+    )
+    // check segment 3 whose size exceed the limit should not be compacted
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME, "minor_threshold")
+    val carbonTablePath = carbonTable.getMetadataPath
+    val segments = SegmentStatusManager.readLoadMetadata(carbonTablePath);
+    assertResult(SegmentStatus.SUCCESS)(segments(3).getSegmentStatus)
+    assertResult(100030)(sql("select count(*) from minor_threshold").collect().head.get(0))
+
+    // change the threshold to 5MB by dynamic table properties setting, then the segment whose id is
+    // 3 should be included in minor compaction
+    sql("alter table minor_threshold set TBLPROPERTIES('minor_compaction_size'='5')")
+    // reload some segments
+    for (i <- 0 to 2) {
+      sql("insert into minor_threshold select * from tmp")
+    }
+    // do minor compaction
+    sql("alter table minor_threshold compact 'minor'"
+    )
+    sql("show segments for table minor_threshold").show(100, false)
+    // check segment 3 whose size not exceed the limit should be compacted now
+    val segments2 = SegmentStatusManager.readLoadMetadata(carbonTablePath);
+    assertResult(SegmentStatus.COMPACTED)(segments2(3).getSegmentStatus)
+    assertResult(400030)(sql("select count(*) from minor_threshold").collect().head.get(0))
+
+    // reset to 0
+    CarbonProperties.getInstance().addProperty("carbon.minor.compaction.size", "-1")
+  }
+
+  test("test skip segment whose data size exceed threshold in minor compaction " +
+    "for partition table") {
+    CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+
+    sql("drop table if exists  tmp")
+    sql("drop table if exists  minor_threshold_partition")
+    sql(
+      "CREATE TABLE IF NOT EXISTS tmp (country String, ID Int, date Timestamp," +
+        " name String, phonetype String, serialname String, salary Int) STORED AS carbondata"
+    )
+    val initframe = generateData(100000)
+    initframe.write
+      .format("carbondata")
+      .option("tablename", "tmp")
+      .mode(SaveMode.Overwrite)
+      .save()
+    // set threshold to 1MB for partition table
+    sql(
+      "CREATE TABLE IF NOT EXISTS minor_threshold_partition (country String, ID Int," +
+        " date Timestamp, name String, serialname String, salary Int) PARTITIONED BY " +
+        "(phonetype string) STORED AS carbondata TBLPROPERTIES('minor_compaction_size'='1')"
+    )
+    sql("desc formatted minor_threshold_partition").show(100, false)
+
+    // load 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold_partition" +
+        " OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // insert a new segment(id is 3) data size exceed 1 MB
+    sql("insert into minor_threshold_partition select country, ID, date, name, serialname," +
+      " salary, phonetype from tmp")
+    // load another 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold_partition" +
+        " OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // do minor compaction for minor_threshold_partition
+    sql("alter table minor_threshold_partition compact 'minor'"
+    )
+    sql("show segments for table minor_threshold_partition"

Review comment:
       remove this




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org