You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/20 11:21:39 UTC
carbondata git commit: [CARBONDATA-1771] While segment_index
compaction, .carbonindex files of invalid segments are also getting merged
Repository: carbondata
Updated Branches:
refs/heads/master 400395a8d -> cac619555
[CARBONDATA-1771] While segment_index compaction, .carbonindex files of invalid segments are also getting merged
Scenario:
Disable feature, do loads, execute MINOR compaction
Execute SEGMENT_INDEX compaction
SEGMENT_INDEX compaction merges .carbonindex files of compacted invalid segments also
Solution:
Merge index files of valid segments only
This closes #1535
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cac61955
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cac61955
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cac61955
Branch: refs/heads/master
Commit: cac6195553ec68b111c886fb27dc7ac3808d4502
Parents: 400395a
Author: dhatchayani <dh...@gmail.com>
Authored: Sun Nov 19 20:53:50 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Mon Nov 20 16:51:25 2017 +0530
----------------------------------------------------------------------
.../CarbonIndexFileMergeTestCase.scala | 33 ++++++++++++++++++++
.../AlterTableCompactionCommand.scala | 2 +-
2 files changed, 34 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cac61955/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index f06994c..c66107f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -148,6 +148,39 @@ class CarbonIndexFileMergeTestCase
checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
}
+ test("Verify index merge for compacted segments") {
+ CarbonProperties.getInstance()
+ .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+ sql("DROP TABLE IF EXISTS nonindexmerge")
+ sql(
+ """
+ | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
+ | STORED BY 'org.apache.carbondata.format'
+ | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+ """.stripMargin)
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+ s"'GLOBAL_SORT_PARTITIONS'='100')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+ s"'GLOBAL_SORT_PARTITIONS'='100')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+ s"'GLOBAL_SORT_PARTITIONS'='100')")
+ sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+ s"'GLOBAL_SORT_PARTITIONS'='100')")
+ val rows = sql("""Select count(*) from nonindexmerge""").collect()
+ assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+ assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+ assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+ assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+ sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
+ sql("ALTER TABLE nonindexmerge COMPACT 'segment_index'").collect()
+ assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+ assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+ assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+ assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+ assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+ checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
+ }
+
private def getIndexFileCount(tableName: String, segment: String): Int = {
val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
val path = CarbonTablePath
http://git-wip-us.apache.org/repos/asf/carbondata/blob/cac61955/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
index 826d35a..6e11fe4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -142,7 +142,7 @@ case class AlterTableCompactionCommand(
if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
// Just launch job to merge index and return
CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
- carbonLoadModel.getLoadMetadataDetails.asScala.map(_.getLoadName),
+ CarbonDataMergerUtil.getValidSegmentList(carbonTable.getAbsoluteTableIdentifier).asScala,
carbonLoadModel.getTablePath,
carbonTable, true)
return