Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/12/04 01:58:37 UTC

[carbondata] branch master updated: [CARBONDATA-4054] Support data size control for minor compaction

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new e1ce3fa  [CARBONDATA-4054] Support data size control for minor compaction
e1ce3fa is described below

commit e1ce3fa517d584c778ce7c066d5a520e3233e9de
Author: Zhangshunyu <zh...@126.com>
AuthorDate: Tue Nov 24 11:53:16 2020 +0800

    [CARBONDATA-4054] Support data size control for minor compaction
    
    Why is this PR needed?
    Currently, minor compaction only considers the number of segments, and
    major compaction only considers the total size of segments. Consider a
    scenario where the user wants minor compaction driven by segment count
    but does not want to merge any segment whose data size exceeds a
    threshold, for example 2GB: merging such a large segment brings little
    benefit and is time-consuming.
    
    What changes were proposed in this PR?
    Add a parameter to control the size threshold of segments included in
    minor compaction, so that a user can exclude a segment from minor
    compaction once its data size exceeds the threshold. The parameter can
    be set at system level or table level; if not set, the default value
    is used.
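    
    For example (a sketch; the property values and table name below are
    illustrative, not part of this patch):
    
    ```
    // system level (carbon.properties or the CarbonProperties API), size in MB
    CarbonProperties.getInstance().addProperty("carbon.minor.compaction.size", "1024")
    // table level, overriding the system-level value for this table
    sql("ALTER TABLE t SET TBLPROPERTIES('minor_compaction_size'='2048')")
    ```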
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4020
---
 .../core/constants/CarbonCommonConstants.java      |   8 ++
 .../carbondata/core/util/CarbonProperties.java     |  28 ++++
 .../apache/carbondata/core/util/SessionParams.java |   1 +
 docs/ddl-of-carbondata.md                          |   4 +-
 docs/dml-of-carbondata.md                          |   5 +
 .../apache/carbondata/spark/util/CommonUtil.scala  |  24 ++++
 .../table/CarbonDescribeFormattedCommand.scala     |   4 +
 .../org/apache/spark/util/AlterTableUtil.scala     |   2 +
 .../MajorCompactionIgnoreInMinorTest.scala         | 141 +++++++++++++++++++++
 .../processing/merger/CarbonDataMergerUtil.java    |  54 ++++++--
 10 files changed, 257 insertions(+), 14 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 2def2f3..57fbfe0 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -495,6 +495,8 @@ public final class CarbonCommonConstants {
   public static final String INVERTED_INDEX = "inverted_index";
   // table property name of major compaction size
   public static final String TABLE_MAJOR_COMPACTION_SIZE = "major_compaction_size";
+  // table property name of minor compaction size
+  public static final String TABLE_MINOR_COMPACTION_SIZE = "minor_compaction_size";
   // table property name of auto load merge
   public static final String TABLE_AUTO_LOAD_MERGE = "auto_load_merge";
   // table property name of compaction level threshold
@@ -737,6 +739,12 @@ public final class CarbonCommonConstants {
   public static final String CARBON_MAJOR_COMPACTION_SIZE = "carbon.major.compaction.size";
 
   /**
+   * Size of Minor Compaction in MBs
+   */
+  @CarbonProperty(dynamicConfigurable = true)
+  public static final String CARBON_MINOR_COMPACTION_SIZE = "carbon.minor.compaction.size";
+
+  /**
    * By default size of major compaction in MBs.
    */
   public static final String DEFAULT_CARBON_MAJOR_COMPACTION_SIZE = "1024";
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
index e2b4e08..565a00b 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
@@ -1003,6 +1003,34 @@ public final class CarbonProperties {
   }
 
   /**
+   * Returns the minor compaction size from carbon properties, or -1 if it is
+   * not configured or the configured value is invalid
+   *
+   * @return compactionSize
+   */
+  public long getMinorCompactionSize() {
+    long compactionSize = -1;
+    // if not configured, just use default -1
+    if (null != getProperty(CarbonCommonConstants.CARBON_MINOR_COMPACTION_SIZE)) {
+      try {
+        compactionSize = Long.parseLong(getProperty(
+                CarbonCommonConstants.CARBON_MINOR_COMPACTION_SIZE));
+      } catch (NumberFormatException e) {
+        LOGGER.warn("Invalid value is configured for property "
+                + CarbonCommonConstants.CARBON_MINOR_COMPACTION_SIZE + ", considering the default"
+                + " value -1 and not considering segment Size during minor compaction.");
+      }
+      if (compactionSize <= 0) {
+        LOGGER.warn("Invalid value is configured for property "
+                + CarbonCommonConstants.CARBON_MINOR_COMPACTION_SIZE + ", considering the default"
+                + " value -1 and not considering segment Size during minor compaction.");
+        compactionSize = -1;
+      }
+    }
+    return compactionSize;
+  }
+
+  /**
    * returns the number of loads to be preserved.
    *
    * @return
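
For context, a minimal sketch (illustrative, not part of the patch) of how a caller can consume the -1 sentinel returned above:

```
// getMinorCompactionSize() yields -1 when the property is unset or invalid,
// meaning segment size is not considered during minor compaction
val thresholdMb = CarbonProperties.getInstance().getMinorCompactionSize
if (thresholdMb > 0) {
  // only segments smaller than thresholdMb (in MB) stay eligible for compaction
}
```
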
diff --git a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
index c611256..aa9abfe 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/SessionParams.java
@@ -171,6 +171,7 @@ public class SessionParams implements Serializable, Cloneable {
       case NUM_CORES_COMPACTING:
       case BLOCKLET_SIZE_IN_MB:
       case CARBON_MAJOR_COMPACTION_SIZE:
+      case CARBON_MINOR_COMPACTION_SIZE:
         isValid = CarbonUtil.validateValidIntType(value);
         if (!isValid) {
           throw new InvalidConfigurationException(
diff --git a/docs/ddl-of-carbondata.md b/docs/ddl-of-carbondata.md
index 208baca..ee208c4 100644
--- a/docs/ddl-of-carbondata.md
+++ b/docs/ddl-of-carbondata.md
@@ -285,9 +285,10 @@ CarbonData DDL statements are documented here,which includes:
    - ##### Table Compaction Configuration
    
      These properties are table-level compaction configurations; if not specified, the system-level configurations in carbon.properties will be used.
-     Following are 5 configurations:
+     Following are 6 configurations:
      
      * MAJOR_COMPACTION_SIZE: same meaning as carbon.major.compaction.size, size in MB.
+     * MINOR_COMPACTION_SIZE: same meaning as carbon.minor.compaction.size, size in MB.
      * AUTO_LOAD_MERGE: same meaning as carbon.enable.auto.load.merge.
      * COMPACTION_LEVEL_THRESHOLD: same meaning as carbon.compaction.level.threshold.
      * COMPACTION_PRESERVE_SEGMENTS: same meaning as carbon.numberof.preserve.segments.
@@ -295,6 +296,7 @@ CarbonData DDL statements are documented here,which includes:
 
      ```
      TBLPROPERTIES ('MAJOR_COMPACTION_SIZE'='2048',
+                    'MINOR_COMPACTION_SIZE'='4096',
                     'AUTO_LOAD_MERGE'='true',
                     'COMPACTION_LEVEL_THRESHOLD'='5,6',
                     'COMPACTION_PRESERVE_SEGMENTS'='10',
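
Putting the table-level property to use (a sketch mirroring the tests added below; the table name and column list are illustrative):

```
// segments larger than 4096 MB are excluded from this table's minor compaction;
// the table-level value overrides carbon.minor.compaction.size
sql("CREATE TABLE sales (id INT, country STRING) STORED AS carbondata " +
  "TBLPROPERTIES('MINOR_COMPACTION_SIZE'='4096')")
```
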
diff --git a/docs/dml-of-carbondata.md b/docs/dml-of-carbondata.md
index 405d7c8..3f456c4 100644
--- a/docs/dml-of-carbondata.md
+++ b/docs/dml-of-carbondata.md
@@ -530,6 +530,11 @@ CarbonData DML statements are documented here,which includes:
   * Level 1: Merging of the segments which are not yet compacted.
   * Level 2: Merging of the compacted segments again to form a larger segment.
 
+  Segments whose data size exceeds the limit of carbon.minor.compaction.size will not be included
+  in minor compaction. Users can control the maximum size of a segment to be included in minor
+  compaction by setting carbon.minor.compaction.size. If it is not configured, minor compaction
+  considers segments based on carbon.compaction.level.threshold alone, ignoring the size of
+  each segment.
   ```
   ALTER TABLE table_name COMPACT 'MINOR'
   ```
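
Since carbon.minor.compaction.size is declared dynamicConfigurable and validated in SessionParams (see the hunk above), it should also be adjustable per session; a hedged sketch:

```
// session-level override, size in MB; non-integer values are rejected by SessionParams
sql("SET carbon.minor.compaction.size=1024")
sql("ALTER TABLE table_name COMPACT 'MINOR'")
```
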
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 431df5f..af23001 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -257,6 +257,7 @@ object CommonUtil {
    */
   def validateTableLevelCompactionProperties(tableProperties: Map[String, String]): Unit = {
     validateMajorCompactionSize(tableProperties)
+    validateMinorCompactionSize(tableProperties)
     validateAutoLoadMerge(tableProperties)
     validateCompactionLevelThreshold(tableProperties)
     validateCompactionPreserveSegmentsOrAllowedDays(tableProperties,
@@ -292,6 +293,29 @@ object CommonUtil {
     }
   }
 
+  def validateMinorCompactionSize(tableProperties: Map[String, String]): Unit = {
+    var minorCompactionSize: Integer = 0
+    val minorCompactionSizePropName = CarbonCommonConstants.TABLE_MINOR_COMPACTION_SIZE
+    if (tableProperties.contains(minorCompactionSizePropName)) {
+      val minorCompactionSizeStr: String =
+        parsePropertyValueStringInMB(tableProperties(minorCompactionSizePropName))
+      try {
+        minorCompactionSize = Integer.parseInt(minorCompactionSizeStr)
+      } catch {
+        case e: NumberFormatException =>
+          throw new MalformedCarbonCommandException(s"Invalid value $minorCompactionSizeStr" +
+            s" configured for $minorCompactionSizePropName. Please configure a value" +
+            s" greater than 0")
+      }
+      if (minorCompactionSize <= 0) {
+        throw new MalformedCarbonCommandException(s"Invalid value $minorCompactionSizeStr" +
+          s" configured for $minorCompactionSizePropName. Please configure a value" +
+          s" greater than 0")
+      }
+      tableProperties.put(minorCompactionSizePropName, minorCompactionSizeStr)
+    }
+  }
+
   private def validateBoolean(tableProperties: Map[String, String], property: String): Unit = {
     if (tableProperties.get(property).isDefined) {
       val trimStr = tableProperties(property).trim
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
index 5e927a3..477003a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDescribeFormattedCommand.scala
@@ -191,6 +191,10 @@ private[sql] case class CarbonDescribeFormattedCommand(
           CarbonProperties.getInstance()
             .getProperty(CarbonCommonConstants.CARBON_MAJOR_COMPACTION_SIZE,
               CarbonCommonConstants.DEFAULT_CARBON_MAJOR_COMPACTION_SIZE)), ""),
+      (CarbonCommonConstants.TABLE_MINOR_COMPACTION_SIZE.toUpperCase,
+        tblProps.getOrElse(CarbonCommonConstants.TABLE_MINOR_COMPACTION_SIZE,
+          CarbonProperties.getInstance()
+            .getProperty(CarbonCommonConstants.CARBON_MINOR_COMPACTION_SIZE, "0")), ""),
       (CarbonCommonConstants.TABLE_AUTO_LOAD_MERGE.toUpperCase,
         tblProps.getOrElse(CarbonCommonConstants.TABLE_AUTO_LOAD_MERGE,
           CarbonProperties.getInstance()
diff --git a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
index d9afc24..07b7dba 100644
--- a/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/util/AlterTableUtil.scala
@@ -547,6 +547,8 @@ object AlterTableUtil {
       "RANGE_COLUMN",
       "SORT_SCOPE",
       "SORT_COLUMNS",
+      "MINOR_COMPACTION_SIZE",
+      "MAJOR_COMPACTION_SIZE",
       "GLOBAL_SORT_PARTITIONS",
       "LONG_STRING_COLUMNS",
       "INDEX_CACHE_EXPIRATION_SECONDS",
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index 038e79a..288842b 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -18,6 +18,7 @@ package org.apache.carbondata.spark.testsuite.datacompaction
 
 import scala.collection.JavaConverters._
 
+import org.apache.spark.sql.{DataFrame, SaveMode}
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -186,6 +187,146 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
 
   }
 
+  def generateData(numOrders: Int = 100000): DataFrame = {
+    import sqlContext.implicits._
+    sqlContext.sparkContext.parallelize(1 to numOrders, 4)
+      .map { x => ("country" + x, x, "07/23/2015", "name" + x, "phonetype" + x % 10,
+        "serialname" + x, x + 10000)
+      }.toDF("country", "ID", "date", "name", "phonetype", "serialname", "salary")
+  }
+
+  test("test skip segment whose data size exceed threshold in minor compaction " +
+    "in system level control and table level") {
+    CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+    // set threshold to 1MB at system level
+    CarbonProperties.getInstance().addProperty("carbon.minor.compaction.size", "1")
+
+    sql("drop table if exists  minor_threshold")
+    sql("drop table if exists  tmp")
+    sql(
+      "CREATE TABLE IF NOT EXISTS minor_threshold (country String, ID Int, date" +
+        " Timestamp, name String, phonetype String, serialname String, salary Int) " +
+        "STORED AS carbondata"
+    )
+    sql(
+      "CREATE TABLE IF NOT EXISTS tmp (country String, ID Int, date Timestamp," +
+        " name String, phonetype String, serialname String, salary Int) STORED AS carbondata"
+    )
+    val initframe = generateData(100000)
+    initframe.write
+      .format("carbondata")
+      .option("tablename", "tmp")
+      .mode(SaveMode.Overwrite)
+      .save()
+    // load 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // insert a new segment (id 3) whose data size exceeds 1 MB
+    sql("insert into minor_threshold select * from tmp")
+    // load another 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold" +
+        " OPTIONS ('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // do minor compaction
+    sql("alter table minor_threshold compact 'minor'")
+    // check that segment 3, whose size exceeds the limit, is not compacted and stays SUCCESS
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME, "minor_threshold")
+    val carbonTablePath = carbonTable.getMetadataPath
+    val segments = SegmentStatusManager.readLoadMetadata(carbonTablePath)
+    assertResult(SegmentStatus.SUCCESS)(segments(3).getSegmentStatus)
+    assertResult(100030)(sql("select count(*) from minor_threshold").collect().head.get(0))
+
+    // change the threshold to 5MB via a dynamic table property; segment 3 should now
+    // be included in minor compaction
+    sql("alter table minor_threshold set TBLPROPERTIES('minor_compaction_size'='5')")
+    // reload some segments
+    for (i <- 0 to 2) {
+      sql("insert into minor_threshold select * from tmp")
+    }
+    // do minor compaction
+    sql("alter table minor_threshold compact 'minor'")
+    // check that segment 3, whose size does not exceed the new threshold, is compacted now
+    val segments2 = SegmentStatusManager.readLoadMetadata(carbonTablePath)
+    assertResult(SegmentStatus.COMPACTED)(segments2(3).getSegmentStatus)
+    assertResult(400030)(sql("select count(*) from minor_threshold").collect().head.get(0))
+
+    // reset the properties
+    CarbonProperties.getInstance().addProperty("carbon.minor.compaction.size", "-1")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+        CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
+  }
+
+  test("test skip segment whose data size exceed threshold in minor compaction " +
+    "for partition table") {
+    CarbonProperties.getInstance().addProperty("carbon.compaction.level.threshold", "2,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "mm/dd/yyyy")
+
+    sql("drop table if exists  tmp")
+    sql("drop table if exists  minor_threshold_partition")
+    sql(
+      "CREATE TABLE IF NOT EXISTS tmp (country String, ID Int, date Timestamp," +
+        " name String, phonetype String, serialname String, salary Int) STORED AS carbondata"
+    )
+    val initframe = generateData(100000)
+    initframe.write
+      .format("carbondata")
+      .option("tablename", "tmp")
+      .mode(SaveMode.Overwrite)
+      .save()
+    // set threshold to 1MB for partition table
+    sql(
+      "CREATE TABLE IF NOT EXISTS minor_threshold_partition (country String, ID Int," +
+        " date Timestamp, name String, serialname String, salary Int) PARTITIONED BY " +
+        "(phonetype string) STORED AS carbondata TBLPROPERTIES('minor_compaction_size'='1')"
+    )
+    // load 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold_partition" +
+        " OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // insert a new segment (id 3) whose data size exceeds 1 MB
+    sql("insert into minor_threshold_partition select country, ID, date, name, serialname," +
+      " salary, phonetype from tmp")
+    // load another 3 segments
+    for (i <- 0 to 2) {
+      sql("LOAD DATA LOCAL INPATH '" + csvFilePath1 + "' INTO TABLE minor_threshold_partition" +
+        " OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '\"')"
+      )
+    }
+    // do minor compaction for minor_threshold_partition
+    sql("alter table minor_threshold_partition compact 'minor'")
+    // check that segment 3, whose size exceeds the limit, is not compacted
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(
+      CarbonCommonConstants.DATABASE_DEFAULT_NAME, "minor_threshold_partition")
+    val carbonTablePath2 = carbonTable.getMetadataPath
+    val segments = SegmentStatusManager.readLoadMetadata(carbonTablePath2)
+    assertResult(SegmentStatus.SUCCESS)(segments(3).getSegmentStatus)
+    assertResult(100030)(sql("select count(*) from " +
+      "minor_threshold_partition").collect().head.get(0))
+    // reset the properties
+    CarbonProperties.getInstance().addProperty("carbon.minor.compaction.size", "-1")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+        CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+        CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
+  }
+
   override def afterAll {
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 0a2d104..29f629e 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -311,10 +311,8 @@ public final class CarbonDataMergerUtil {
       listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSize(compactionSize,
               listOfSegmentsLoadedInSameDateInterval, carbonLoadModel);
     } else {
-
-      listOfSegmentsToBeMerged =
-              identifySegmentsToBeMergedBasedOnSegCount(listOfSegmentsLoadedInSameDateInterval,
-                      tableLevelProperties);
+      listOfSegmentsToBeMerged = identifySegmentsToBeMergedBasedOnSegCount(compactionSize,
+              listOfSegmentsLoadedInSameDateInterval, tableLevelProperties, carbonLoadModel);
     }
 
     return listOfSegmentsToBeMerged;
@@ -595,7 +593,8 @@ public final class CarbonDataMergerUtil {
   }
 
   /**
-   * Identify the segments to be merged based on the segment count
+   * Identify the segments to be merged based on the segment count; segments whose data size
+   * exceeds the minor compaction size threshold will not be compacted.
    *
    * @param listOfSegmentsAfterPreserve the list of segments after
    *        preserve and before filtering by minor compaction level
@@ -603,7 +602,8 @@ public final class CarbonDataMergerUtil {
    * @return the list of segments to be merged after filtering by minor compaction level
    */
   private static List<LoadMetadataDetails> identifySegmentsToBeMergedBasedOnSegCount(
-          List<LoadMetadataDetails> listOfSegmentsAfterPreserve, Map<String, String> tblProps) {
+          long compactionSize, List<LoadMetadataDetails> listOfSegmentsAfterPreserve,
+          Map<String, String> tblProps, CarbonLoadModel carbonLoadModel) throws IOException {
 
     List<LoadMetadataDetails> mergedSegments =
             new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
@@ -641,12 +641,29 @@ public final class CarbonDataMergerUtil {
 
     int unMergeCounter = 0;
     int mergeCounter = 0;
-
+    CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable();
     // check size of each segment, sum it up across partitions
     for (LoadMetadataDetails segment : listOfSegmentsAfterPreserve) {
-      // compaction should skip streaming segments
+      long sizeOfOneSegmentAcrossPartition;
+      if (segment.getSegmentFile() != null) {
+        // If LoadMetadataDetails already has the data size, there is no need to calculate it
+        // from index files. Otherwise, read the index file and calculate the size.
+        if (!StringUtils.isEmpty(segment.getDataSize())) {
+          sizeOfOneSegmentAcrossPartition = Long.parseLong(segment.getDataSize());
+        } else {
+          sizeOfOneSegmentAcrossPartition = CarbonUtil.getSizeOfSegment(carbonTable.getTablePath(),
+                  new Segment(segment.getLoadName(), segment.getSegmentFile()));
+        }
+      } else {
+        sizeOfOneSegmentAcrossPartition =
+                getSizeOfSegment(carbonTable.getTablePath(), segment.getLoadName());
+      }
+      // compaction should skip streaming segments and segments whose data size exceeds the
+      // minor compaction size threshold; a compactionSize of -1 means segment size is not
+      // considered.
       if (segment.getSegmentStatus() == SegmentStatus.STREAMING ||
-          segment.getSegmentStatus() == SegmentStatus.STREAMING_FINISH) {
+          segment.getSegmentStatus() == SegmentStatus.STREAMING_FINISH || (compactionSize > 0 &&
+          sizeOfOneSegmentAcrossPartition / (1024 * 1024) >= compactionSize)) {
         continue;
       }
       String segName = segment.getLoadName();
@@ -767,16 +784,27 @@ public final class CarbonDataMergerUtil {
   public static long getCompactionSize(CompactionType compactionType,
                                        CarbonLoadModel carbonLoadModel) {
     long compactionSize = 0;
+    Map<String, String> tblProps = carbonLoadModel.getCarbonDataLoadSchema()
+            .getCarbonTable().getTableInfo().getFactTable().getTableProperties();
     switch (compactionType) {
       case MAJOR:
-        // default value is system level option
-        compactionSize = CarbonProperties.getInstance().getMajorCompactionSize();
         // if table level option is identified, use it to overwrite system level option
-        Map<String, String> tblProps = carbonLoadModel.getCarbonDataLoadSchema()
-                .getCarbonTable().getTableInfo().getFactTable().getTableProperties();
         if (tblProps.containsKey(CarbonCommonConstants.TABLE_MAJOR_COMPACTION_SIZE)) {
           compactionSize = Long.parseLong(
                   tblProps.get(CarbonCommonConstants.TABLE_MAJOR_COMPACTION_SIZE));
+        } else {
+          // default value is system level option
+          compactionSize = CarbonProperties.getInstance().getMajorCompactionSize();
+        }
+        break;
+      case MINOR:
+        // if table level option is identified, use it to overwrite system level option
+        if (tblProps.containsKey(CarbonCommonConstants.TABLE_MINOR_COMPACTION_SIZE)) {
+          compactionSize = Long.parseLong(
+                  tblProps.get(CarbonCommonConstants.TABLE_MINOR_COMPACTION_SIZE));
+        } else {
+          // default value is system level option
+          compactionSize = CarbonProperties.getInstance().getMinorCompactionSize();
         }
         break;
       default: // this case can not come.
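
For reference, a condensed sketch (not the CarbonData source itself) of the skip rule the hunks above add to identifySegmentsToBeMergedBasedOnSegCount: a segment is excluded when a positive threshold is configured and its size, converted to MB, reaches that threshold.

```
// compactionSize is the threshold in MB (-1 means "ignore size");
// segmentSizeInBytes is summed across partitions, as in the patch
def skipForMinorCompaction(segmentSizeInBytes: Long, compactionSize: Long): Boolean =
  compactionSize > 0 && segmentSizeInBytes / (1024L * 1024L) >= compactionSize
```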