You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by xu...@apache.org on 2019/06/17 15:57:41 UTC

[carbondata] branch master updated: [CARBONDATA-3434] Fix Data Mismatch between MainTable and MV DataMap table during compaction

This is an automated email from the ASF dual-hosted git repository.

xubo245 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 3dfad55  [CARBONDATA-3434] Fix Data Mismatch between MainTable and MV DataMap table during compaction
3dfad55 is described below

commit 3dfad55bf9e067950008aed8f67e6bfb4cdc6584
Author: Indhumathi27 <in...@gmail.com>
AuthorDate: Wed Jun 12 16:37:31 2019 +0530

    [CARBONDATA-3434] Fix Data Mismatch between MainTable and MV DataMap table during compaction
    
    Fix Data Mismatch between MainTable and DataMap table during compaction
    
    Problem:
    The checkIfSegmentsToBeReloaded method of DataMapProvider skipped a main-table segment that still needed to be loaded, wrongly treating it as already loaded.
    Solution:
    Collect all segments that were merged into the given segment, and check whether the segment list stored in SegmentMapInfo contains all of them. If it does, the segment does not need to be reloaded; only the segment mapping is updated.
    
    Block the DELETE operation on datamap tables.
    
    This closes #3282
---
 .../carbondata/core/datamap/DataMapProvider.java   | 18 +++++++--------
 .../mv/rewrite/MVIncrementalLoadingTestcase.scala  | 27 ++++++++++++++++++++++
 .../mv/rewrite/TestAllOperationsOnMV.scala         | 11 +++++++++
 .../spark/sql/hive/CarbonAnalysisRules.scala       |  9 ++------
 4 files changed, 48 insertions(+), 17 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
index 6a9d2c5..d0b66f3 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapProvider.java
@@ -370,18 +370,16 @@ public abstract class DataMapProvider {
   private boolean checkIfSegmentsToBeReloaded(LoadMetadataDetails[] loadMetaDataDetails,
       List<String> segmentIds, String segmentId) {
     boolean isToBeLoadedAgain = true;
-    for (String loadName : segmentIds) {
-      for (LoadMetadataDetails loadMetadataDetail : loadMetaDataDetails) {
-        if (loadMetadataDetail.getLoadName().equalsIgnoreCase(loadName)) {
-          if (null != loadMetadataDetail.getMergedLoadName() && loadMetadataDetail
-              .getMergedLoadName().equalsIgnoreCase(segmentId)) {
-            isToBeLoadedAgain = false;
-          } else {
-            return true;
-          }
-        }
+    List<String> mergedSegments = new ArrayList<>();
+    for (LoadMetadataDetails loadMetadataDetail : loadMetaDataDetails) {
+      if (null != loadMetadataDetail.getMergedLoadName() && loadMetadataDetail.getMergedLoadName()
+          .equalsIgnoreCase(segmentId)) {
+        mergedSegments.add(loadMetadataDetail.getLoadName());
       }
     }
+    if (!mergedSegments.isEmpty() && segmentIds.containsAll(mergedSegments)) {
+      isToBeLoadedAgain = false;
+    }
     return isToBeLoadedAgain;
   }
 
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
index 31b41f1..3f07cda 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/MVIncrementalLoadingTestcase.scala
@@ -567,6 +567,33 @@ class MVIncrementalLoadingTestcase extends QueryTest with BeforeAndAfterAll {
     assert(segmentList.containsAll(segmentMap.get("default.test_table")))
   }
 
+  test("test auto compaction with threshold") {
+    sql(s"drop table IF EXISTS test_table")
+    sql(
+      s"""
+         | CREATE TABLE test_table (empname String, designation String, doj Timestamp,
+         |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+         |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+         |  utilization int,salary int)
+         | STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('AUTO_LOAD_MERGE'='true','COMPACTION_LEVEL_THRESHOLD'='6,0')
+      """.stripMargin)
+    loadDataToFactTable("test_table")
+    sql("drop datamap if exists datamap1")
+    sql("create datamap datamap_com using 'mv' as select empname, designation from test_table")
+    for (i <- 0 to 4) {
+      loadDataToFactTable("test_table")
+    }
+    createTableFactTable("test_table1")
+    for (i <- 0 to 5) {
+      loadDataToFactTable("test_table1")
+    }
+    checkAnswer(sql("select empname, designation from test_table"),
+      sql("select empname, designation from test_table1"))
+    val df = sql(s""" select empname, designation from test_table""".stripMargin)
+    val analyzed = df.queryExecution.analyzed
+    assert(TestUtil.verifyMVDataMap(analyzed, "datamap_com"))
+  }
+
   override def afterAll(): Unit = {
     sql("drop table if exists products")
     sql("drop table if exists sales")
diff --git a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
index 5f0a490..6f5a8e7 100644
--- a/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
+++ b/datamap/mv/core/src/test/scala/org/apache/carbondata/mv/rewrite/TestAllOperationsOnMV.scala
@@ -509,5 +509,16 @@ class TestAllOperationsOnMV extends QueryTest with BeforeAndAfterEach {
     }.getMessage.contains("Cannot set SORT_COLUMNS as empty when SORT_SCOPE is LOCAL_SORT")
   }
 
+  test("test delete on datamap table") {
+    sql("drop table IF EXISTS maintable")
+    sql("create table maintable(name string, c_code int, price int) stored by 'carbondata' tblproperties('sort_scope'='no_sort','sort_columns'='name', 'inverted_index'='name')")
+    sql("insert into table maintable select 'abc',21,2000")
+    sql("create datamap dm_mv on table maintable using 'mv' as select name, sum(price) from maintable group by name")
+    intercept[UnsupportedOperationException] {
+      sql("delete from dm_mv_table where maintable_name='abc'")
+    }.getMessage.contains("Delete operation is not supported for datamap table")
+    sql("drop table IF EXISTS maintable")
+  }
+
 }
 
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
index 96b6000..6344ad1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonAnalysisRules.scala
@@ -202,14 +202,9 @@ case class CarbonIUDAnalysisRule(sparkSession: SparkSession) extends Rule[Logica
         val projList = Seq(UnresolvedAlias(UnresolvedStar(alias.map(Seq(_)))), tupleId)
         val carbonTable = CarbonEnv.getCarbonTable(table.tableIdentifier)(sparkSession)
         if (carbonTable != null) {
-          if (CarbonUtil.hasAggregationDataMap(carbonTable)) {
+          if (carbonTable.isChildTable) {
             throw new UnsupportedOperationException(
-              "Delete operation is not supported for tables which have a pre-aggregate table. " +
-              "Drop pre-aggregate tables to continue.")
-          }
-          if (carbonTable.isChildDataMap) {
-            throw new UnsupportedOperationException(
-              "Delete operation is not supported for pre-aggregate table")
+              "Delete operation is not supported for datamap table")
           }
           val indexSchemas = DataMapStoreManager.getInstance().getDataMapSchemasOfTable(carbonTable)
           if (DataMapUtil.hasMVDataMap(carbonTable)) {