You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/20 11:21:39 UTC

carbondata git commit: [CARBONDATA-1771] While segment_index compaction, .carbonindex files of invalid segments are also getting merged

Repository: carbondata
Updated Branches:
  refs/heads/master 400395a8d -> cac619555


[CARBONDATA-1771] While segment_index compaction, .carbonindex files of invalid segments are also getting merged

Scenario:
Disable feature, do loads, execute MINOR compaction
Execute SEGMENT_INDEX compaction
SEGMENT_INDEX compaction merges .carbonindex files of compacted invalid segments also

Solution:
Merge index files of valid segments only

This closes #1535


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/cac61955
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/cac61955
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/cac61955

Branch: refs/heads/master
Commit: cac6195553ec68b111c886fb27dc7ac3808d4502
Parents: 400395a
Author: dhatchayani <dh...@gmail.com>
Authored: Sun Nov 19 20:53:50 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Mon Nov 20 16:51:25 2017 +0530

----------------------------------------------------------------------
 .../CarbonIndexFileMergeTestCase.scala          | 33 ++++++++++++++++++++
 .../AlterTableCompactionCommand.scala           |  2 +-
 2 files changed, 34 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/cac61955/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index f06994c..c66107f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -148,6 +148,39 @@ class CarbonIndexFileMergeTestCase
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
   }
 
+  test("Verify index index merge for compacted segments") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+    sql("DROP TABLE IF EXISTS nonindexmerge")
+    sql(
+      """
+        | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+        s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+        s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+        s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+        s"'GLOBAL_SORT_PARTITIONS'='100')")
+    val rows = sql("""Select count(*) from nonindexmerge""").collect()
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+    sql("ALTER TABLE nonindexmerge COMPACT 'minor'").collect()
+    sql("ALTER TABLE nonindexmerge COMPACT 'segment_index'").collect()
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "2") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "3") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "0.1") == 0)
+    checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
+  }
+
   private def getIndexFileCount(tableName: String, segment: String): Int = {
     val table = CarbonMetadata.getInstance().getCarbonTable(tableName)
     val path = CarbonTablePath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/cac61955/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
index 826d35a..6e11fe4 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -142,7 +142,7 @@ case class AlterTableCompactionCommand(
     if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
       // Just launch job to merge index and return
       CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
-        carbonLoadModel.getLoadMetadataDetails.asScala.map(_.getLoadName),
+        CarbonDataMergerUtil.getValidSegmentList(carbonTable.getAbsoluteTableIdentifier).asScala,
         carbonLoadModel.getTablePath,
         carbonTable, true)
       return