You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ja...@apache.org on 2017/10/11 14:55:14 UTC
carbondata git commit: [CARBONDATA-1481] Add test cases for
compaction of global sorted segment
Repository: carbondata
Updated Branches:
refs/heads/master 133b30391 -> ac6c1d2b8
[CARBONDATA-1481] Add test cases for compaction of global sorted segment
Only test cases are added for compaction of global sorted segment in this PR
This closes #1361
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/ac6c1d2b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/ac6c1d2b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/ac6c1d2b
Branch: refs/heads/master
Commit: ac6c1d2b8ca9e68b98d6845f710b70d60a5f48c5
Parents: 133b303
Author: xubo245 <60...@qq.com>
Authored: Wed Sep 13 20:15:39 2017 +0800
Committer: Jacky Li <ja...@qq.com>
Committed: Wed Oct 11 22:55:00 2017 +0800
----------------------------------------------------------------------
.../src/test/resources/globalsort/sample1.csv | 8 +-
.../src/test/resources/globalsort/sample2.csv | 8 +-
.../src/test/resources/globalsort/sample3.csv | 8 +-
...CompactionSupportGlobalSortBigFileTest.scala | 136 +++++
...ompactionSupportGlobalSortFunctionTest.scala | 535 +++++++++++++++++++
...mpactionSupportGlobalSortParameterTest.scala | 534 ++++++++++++++++++
.../testsuite/sortcolumns/TestSortColumns.scala | 1 +
7 files changed, 1218 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/resources/globalsort/sample1.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/globalsort/sample1.csv b/integration/spark-common-test/src/test/resources/globalsort/sample1.csv
index 9cb11be..2fc7bc4 100644
--- a/integration/spark-common-test/src/test/resources/globalsort/sample1.csv
+++ b/integration/spark-common-test/src/test/resources/globalsort/sample1.csv
@@ -1,5 +1,5 @@
id,name,city,age
-1,a,wuhan,10
-2,b,hangzhou,20
-3,c,beijing,30
-4,d,shenzhen,40
+10,a,wuhan,10
+4,y,hangzhou,20
+7,z,beijing,30
+1,d,shenzhen,40
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/resources/globalsort/sample2.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/globalsort/sample2.csv b/integration/spark-common-test/src/test/resources/globalsort/sample2.csv
index 300c254..75e7d93 100644
--- a/integration/spark-common-test/src/test/resources/globalsort/sample2.csv
+++ b/integration/spark-common-test/src/test/resources/globalsort/sample2.csv
@@ -1,5 +1,5 @@
id,name,city,age
-5,e,wuhan,50
-6,f,hangzhou,60
-7,g,beijing,70
-eight,h,shenzhen,80
+11,c,wuhan,50
+2,f,hangzhou,60
+5,m,beijing,70
+eight,b,shenzhen,80
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/resources/globalsort/sample3.csv
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/resources/globalsort/sample3.csv b/integration/spark-common-test/src/test/resources/globalsort/sample3.csv
index 8e51dae..5eb6b02 100644
--- a/integration/spark-common-test/src/test/resources/globalsort/sample3.csv
+++ b/integration/spark-common-test/src/test/resources/globalsort/sample3.csv
@@ -1,5 +1,5 @@
id,name,city,age
-9,i,wuhan,90
-10,j,hangzhou,100
-11,k,beijing,110
-12,l,shenzhen,120
+9,e,wuhan,90
+6,x,hangzhou,100
+3,k,beijing,110
+12,l,shenzhen,120
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
new file mode 100644
index 0000000..6d79f6c
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortBigFileTest.scala
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datacompaction
+
+import java.io.{File, PrintWriter}
+
+import scala.util.Random
+
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class CompactionSupportGlobalSortBigFileTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+ val file1 = resourcesPath + "/compaction/fil1.csv"
+ val file2 = resourcesPath + "/compaction/fil2.csv"
+ val file3 = resourcesPath + "/compaction/fil3.csv"
+ val file4 = resourcesPath + "/compaction/fil4.csv"
+ val file5 = resourcesPath + "/compaction/fil5.csv"
+
+ override protected def beforeAll(): Unit = {
+ resetConf("10")
+ //n should be about 5000000 of reset if size is default 1024
+ val n = 150000
+ CompactionSupportGlobalSortBigFileTest.createFile(file1, n, 0)
+ CompactionSupportGlobalSortBigFileTest.createFile(file2, n * 4, n)
+ CompactionSupportGlobalSortBigFileTest.createFile(file3, n * 3, n * 5)
+ CompactionSupportGlobalSortBigFileTest.createFile(file4, n * 2, n * 8)
+ CompactionSupportGlobalSortBigFileTest.createFile(file5, n * 2, n * 13)
+ }
+
+ override protected def afterAll(): Unit = {
+ CompactionSupportGlobalSortBigFileTest.deleteFile(file1)
+ CompactionSupportGlobalSortBigFileTest.deleteFile(file2)
+ CompactionSupportGlobalSortBigFileTest.deleteFile(file3)
+ CompactionSupportGlobalSortBigFileTest.deleteFile(file4)
+ CompactionSupportGlobalSortBigFileTest.deleteFile(file5)
+ resetConf(CarbonCommonConstants.DEFAULT_MAJOR_COMPACTION_SIZE)
+ }
+
+ override def beforeEach {
+ sql("DROP TABLE IF EXISTS compaction_globalsort")
+ sql(
+ """
+ | CREATE TABLE compaction_globalsort(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+ """.stripMargin)
+
+ sql("DROP TABLE IF EXISTS carbon_localsort")
+ sql(
+ """
+ | CREATE TABLE carbon_localsort(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ }
+
+ override def afterEach {
+ sql("DROP TABLE IF EXISTS compaction_globalsort")
+ sql("DROP TABLE IF EXISTS carbon_localsort")
+ }
+
+ test("Compaction major: segments size is bigger than default compaction size") {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort OPTIONS('header'='false')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort OPTIONS('header'='false')")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort OPTIONS('header'='false')")
+ sql(s"LOAD DATA LOCAL INPATH '$file4' INTO TABLE carbon_localsort OPTIONS('header'='false')")
+ sql(s"LOAD DATA LOCAL INPATH '$file5' INTO TABLE carbon_localsort OPTIONS('header'='false')")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('header'='false')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('header'='false')")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('header'='false')")
+ sql(s"LOAD DATA LOCAL INPATH '$file4' INTO TABLE compaction_globalsort OPTIONS('header'='false')")
+ sql(s"LOAD DATA LOCAL INPATH '$file5' INTO TABLE compaction_globalsort OPTIONS('header'='false')")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+ checkAnswer(sql("select count(*) from compaction_globalsort"),sql("select count(*) from carbon_localsort"))
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(SegmentSequenceIds.contains("0.1"))
+ }
+
+ private def resetConf(size:String) {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.MAJOR_COMPACTION_SIZE, size)
+ }
+}
+
+object CompactionSupportGlobalSortBigFileTest {
+ def createFile(fileName: String, line: Int = 10000, start: Int = 0): Boolean = {
+ try {
+ val write = new PrintWriter(fileName);
+ for (i <- start until (start + line)) {
+ write.println(i + "," + "n" + i + "," + "c" + Random.nextInt(line) + "," + Random.nextInt(80))
+ }
+ write.close()
+ } catch {
+ case _: Exception => false
+ }
+ true
+ }
+
+ def deleteFile(fileName: String): Boolean = {
+ try {
+ val file = new File(fileName)
+ if (file.exists()) {
+ file.delete()
+ }
+ } catch {
+ case _: Exception => false
+ }
+ true
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
new file mode 100644
index 0000000..6f8648d
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortFunctionTest.scala
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datacompaction
+
+import java.io.{File, FilenameFilter}
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+
+class CompactionSupportGlobalSortFunctionTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+ val filePath: String = s"$resourcesPath/globalsort"
+ val file1: String = resourcesPath + "/globalsort/sample1.csv"
+ val file2: String = resourcesPath + "/globalsort/sample2.csv"
+ val file3: String = resourcesPath + "/globalsort/sample3.csv"
+
+ override def beforeEach {
+ resetConf
+ sql("DROP TABLE IF EXISTS compaction_globalsort")
+ sql(
+ """
+ | CREATE TABLE compaction_globalsort(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+ """.stripMargin)
+
+ sql("DROP TABLE IF EXISTS carbon_localsort")
+ sql(
+ """
+ | CREATE TABLE carbon_localsort(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ }
+
+ override def afterEach {
+ sql("DROP TABLE IF EXISTS compaction_globalsort")
+ sql("DROP TABLE IF EXISTS carbon_localsort")
+ }
+
+ test("Compaction type: major") {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(SegmentSequenceIds.contains("0.1"))
+ assert(SegmentSequenceIds.length == 4)
+
+ val status = segments.collect().map { each => (each.toSeq) (1) }
+ assert(status.filter(_.equals("Compacted")).length == 3)
+
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort limit 3"),
+ sql("SELECT * FROM carbon_localsort order by city,name limit 3"))
+ }
+
+ test("Compaction type: minor, < default segments in level 1, not compact") {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(!SegmentSequenceIds.contains("0.1"))
+ assert(SegmentSequenceIds.length == 3)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+ }
+
+ test("Compaction type: minor, >= default segments and < (default segments)*2 in level 1, compact once") {
+ for (i <- 0 until 2) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+ }
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(SegmentSequenceIds.contains("0.1"))
+ assert(!SegmentSequenceIds.contains("4.1"))
+ assert(SegmentSequenceIds.length == 7)
+
+ val status = segments.collect().map { each => (each.toSeq) (1) }
+ assert(status.filter(_.equals("Compacted")).length == 4)
+
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+ }
+
+ test("Compaction type: minor, >= default segments in level 1,compact twice in level 1") {
+ for (i <- 0 until 3) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+ }
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(SegmentSequenceIds.contains("0.1"))
+ assert(SegmentSequenceIds.contains("4.1"))
+ assert(!SegmentSequenceIds.contains("0.2"))
+ assert(SegmentSequenceIds.length == 11)
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(36)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+ }
+
+ test("Compaction type: minor, >= compacted segments in level 2,compact once in level 2") {
+ for (i <- 0 until 4) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+ }
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(SegmentSequenceIds.contains("0.1"))
+ assert(SegmentSequenceIds.contains("4.1"))
+ assert(SegmentSequenceIds.contains("8.1"))
+ assert(SegmentSequenceIds.contains("0.2"))
+ assert(SegmentSequenceIds.length == 16)
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+ val status = segments.collect().map { each => (each.toSeq) (1) }
+ assert(status.filter(_.equals("Compacted")).length == 15)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(48)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort limit 12"),
+ sql("SELECT * FROM carbon_localsort order by city,name limit 12"))
+ }
+
+ test("Compaction: clean files, major") {
+ for (i <- 0 until 1) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+ }
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
+ sql("clean files for table compaction_globalsort")
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(SegmentSequenceIds.contains("0.1"))
+ assert(SegmentSequenceIds.length == 1)
+
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort limit 3"),
+ sql("SELECT * FROM carbon_localsort order by city,name limit 3"))
+ }
+
+ test("Compaction: clean files, minor") {
+ for (i <- 0 until 2) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+ }
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'minor'")
+ sql("clean files for table compaction_globalsort")
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(SegmentSequenceIds.contains("0.1"))
+ assert(SegmentSequenceIds.length == 3)
+
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+ }
+
+ test("Compaction: global_sort_partitions=1, major") {
+ for (i <- 0 until 1) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='1')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='1')")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='1')")
+ }
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
+ sql("clean files for table compaction_globalsort")
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(SegmentSequenceIds.contains("0.1"))
+ assert(SegmentSequenceIds.length == 1)
+
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort limit 3"),
+ sql("SELECT * FROM carbon_localsort order by city,name limit 3"))
+ }
+
+ test("Compaction: global_sort_partitions=2, major") {
+ for (i <- 0 until 1) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ }
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
+ sql("clean files for table compaction_globalsort")
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(SegmentSequenceIds.contains("0.1"))
+ assert(SegmentSequenceIds.length == 1)
+
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 2)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+ }
+
+ test("Compaction: delete, major") {
+ for (i <- 0 until 1) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ }
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'major'")
+ sql("clean files for table compaction_globalsort")
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(SegmentSequenceIds.contains("0.1"))
+ assert(SegmentSequenceIds.length == 1)
+
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 2)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+
+ sql("delete from table compaction_globalsort where SEGMENT.ID in (0.1)")
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Success")
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Marked for Delete")
+ }
+
+ test("Compaction: delete, minor") {
+ for (i <- 0 until 2) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ }
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("delete from table compaction_globalsort where SEGMENT.ID in (1,2,3)")
+ sql("delete from table carbon_localsort where SEGMENT.ID in (1,2,3)")
+ sql("ALTER TABLE compaction_globalsort COMPACT 'minor'")
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(!SegmentSequenceIds.contains("0.1"))
+ assert(SegmentSequenceIds.length == 6)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Success")
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Marked for Delete")
+ }
+
+ test("Compaction: load from file dictory, three csv file, major") {
+ for (i <- 0 until 6) {
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE compaction_globalsort")
+ }
+ sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 3)
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(72)))
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Success")
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+ }
+
+ test("Compaction: load from file dictory, three csv file, minor") {
+ for (i <- 0 until 6) {
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$filePath' INTO TABLE compaction_globalsort")
+ }
+ sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
+
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 3)
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(72)))
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Success")
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+ }
+
+ // GLOBAL_SORT table without SORT_COLUMNS: after MAJOR compaction plus "clean files",
+ // only the compacted segment 0.1 should remain, and its contents must match a second
+ // global-sort table (compaction_globalsort2) loaded with the same three csv files.
+ test("Compaction: one file and no sort_columns") {
+ sql("DROP TABLE IF EXISTS compaction_globalsort2")
+ sql(
+ """
+ | CREATE TABLE compaction_globalsort2(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_SCOPE'='GLOBAL_SORT')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort2")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort2")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort2")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+ // clean files removes the source segments that were marked Compacted
+ sql("clean files for table compaction_globalsort")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ val status = segments.collect().map { each => (each.toSeq) (1) }
+ // after clean files only the merged segment 0.1 survives, with no Compacted entries left
+ assert(SegmentSequenceIds.contains("0.1"))
+ assert(SegmentSequenceIds.length == 1)
+ assert(status.filter(_.equals("Compacted")).length == 0)
+
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort2"), Seq(Row(12)))
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM compaction_globalsort2"))
+ sql("DROP TABLE IF EXISTS compaction_globalsort2")
+ }
+
+ // Verifies MAJOR compaction works when SORT_COLUMNS is an INT column (id) rather than
+ // the usual string dimensions: 3 loads merge into segment 0.1 with one index file.
+ test("Compaction: global_sort sort_columns is int data type") {
+ sql("DROP TABLE IF EXISTS compaction_globalsort2")
+ sql(
+ """
+ | CREATE TABLE compaction_globalsort2(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='id','SORT_SCOPE'='GLOBAL_SORT')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort2")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort2")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort2")
+
+ sql("ALTER TABLE compaction_globalsort2 COMPACT 'MAJOR'")
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort2")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ val status = segments.collect().map { each => (each.toSeq) (1) }
+ // 3 original segments + merged segment 0.1 = 4; the 3 originals are marked Compacted
+ assert(SegmentSequenceIds.contains("0.1"))
+ assert(SegmentSequenceIds.length == 4)
+ assert(status.filter(_.equals("Compacted")).length == 3)
+
+ assert(getIndexFileCount("compaction_globalsort2", "0.1") === 1)
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort2"), Seq(Row(12)))
+ sql("DROP TABLE IF EXISTS compaction_globalsort2")
+ }
+
+ // Restores the load/compaction properties this suite touches back to their defaults.
+ private def resetConf() {
+ val carbonProperties = CarbonProperties.getInstance()
+ carbonProperties.addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+ carbonProperties.addProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+ CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)
+ carbonProperties.addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+ CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
+ }
+
+ // Counts the .carbonindex files inside the given segment directory of a table.
+ // File.list(...) returns null when the directory does not exist or is not readable;
+ // previously that caused a NullPointerException — now it is reported as 0 index files.
+ private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
+ val store = storeLocation + "/default/" + tableName + "/Fact/Part0/Segment_" + segmentNo
+ val list = new File(store).list(new FilenameFilter {
+ override def accept(dir: File, name: String) = name.endsWith(".carbonindex")
+ })
+ if (list == null) 0 else list.length
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
new file mode 100644
index 0000000..1511b51
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CompactionSupportGlobalSortParameterTest.scala
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.datacompaction
+
+import java.io.{File, FilenameFilter}
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
+
+class CompactionSupportGlobalSortParameterTest extends QueryTest with BeforeAndAfterEach with BeforeAndAfterAll {
+ val filePath: String = s"$resourcesPath/globalsort"
+ val file1: String = resourcesPath + "/globalsort/sample1.csv"
+ val file2: String = resourcesPath + "/globalsort/sample2.csv"
+ val file3: String = resourcesPath + "/globalsort/sample3.csv"
+
+ // Runs before every test: reset carbon properties to defaults and recreate both tables —
+ // a GLOBAL_SORT table under test and a plain local-sort table used as the reference result.
+ override def beforeEach {
+ resetConf
+
+ sql("DROP TABLE IF EXISTS compaction_globalsort")
+ sql(
+ """
+ | CREATE TABLE compaction_globalsort(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+ """.stripMargin)
+
+ sql("DROP TABLE IF EXISTS carbon_localsort")
+ sql(
+ """
+ | CREATE TABLE carbon_localsort(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ """.stripMargin)
+ }
+
+ // Runs after every test: drop both tables so each test starts from a clean metastore.
+ override def afterEach {
+ sql("DROP TABLE IF EXISTS compaction_globalsort")
+ sql("DROP TABLE IF EXISTS carbon_localsort")
+ }
+
+ // With auto load merge disabled, deleting segments 1-3 and then requesting a MINOR
+ // compaction must NOT merge anything (too few valid segments), so no "0.1" appears.
+ // NOTE(review): the property restore at the end is skipped if an assertion fails —
+ // consider moving it into a try/finally or afterEach.
+ test("MINOR, ENABLE_AUTO_LOAD_MERGE: false") {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+ for (i <- 0 until 2) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ }
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("delete from table compaction_globalsort where SEGMENT.ID in (1,2,3)")
+ sql("delete from table carbon_localsort where SEGMENT.ID in (1,2,3)")
+ sql("ALTER TABLE compaction_globalsort COMPACT 'minor'")
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ // no merged segment and all 6 original segments still listed
+ assert(!SegmentSequenceIds.contains("0.1"))
+ assert(SegmentSequenceIds.length == 6)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Success")
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Marked for Delete")
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+ CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+ }
+
+ // With auto load merge enabled, the 6 loads trigger an automatic minor compaction
+ // (no explicit ALTER ... COMPACT is issued here); a "0.1" segment must appear.
+ test("MINOR, ENABLE_AUTO_LOAD_MERGE: true") {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+ for (i <- 0 until 2) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ }
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(SegmentSequenceIds.contains("0.1"))
+
+ // loaded 6 times and produced 6 segments,
+ // auto merge will compact and produce 1 segment because 6 is bigger than 4 (default value of minor),
+ // so total segment number is 7
+ assert(SegmentSequenceIds.length == 7)
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+ CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+ }
+
+ // With PRESERVE_LATEST_SEGMENTS_NUMBER = 0, no segments are protected from compaction:
+ // a MINOR compaction of the 6 loads must produce segment 0.1 and mark 4 segments Compacted.
+ test("MINOR, PRESERVE_LATEST_SEGMENTS_NUMBER: 0") {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
+ "0")
+ for (i <- 0 until 2) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+ }
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(SegmentSequenceIds.contains("0.1"))
+ assert(!SegmentSequenceIds.contains("4.1"))
+ assert(SegmentSequenceIds.length == 7)
+
+ val status = segments.collect().map { each => (each.toSeq) (1) }
+ assert(status.filter(_.equals("Compacted")).length == 4)
+
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
+ CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER)
+ }
+
+ // With PRESERVE_LATEST_SEGMENTS_NUMBER = 4, the 4 newest of the 6 segments are protected,
+ // leaving too few candidates for a minor merge: no 0.1 segment and nothing Compacted.
+ test("MINOR, PRESERVE_LATEST_SEGMENTS_NUMBER: 4") {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
+ "4")
+ for (i <- 0 until 2) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+ }
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), false, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(!SegmentSequenceIds.contains("0.1"))
+ assert(!SegmentSequenceIds.contains("4.1"))
+ assert(SegmentSequenceIds.length == 6)
+
+ val status = segments.collect().map { each => (each.toSeq) (1) }
+ assert(status.filter(_.equals("Compacted")).length == 0)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
+ CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER)
+ }
+
+ // With carbon.allowed.compaction.days = 0 (no day restriction), a MINOR compaction of
+ // the 6 loads must produce segment 0.1 and mark 4 segments Compacted.
+ test("MINOR, DAYS_ALLOWED_TO_COMPACT: 0") {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+ "0")
+ for (i <- 0 until 2) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+ }
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(SegmentSequenceIds.contains("0.1"))
+ assert(!SegmentSequenceIds.contains("4.1"))
+ assert(SegmentSequenceIds.length == 7)
+
+ val status = segments.collect().map { each => (each.toSeq) (1) }
+ assert(status.filter(_.equals("Compacted")).length == 4)
+
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+
+ // BUG FIX: restore the property to its DEFAULT value. Previously the property KEY
+ // (CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT) was passed as the value, leaving an
+ // invalid setting that could leak into subsequent tests.
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+ CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT)
+ }
+
+ // With carbon.allowed.compaction.days = 4, freshly loaded segments all fall inside the
+ // allowed window, so MINOR compaction behaves the same as with 0 days.
+ test("MINOR, DAYS_ALLOWED_TO_COMPACT: 4") {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+ "4")
+ for (i <- 0 until 2) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+ }
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'MINOR'")
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(SegmentSequenceIds.contains("0.1"))
+ assert(!SegmentSequenceIds.contains("4.1"))
+ assert(SegmentSequenceIds.length == 7)
+
+ val status = segments.collect().map { each => (each.toSeq) (1) }
+ assert(status.filter(_.equals("Compacted")).length == 4)
+
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+
+ // BUG FIX: restore the property to its DEFAULT value; the property KEY was previously
+ // passed as the value, leaving an invalid setting behind.
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+ CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT)
+ }
+
+ // With auto load merge disabled, MAJOR compaction after deleting segments 1-3 still
+ // merges the remaining valid segments into 0.1 (unlike MINOR, which requires 4 segments).
+ test("MAJOR, ENABLE_AUTO_LOAD_MERGE: false") {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+ for (i <- 0 until 2) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ }
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("delete from table compaction_globalsort where SEGMENT.ID in (1,2,3)")
+ sql("delete from table carbon_localsort where SEGMENT.ID in (1,2,3)")
+ sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ // 6 original segments + merged segment 0.1 = 7
+ assert(SegmentSequenceIds.contains("0.1"))
+ assert(SegmentSequenceIds.length == 7)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(12)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Success")
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Marked for Delete")
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+ CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+ }
+
+ // With auto load merge enabled, the 6 loads already trigger an automatic minor merge;
+ // a subsequent explicit MAJOR compaction adds one more merged segment.
+ test("MAJOR, ENABLE_AUTO_LOAD_MERGE: true") {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+ for (i <- 0 until 2) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort OPTIONS('GLOBAL_SORT_PARTITIONS'='2')")
+ }
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+ sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(SegmentSequenceIds.contains("0.1"))
+
+ // loaded 6 times and produced 6 segments,
+ // auto merge will compact and produce 1 segment because 6 is bigger than 4 (default value of minor),
+ // major compact and produce 1 segment
+ // so total segment number is 8
+ assert(SegmentSequenceIds.length == 8)
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE,
+ CarbonCommonConstants.DEFAULT_ENABLE_AUTO_LOAD_MERGE)
+ }
+
+ // With PRESERVE_LATEST_SEGMENTS_NUMBER = 0, a MAJOR compaction merges all 6 segments
+ // into 0.1 and marks all 6 originals as Compacted.
+ test("MAJOR, PRESERVE_LATEST_SEGMENTS_NUMBER: 0") {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
+ "0")
+ for (i <- 0 until 2) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+ }
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(SegmentSequenceIds.contains("0.1"))
+ assert(!SegmentSequenceIds.contains("4.1"))
+ assert(SegmentSequenceIds.length == 7)
+
+ val status = segments.collect().map { each => (each.toSeq) (1) }
+ assert(status.filter(_.equals("Compacted")).length == 6)
+
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
+ CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER)
+ }
+
+ // With PRESERVE_LATEST_SEGMENTS_NUMBER = 4, the 4 newest segments are protected, so a
+ // MAJOR compaction merges only the 2 oldest: segment 0.1 appears and 2 are Compacted.
+ test("MAJOR, PRESERVE_LATEST_SEGMENTS_NUMBER: 4") {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
+ "4")
+ for (i <- 0 until 2) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+ }
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(SegmentSequenceIds.contains("0.1"))
+ assert(!SegmentSequenceIds.contains("4.1"))
+ assert(SegmentSequenceIds.length == 7)
+
+ val status = segments.collect().map { each => (each.toSeq) (1) }
+ assert(status.filter(_.equals("Compacted")).length == 2)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER,
+ CarbonCommonConstants.DEFAULT_PRESERVE_LATEST_SEGMENTS_NUMBER)
+ }
+
+ // With carbon.allowed.compaction.days = 0 (no day restriction), a MAJOR compaction
+ // merges all 6 segments into 0.1 and marks all 6 originals as Compacted.
+ test("MAJOR, DAYS_ALLOWED_TO_COMPACT: 0") {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+ "0")
+ for (i <- 0 until 2) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+ }
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(SegmentSequenceIds.contains("0.1"))
+ assert(!SegmentSequenceIds.contains("4.1"))
+ assert(SegmentSequenceIds.length == 7)
+
+ val status = segments.collect().map { each => (each.toSeq) (1) }
+ assert(status.filter(_.equals("Compacted")).length == 6)
+
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+
+ // BUG FIX: restore the property to its DEFAULT value. Previously the property KEY
+ // (CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT) was passed as the value, leaving an
+ // invalid setting that could leak into subsequent tests.
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+ CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT)
+ }
+
+ // With carbon.allowed.compaction.days = 4, freshly loaded segments all fall inside the
+ // allowed window, so MAJOR compaction behaves the same as with 0 days.
+ test("MAJOR, DAYS_ALLOWED_TO_COMPACT: 4") {
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+ "4")
+ for (i <- 0 until 2) {
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE carbon_localsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE carbon_localsort")
+
+ sql(s"LOAD DATA LOCAL INPATH '$file1' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE compaction_globalsort")
+ sql(s"LOAD DATA LOCAL INPATH '$file3' INTO TABLE compaction_globalsort")
+
+ }
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "global_sort")
+
+ checkExistence(sql("DESCRIBE FORMATTED compaction_globalsort"), true, "city,name")
+
+ sql("ALTER TABLE compaction_globalsort COMPACT 'MAJOR'")
+
+ checkExistence(sql("SHOW SEGMENTS FOR TABLE compaction_globalsort"), true, "Compacted")
+
+ val segments = sql("SHOW SEGMENTS FOR TABLE compaction_globalsort")
+ val SegmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+ assert(SegmentSequenceIds.contains("0.1"))
+ assert(!SegmentSequenceIds.contains("4.1"))
+ assert(SegmentSequenceIds.length == 7)
+
+ val status = segments.collect().map { each => (each.toSeq) (1) }
+ assert(status.filter(_.equals("Compacted")).length == 6)
+
+ assert(getIndexFileCount("compaction_globalsort", "0.1") === 1)
+
+ checkAnswer(sql("SELECT COUNT(*) FROM compaction_globalsort"), Seq(Row(24)))
+
+ checkAnswer(sql("SELECT * FROM compaction_globalsort"),
+ sql("SELECT * FROM carbon_localsort"))
+
+ // BUG FIX: restore the property to its DEFAULT value; the property KEY was previously
+ // passed as the value, leaving an invalid setting behind.
+ CarbonProperties.getInstance().addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT,
+ CarbonCommonConstants.DEFAULT_DAYS_ALLOWED_TO_COMPACT)
+ }
+ // Restores the load/compaction properties this suite touches back to their defaults.
+ private def resetConf() {
+ val carbonProperties = CarbonProperties.getInstance()
+ carbonProperties.addProperty(CarbonCommonConstants.LOAD_SORT_SCOPE,
+ CarbonCommonConstants.LOAD_SORT_SCOPE_DEFAULT)
+ carbonProperties.addProperty(CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS,
+ CarbonCommonConstants.LOAD_GLOBAL_SORT_PARTITIONS_DEFAULT)
+ carbonProperties.addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD,
+ CarbonCommonConstants.DEFAULT_SEGMENT_LEVEL_THRESHOLD)
+ }
+
+ // Counts the .carbonindex files inside the given segment directory of a table.
+ // File.list(...) returns null when the directory does not exist or is not readable;
+ // previously that caused a NullPointerException — now it is reported as 0 index files.
+ private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = {
+ val store = storeLocation + "/default/" + tableName + "/Fact/Part0/Segment_" + segmentNo
+ val list = new File(store).list(new FilenameFilter {
+ override def accept(dir: File, name: String) = name.endsWith(".carbonindex")
+ })
+ if (list == null) 0 else list.length
+ }
+}
http://git-wip-us.apache.org/repos/asf/carbondata/blob/ac6c1d2b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
index 6347241..b655025 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/sortcolumns/TestSortColumns.scala
@@ -91,6 +91,7 @@ class TestSortColumns extends QueryTest with BeforeAndAfterAll {
"create table with no dictionary sort_columns where NumberOfNoDictSortColumns is less than " +
"NoDictionaryCount")
{
+ sql("drop table if exists sorttable1b")
sql(
"CREATE TABLE sorttable1b (empno String, empname String, designation String, doj Timestamp," +
" workgroupcategory int, workgroupcategoryname String, deptno int, deptname String, " +