You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/11 14:55:14 UTC

carbondata git commit: [CARBONDATA-1481] Add test cases for compaction of global sorted segment

Repository: carbondata
Updated Branches:
  refs/heads/master 133b30391 -> ac6c1d2b8


[CARBONDATA-1481] Add test cases for compaction of global sorted segment

Only test cases are added for compaction of global sorted segment in this PR

This closes #1361


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ac6c1d2b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ac6c1d2b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ac6c1d2b

Branch: refs/heads/master
Commit: ac6c1d2b8ca9e68b98d6845f710b70d60a5f48c5
Parents: 133b303
Author: xubo245 <60...@qq.com>
Authored: Wed Sep 13 20:15:39 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Oct 11 22:55:00 2017 +0800

----------------------------------------------------------------------
 .../src/test/resources/globalsort/sample1.csv   |   8 +-
 .../src/test/resources/globalsort/sample2.csv   |   8 +-
 .../src/test/resources/globalsort/sample3.csv   |   8 +-
 ...CompactionSupportGlobalSortBigFileTest.scala | 136 +++++
 ...ompactionSupportGlobalSortFunctionTest.scala | 535 +++++++++++++++++++
 ...mpactionSupportGlobalSortParameterTest.scala | 534 ++++++++++++++++++
 .../testsuite/sortcolumns/TestSortColumns.scala |   1 +
 7 files changed, 1218 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/resources/globalsort/sample1.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/globalsort/sample1.csv b/integration/spark-common-test/src/test/resources/globalsort/sample1.csv
index 9cb11be..2fc7bc4 100644
--- a/integration/spark-common-test/src/test/resources/globalsort/sample1.csv
+++ b/integration/spark-common-test/src/test/resources/globalsort/sample1.csv
@@ -1,5 +1,5 @@
 id,name,city,age
-1,a,wuhan,10
-2,b,hangzhou,20
-3,c,beijing,30
-4,d,shenzhen,40
+10,a,wuhan,10
+4,y,hangzhou,20
+7,z,beijing,30
+1,d,shenzhen,40
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/resources/globalsort/sample2.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/globalsort/sample2.csv b/integration/spark-common-test/src/test/resources/globalsort/sample2.csv
index 300c254..75e7d93 100644
--- a/integration/spark-common-test/src/test/resources/globalsort/sample2.csv
+++ b/integration/spark-common-test/src/test/resources/globalsort/sample2.csv
@@ -1,5 +1,5 @@
 id,name,city,age
-5,e,wuhan,50
-6,f,hangzhou,60
-7,g,beijing,70
-eight,h,shenzhen,80
+11,c,wuhan,50
+2,f,hangzhou,60
+5,m,beijing,70
+eight,b,shenzhen,80
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/resources/globalsort/sample3.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/globalsort/sample3.csv b/integration/spark-common-test/src/test/resources/globalsort/sample3.csv
index 8e51dae..5eb6b02 100644
--- a/integration/spark-common-test/src/test/resources/globalsort/sample3.csv
+++ b/integration/spark-common-test/src/test/resources/globalsort/sample3.csv
@@ -1,5 +1,5 @@
 id,name,city,age
-9,i,wuhan,90
-10,j,hangzhou,100
-11,k,beijing,110
-12,l,shenzhen,120
+9,e,wuhan,90
+6,x,hangzhou,100
+3,k,beijing,110
+12,l,shenzhen,120
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
new file mode 100644
index 0000000..6d79f6c
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datacompaction
+
+import java.io.{File, PrintWriter}
+
+import scala.util.Random
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class CompactionSupportGlobalSortBigFileTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+  val file1 = resourcesPath + "/compaction/fil1.csv"
+  val file2 = resourcesPath + "/compaction/fil2.csv"
+  val file3 = resourcesPath + "/compaction/fil3.csv"
+  val file4 = resourcesPath + "/compaction/fil4.csv"
+  val file5 = resourcesPath + "/compaction/fil5.csv"
+
+  override protected def beforeAll(): Unit = {
+    resetConf("10")
+    //n should be about 5000000 of reset if size is default 1024
+    val n = 150000
+    CompactionSupportGlobalSortBigFileTest.createFile(file1, n, 0)
+    CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
+    CompactionSupportGlobalSortBigFileTest.createFile(file3, n * 3, n * 5)
+    CompactionSupportGlobalSortBigFileTest.createFile(file4, n * 2, n * 8)
+    CompactionSupportGlobalSortBigFileTest.createFile(file5, n * 2, n * 13)
+  }
+
+  override protected def afterAll(): Unit = {
+    CompactionSupportGlobalSortBigFileTest.deleteFile(file1)
+    CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
+    CompactionSupportGlobalSortBigFileTest.deleteFile(file3)
+    CompactionSupportGlobalSortBigFileTest.deleteFile(file4)
+    CompactionSupportGlobalSortBigFileTest.deleteFile(file5)
+    resetConf(CarbonCommonConstants.DEFAULT_MAJOR_COMPACTION_SIZE)
+  }
+
+  override def beforeEach {
+    sql("DROP TABLE IF EXISTS compaction_globalsort")
+    sql(
+      """
+        | CREATE TABLE compaction_globalsort(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+
+    sql("DROP TABLE IF EXISTS carbon_localsort")
+    sql(
+      """
+        | CREATE TABLE carbon_localsort(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+  }
+
+  override def afterEach {
+    sql("DROP TABLE IF EXISTS compaction_globalsort")
+    sql("DROP TABLE IF EXISTS carbon_localsort")
+  }
+
+  test("Compaction major:  segments size is bigger than default compaction size") {
+    sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort OPTIONS('header'='false')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort OPTIONS('header'='false')")
+    sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort OPTIONS('header'='false')")
+    sql(s"LOAD DATA LOCAL INPATH '$file4' INTO TABLE carbon_localsort OPTIONS('header'='false')")
+    sql(s"LOAD DATA LOCAL INPATH '$file5' INTO TABLE carbon_localsort OPTIONS('header'='false')")
+
+    sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('header'='false')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('header'='false')")
+    sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('header'='false')")
+    sql(s"LOAD DATA LOCAL INPATH '$file4' INTO TABLE compaction_globalsort OPTIONS('header'='false')")
+    sql(s"LOAD DATA LOCAL INPATH '$file5' INTO TABLE compaction_globalsort OPTIONS('header'='false')")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+    checkAnswer(sql("select count(*) from compaction_globalsort"),sql("select count(*) from carbon_localsort"))
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+  }
+
+  private def resetConf(size:String) {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.MAJOR_COMPACTION_SIZE, size)
+  }
+}
+
+object CompactionSupportGlobalSortBigFileTest {
+  def createFile(fileName: String, line: Int = 10000, start: Int = 0): Boolean = {
+    try {
+      val write = new PrintWriter(fileName);
+      for (i <- start until (start + line)) {
+        write.println(i + "," + "n" + i + "," + "c" + Random.nextInt(line) + "," + Random.nextInt(80))
+      }
+      write.close()
+    } catch {
+      case _: Exception => false
+    }
+    true
+  }
+
+  def deleteFile(fileName: String): Boolean = {
+    try {
+      val file = new File(fileName)
+      if (file.exists()) {
+        file.delete()
+      }
+    } catch {
+      case _: Exception => false
+    }
+    true
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
new file mode 100644
index 0000000..6f8648d
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the"License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an"AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datacompaction
+
+import java.io.{File, FilenameFilter}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+  val filePath: String = s"$resourcesPath/globalsort"
+  val file1: String = resourcesPath + "/globalsort/sample1.csv"
+  val file2: String = resourcesPath + "/globalsort/sample2.csv"
+  val file3: String = resourcesPath + "/globalsort/sample3.csv"
+
+  override def beforeEach {
+    resetConf
+    sql("DROP TABLE IF EXISTS compaction_globalsort")
+    sql(
+      """
+        | CREATE TABLE compaction_globalsort(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+
+    sql("DROP TABLE IF EXISTS carbon_localsort")
+    sql(
+      """
+        | CREATE TABLE carbon_localsort(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+  }
+
+  override def afterEach {
+    sql("DROP TABLE IF EXISTS compaction_globalsort")
+    sql("DROP TABLE IF EXISTS carbon_localsort")
+  }
+
+  test("Compaction type: major") {
+    sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+    sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+    sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+    sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 4)
+
+    val status = segments.collect().map { each => (each.toSeq) (1) }
+    assert(status.filter(_.equals("Compacted")).length == 3)
+
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort limit 3"),
+      sql("SELECT * FROM carbon_localsort order by city,name limit 3"))
+  }
+
+  test("Compaction type: minor, < default segments in level 1, not compact") {
+    sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+    sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+    sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+    sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(!SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 3)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+  }
+
+  test("Compaction type: minor, >= default segments and < (default segments)*2 in level 1, compact once") {
+    for (i <- 0 until 2) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(!SegmentSequenceIds.contains("4.1"))
+    assert(SegmentSequenceIds.length == 7)
+
+    val status = segments.collect().map { each => (each.toSeq) (1) }
+    assert(status.filter(_.equals("Compacted")).length == 4)
+
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+  }
+
+  test("Compaction type: minor, >= default segments in level 1,compact twice in level 1") {
+    for (i <- 0 until 3) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+    }
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.contains("4.1"))
+    assert(!SegmentSequenceIds.contains("0.2"))
+    assert(SegmentSequenceIds.length == 11)
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(36)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+  }
+
+  test("Compaction type: minor, >= compacted segments in level 2,compact once in level 2") {
+    for (i <- 0 until 4) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+    }
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.contains("4.1"))
+    assert(SegmentSequenceIds.contains("8.1"))
+    assert(SegmentSequenceIds.contains("0.2"))
+    assert(SegmentSequenceIds.length == 16)
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+    val status = segments.collect().map { each => (each.toSeq) (1) }
+    assert(status.filter(_.equals("Compacted")).length == 15)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(48)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort limit 12"),
+      sql("SELECT * FROM carbon_localsort order by city,name limit 12"))
+  }
+
+  test("Compaction: clean files, major") {
+    for (i <- 0 until 1) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
+    sql("clean files for table compaction_globalsort")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 1)
+
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort limit 3"),
+      sql("SELECT * FROM carbon_localsort order by city,name limit 3"))
+  }
+
+  test("Compaction: clean files, minor") {
+    for (i <- 0 until 2) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'minor'")
+    sql("clean files for table compaction_globalsort")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 3)
+
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+  }
+
+  test("Compaction: global_sort_partitions=1, major") {
+    for (i <- 0 until 1) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='1')")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='1')")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='1')")
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
+    sql("clean files for table compaction_globalsort")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 1)
+
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort limit 3"),
+      sql("SELECT * FROM carbon_localsort order by city,name limit 3"))
+  }
+
+  test("Compaction: global_sort_partitions=2, major") {
+    for (i <- 0 until 1) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
+    sql("clean files for table compaction_globalsort")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 1)
+
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 2)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+  }
+
+  test("Compaction: delete, major") {
+    for (i <- 0 until 1) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
+    sql("clean files for table compaction_globalsort")
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 1)
+
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 2)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+
+    sql("delete from table compaction_globalsort where SEGMENT.ID in (0.1)")
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Success")
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Marked for Delete")
+  }
+
+  test("Compaction: delete, minor") {
+    for (i <- 0 until 2) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("delete from table compaction_globalsort where SEGMENT.ID in (1,2,3)")
+    sql("delete from table carbon_localsort where SEGMENT.ID in (1,2,3)")
+    sql("ALTER TABLE compaction_globalsort COMPACT 'minor'")
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(!SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 6)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Success")
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Marked for Delete")
+  }
+
+  test("Compaction: load from file dictory, three csv file, major") {
+    for (i <- 0 until 6) {
+      sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE compaction_globalsort")
+    }
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 3)
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(72)))
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Success")
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+  }
+
+  test("Compaction: load from file dictory, three csv file, minor") {
+    for (i <- 0 until 6) {
+      sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE compaction_globalsort")
+    }
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
+
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 3)
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(72)))
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Success")
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+  }
+
+  test("Compaction: one file and no sort_columns") {
+    sql("DROP TABLE IF EXISTS compaction_globalsort2")
+    sql(
+      """
+        | CREATE TABLE compaction_globalsort2(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        |  TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort2")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort2")
+    sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort2")
+
+    sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+    sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+    sql("clean files for table compaction_globalsort")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    val status = segments.collect().map { each => (each.toSeq) (1) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 1)
+    assert(status.filter(_.equals("Compacted")).length == 0)
+
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort2"), Seq(Row(12)))
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM compaction_globalsort2"))
+    sql("DROP TABLE IF EXISTS compaction_globalsort2")
+  }
+
+  test("Compaction: global_sort sort_columns is int data type") {
+    sql("DROP TABLE IF EXISTS compaction_globalsort2")
+    sql(
+      """
+        | CREATE TABLE compaction_globalsort2(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        |  TBLPROPERTIES('SORT_COLUMNS'='id','SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort2")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort2")
+    sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort2")
+
+    sql("ALTER TABLE compaction_globalsort2 COMPACT 'MAJOR'")
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort2")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    val status = segments.collect().map { each => (each.toSeq) (1) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 4)
+    assert(status.filter(_.equals("Compacted")).length == 3)
+
+    assert(getIndexFileCount("compaction_globalsort2", "0.1") === 1)
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort2"), Seq(Row(12)))
+    sql("DROP TABLE IF EXISTS compaction_globalsort2")
+  }
+
+  private def resetConf() {
+    val prop = CarbonProperties.getInstance()
+    prop.addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+    prop.addProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)
+    prop.addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
+  }
+
+  private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
+    val store = storeLocation + "/default/" + tableName + "/Fact/Part0/Segment_" + segmentNo
+    val list = new File(store).list(new FilenameFilter {
+      override def accept(dir: File, name: String) = name.endsWith(".carbonindex")
+    })
+    list.size
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
new file mode 100644
index 0000000..1511b51
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the"License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an"AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datacompaction
+
+import java.io.{File, FilenameFilter}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+  val filePath: String = s"$resourcesPath/globalsort"
+  val file1: String = resourcesPath + "/globalsort/sample1.csv"
+  val file2: String = resourcesPath + "/globalsort/sample2.csv"
+  val file3: String = resourcesPath + "/globalsort/sample3.csv"
+
+  override def beforeEach {
+    resetConf
+    
+    sql("DROP TABLE IF EXISTS compaction_globalsort")
+    sql(
+      """
+        | CREATE TABLE compaction_globalsort(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+
+    sql("DROP TABLE IF EXISTS carbon_localsort")
+    sql(
+      """
+        | CREATE TABLE carbon_localsort(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+  }
+
+  override def afterEach {
+    sql("DROP TABLE IF EXISTS compaction_globalsort")
+    sql("DROP TABLE IF EXISTS carbon_localsort")
+  }
+
+  test("MINOR, ENABLE_AUTO_LOAD_MERGE: false") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+    for (i <- 0 until 2) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("delete from table compaction_globalsort where SEGMENT.ID in (1,2,3)")
+    sql("delete from table carbon_localsort where SEGMENT.ID in (1,2,3)")
+    sql("ALTER TABLE compaction_globalsort COMPACT 'minor'")
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(!SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 6)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Success")
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Marked for Delete")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+      CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+  }
+
+  test("MINOR, ENABLE_AUTO_LOAD_MERGE: true") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    for (i <- 0 until 2) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+
+    // loaded 6 times and produced 6 segments,
+    // auto merge will compact and produce 1 segment because 6 is bigger than 4 (default value of minor),
+    // so total segment number is 7
+    assert(SegmentSequenceIds.length == 7)
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+      CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+  }
+
+  test("MINOR, PRESERVE_LATEST_SEGMENTS_NUMBER: 0") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
+      "0")
+    for (i <- 0 until 2) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(!SegmentSequenceIds.contains("4.1"))
+    assert(SegmentSequenceIds.length == 7)
+
+    val status = segments.collect().map { each => (each.toSeq) (1) }
+    assert(status.filter(_.equals("Compacted")).length == 4)
+
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
+      CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER)
+  }
+
+  test("MINOR, PRESERVE_LATEST_SEGMENTS_NUMBER: 4") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
+      "4")
+    for (i <- 0 until 2) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(!SegmentSequenceIds.contains("0.1"))
+    assert(!SegmentSequenceIds.contains("4.1"))
+    assert(SegmentSequenceIds.length == 6)
+
+    val status = segments.collect().map { each => (each.toSeq) (1) }
+    assert(status.filter(_.equals("Compacted")).length == 0)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
+      CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER)
+  }
+
+  test("MINOR, DAYS_ALLOWED_TO_COMPACT: 0") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+      "0")
+    for (i <- 0 until 2) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(!SegmentSequenceIds.contains("4.1"))
+    assert(SegmentSequenceIds.length == 7)
+
+    val status = segments.collect().map { each => (each.toSeq) (1) }
+    assert(status.filter(_.equals("Compacted")).length == 4)
+
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+      CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT)
+  }
+
+  test("MINOR, DAYS_ALLOWED_TO_COMPACT: 4") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+      "4")
+    for (i <- 0 until 2) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(!SegmentSequenceIds.contains("4.1"))
+    assert(SegmentSequenceIds.length == 7)
+
+    val status = segments.collect().map { each => (each.toSeq) (1) }
+    assert(status.filter(_.equals("Compacted")).length == 4)
+
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+      CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT)
+  }
+
+  test("MAJOR, ENABLE_AUTO_LOAD_MERGE: false") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+    for (i <- 0 until 2) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("delete from table compaction_globalsort where SEGMENT.ID in (1,2,3)")
+    sql("delete from table carbon_localsort where SEGMENT.ID in (1,2,3)")
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(SegmentSequenceIds.length == 7)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Success")
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Marked for Delete")
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+      CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+  }
+
+  test("MAJOR, ENABLE_AUTO_LOAD_MERGE: true") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    for (i <- 0 until 2) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+
+    // loaded 6 times and produced 6 segments,
+    // auto merge will compact and produce 1 segment because 6 is bigger than 4 (default value of minor),
+    // major compact and prodece 1 segment
+    // so total segment number is 8
+    assert(SegmentSequenceIds.length == 8)
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+      CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+  }
+
+  test("MAJOR, PRESERVE_LATEST_SEGMENTS_NUMBER: 0") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
+      "0")
+    for (i <- 0 until 2) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(!SegmentSequenceIds.contains("4.1"))
+    assert(SegmentSequenceIds.length == 7)
+
+    val status = segments.collect().map { each => (each.toSeq) (1) }
+    assert(status.filter(_.equals("Compacted")).length == 6)
+
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
+      CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER)
+  }
+
+  test("MAJOR, PRESERVE_LATEST_SEGMENTS_NUMBER: 4") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
+      "4")
+    for (i <- 0 until 2) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(!SegmentSequenceIds.contains("4.1"))
+    assert(SegmentSequenceIds.length == 7)
+
+    val status = segments.collect().map { each => (each.toSeq) (1) }
+    assert(status.filter(_.equals("Compacted")).length == 2)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
+      CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER)
+  }
+
+  test("MAJOR, DAYS_ALLOWED_TO_COMPACT: 0") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+      "0")
+    for (i <- 0 until 2) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(!SegmentSequenceIds.contains("4.1"))
+    assert(SegmentSequenceIds.length == 7)
+
+    val status = segments.collect().map { each => (each.toSeq) (1) }
+    assert(status.filter(_.equals("Compacted")).length == 6)
+
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+      CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT)
+  }
+
+  test("MAJOR, DAYS_ALLOWED_TO_COMPACT: 4") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+      "4")
+    for (i <- 0 until 2) {
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+      sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+      sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+    }
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+    checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+    sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+
+    checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+    val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+    val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(SegmentSequenceIds.contains("0.1"))
+    assert(!SegmentSequenceIds.contains("4.1"))
+    assert(SegmentSequenceIds.length == 7)
+
+    val status = segments.collect().map { each => (each.toSeq) (1) }
+    assert(status.filter(_.equals("Compacted")).length == 6)
+
+    assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+    checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+    checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+      sql("SELECT * FROM carbon_localsort"))
+
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+      CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT)
+  }
+  private def resetConf() {
+    val prop = CarbonProperties.getInstance()
+    prop.addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE, CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+    prop.addProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS, CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)
+    prop.addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
+  }
+
+  private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
+    val store = storeLocation + "/default/" + tableName + "/Fact/Part0/Segment_" + segmentNo
+    val list = new File(store).list(new FilenameFilter {
+      override def accept(dir: File, name: String) = name.endsWith(".carbonindex")
+    })
+    list.size
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
index 6347241..b655025 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
@@ -91,6 +91,7 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll {
     "create table with no dictionary sort_columns where NumberOfNoDictSortColumns is less than " +
     "NoDictionaryCount")
   {
+    sql("drop table if exists sorttable1b")
     sql(
       "CREATE TABLE sorttable1b (empno String, empname String, designation String, doj Timestamp," +
       " workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, " +