You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ra...@apache.org on 2017/11/17 16:09:52 UTC

carbondata git commit: [CARBONDATA-1706] Making index merge DDL insensitive to the property

Repository: carbondata
Updated Branches:
  refs/heads/master 1d2af6293 -> f22e61460


[CARBONDATA-1706] Making index merge DDL insensitive to the property

(1) Segment index merge DDL will be insensitive to the property. Even if the property is set to false, DDL should be able to merge index files
(2) Compaction name is changed from SEGMENT_INDEX_COMPACTION to SEGMENT_INDEX

This closes #1494


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/f22e6146
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/f22e6146
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/f22e6146

Branch: refs/heads/master
Commit: f22e61460ab6c006617b0ec3c332de7f49c74883
Parents: 1d2af62
Author: dhatchayani <dh...@gmail.com>
Authored: Tue Nov 14 15:17:14 2017 +0530
Committer: ravipesala <ra...@gmail.com>
Committed: Fri Nov 17 21:39:22 2017 +0530

----------------------------------------------------------------------
 .../sdv/generated/MergeIndexTestCase.scala      |  2 +-
 .../CarbonIndexFileMergeTestCase.scala          | 25 ++++++++++++++++++-
 .../apache/carbondata/spark/rdd/Compactor.scala |  3 ++-
 .../carbondata/spark/util/CommonUtil.scala      | 26 +++++++++++++++++---
 .../spark/rdd/CarbonDataRDDFactory.scala        |  2 +-
 .../AlterTableCompactionCommand.scala           |  5 ++--
 .../sql/execution/strategy/DDLStrategy.scala    |  2 +-
 7 files changed, 53 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/f22e6146/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
index 2e26d7f..758c897 100644
--- a/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
+++ b/integration/spark-common-cluster-test/src/test/scala/org/apache/carbondata/cluster/sdv/generated/MergeIndexTestCase.scala
@@ -79,7 +79,7 @@ class MergeIndexTestCase extends QueryTest with BeforeAndAfterAll {
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 2)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
-    sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'SEGMENT_INDEX_COMPACTION'").collect()
+    sql("ALTER TABLE carbon_automation_nonmerge COMPACT 'SEGMENT_INDEX'").collect()
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "0") == 0)
     assert(getIndexFileCount("default", "carbon_automation_nonmerge", "1") == 0)
     checkAnswer(sql("""Select count(*) from carbon_automation_nonmerge"""), rows)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f22e6146/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
index 110557c..f06994c 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/CarbonIndexFileMergeTestCase.scala
@@ -92,7 +92,30 @@ class CarbonIndexFileMergeTestCase
     assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
     CarbonProperties.getInstance()
       .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")
-    sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX_COMPACTION'").collect()
+    sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
+    checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)
+  }
+
+  test("Verify command of index merge without enabling property") {
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+    sql("DROP TABLE IF EXISTS nonindexmerge")
+    sql(
+      """
+        | CREATE TABLE nonindexmerge(id INT, name STRING, city STRING, age INT)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('SORT_COLUMNS'='city,name', 'SORT_SCOPE'='GLOBAL_SORT')
+      """.stripMargin)
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+        s"'GLOBAL_SORT_PARTITIONS'='100')")
+    sql(s"LOAD DATA LOCAL INPATH '$file2' INTO TABLE nonindexmerge OPTIONS('header'='false', " +
+        s"'GLOBAL_SORT_PARTITIONS'='100')")
+    val rows = sql("""Select count(*) from nonindexmerge""").collect()
+    assert(getIndexFileCount("default_nonindexmerge", "0") == 100)
+    assert(getIndexFileCount("default_nonindexmerge", "1") == 100)
+    sql("ALTER TABLE nonindexmerge COMPACT 'SEGMENT_INDEX'").collect()
     assert(getIndexFileCount("default_nonindexmerge", "0") == 0)
     assert(getIndexFileCount("default_nonindexmerge", "1") == 0)
     checkAnswer(sql("""Select count(*) from nonindexmerge"""), rows)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f22e6146/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index a787af2..7057816 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -113,7 +113,8 @@ object Compactor {
 
     if (finalMergeStatus) {
       val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
-      CommonUtil.mergeIndexFiles(sc.sparkContext, Seq(mergedLoadNumber), storePath, carbonTable)
+      CommonUtil.mergeIndexFiles(
+        sc.sparkContext, Seq(mergedLoadNumber), storePath, carbonTable, false)
 
       // trigger event for compaction
       val alterTableCompactionPostEvent: AlterTableCompactionPostEvent =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f22e6146/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 6c0e802..f0b33f4 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -845,13 +845,31 @@ object CommonUtil {
   def mergeIndexFiles(sparkContext: SparkContext,
       segmentIds: Seq[String],
       tablePath: String,
-      carbonTable: CarbonTable): Unit = {
-    if (CarbonProperties.getInstance().getProperty(
-      CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
-      CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
+      carbonTable: CarbonTable,
+      mergeIndexProperty: Boolean): Unit = {
+    if (mergeIndexProperty) {
       new CarbonMergeFilesRDD(sparkContext, AbsoluteTableIdentifier.from(tablePath,
         carbonTable.getDatabaseName, carbonTable.getTableName).getTablePath,
         segmentIds).collect()
+    } else {
+      try {
+        CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT).toBoolean
+        if (CarbonProperties.getInstance().getProperty(
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
+          new CarbonMergeFilesRDD(sparkContext, AbsoluteTableIdentifier.from(tablePath,
+            carbonTable.getDatabaseName, carbonTable.getFactTableName).getTablePath,
+            segmentIds).collect()
+        }
+      } catch {
+        case _: Exception =>
+          if (CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean) {
+            new CarbonMergeFilesRDD(sparkContext, AbsoluteTableIdentifier.from(tablePath,
+              carbonTable.getDatabaseName, carbonTable.getFactTableName).getTablePath,
+              segmentIds).collect()
+          }
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f22e6146/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index c12d2ef..e32c407 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -330,7 +330,7 @@ object CarbonDataRDDFactory {
           loadDataFile(sqlContext, carbonLoadModel)
         }
         CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
-          Seq(carbonLoadModel.getSegmentId), storePath, carbonTable)
+          Seq(carbonLoadModel.getSegmentId), storePath, carbonTable, false)
         val newStatusMap = scala.collection.mutable.Map.empty[String, SegmentStatus]
         if (status.nonEmpty) {
           status.foreach { eachLoadStatus =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f22e6146/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
index 2f04feb..51275aa 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/AlterTableCompactionCommand.scala
@@ -113,8 +113,7 @@ case class AlterTableCompactionCommand(
         carbonLoadModel.setLoadMetadataDetails(
           alterTableModel.segmentUpdateStatusManager.get.getLoadMetadataDetails.toList.asJava)
       }
-    } else if (alterTableModel.compactionType.equalsIgnoreCase(
-      CompactionType.SEGMENT_INDEX_COMPACTION.toString)) {
+    } else if (alterTableModel.compactionType.equalsIgnoreCase("segment_index")) {
       compactionType = CompactionType.SEGMENT_INDEX_COMPACTION
     } else {
       compactionType = CompactionType.MINOR_COMPACTION
@@ -132,7 +131,7 @@ case class AlterTableCompactionCommand(
       CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
         carbonLoadModel.getLoadMetadataDetails.asScala.map(_.getLoadName),
         carbonLoadModel.getTablePath,
-        carbonTable)
+        carbonTable, true)
       return
     }
     // reading the start time of data load.

http://git-wip-us.apache.org/repos/asf/carbondata/blob/f22e6146/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index d6450c1..4e75547 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -85,7 +85,7 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
         if (isCarbonTable) {
           if (altertablemodel.compactionType.equalsIgnoreCase("minor") ||
               altertablemodel.compactionType.equalsIgnoreCase("major") ||
-              altertablemodel.compactionType.equalsIgnoreCase("SEGMENT_INDEX_COMPACTION")) {
+              altertablemodel.compactionType.equalsIgnoreCase("segment_index")) {
             ExecutedCommandExec(alterTable) :: Nil
           } else {
             throw new MalformedCarbonCommandException(