You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/05/05 15:58:33 UTC

[carbondata] branch master updated: [CARBONDATA-3790] Fix SI table consistency with main table segments

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 02efddc  [CARBONDATA-3790] Fix SI table consistency with main table segments
02efddc is described below

commit 02efddcb0c1f514e74ccab7bc6204c93e551a0f0
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Thu Apr 30 14:41:10 2020 +0530

    [CARBONDATA-3790] Fix SI table consistency with main table segments
    
    Why is this PR needed?
    Consider a scenario where SI loading happens after the main table load. If acquiring the segment lock for a segment fails, that segment is skipped on the assumption that the missed segments will be loaded during the next load by SILoadEventListenerForFailedSegments, but the status of the SI table remains enabled. However, SILoadEventListenerForFailedSegments loads the skipped segments only if the SI status is disabled. This leads to a segment mismatch between the main table and the SI table, which may lead to query failure or data mismatch.
    
    What changes were proposed in this PR?
    If the segment lock cannot be acquired during SI load, add the segment to a skip list. If that list is not empty, disable the SI table so that SILoadEventListenerForFailedSegments will take care of loading the missing segments during the next load to the main table.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new test case added?
    No
    
    This closes #3734
---
 .../sql/secondaryindex/rdd/SecondaryIndexCreator.scala     | 14 ++++++++++++++
 1 file changed, 14 insertions(+)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index ec63065..04cfbb0 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -91,6 +91,7 @@ object SecondaryIndexCreator {
 
     var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
     val validSegments: java.util.List[String] = new util.ArrayList[String]()
+    val skippedSegments: java.util.List[String] = new util.ArrayList[String]()
     var validSegmentList = List.empty[String]
 
     try {
@@ -110,6 +111,7 @@ object SecondaryIndexCreator {
           // skipped segments load will be handled in SILoadEventListenerForFailedSegments
           validSegments.add(eachSegment)
         } else {
+          skippedSegments.add(eachSegment)
           LOGGER.error(s"Not able to acquire the segment lock for table" +
                        s" ${indexCarbonTable.getTableUniqueName} for segment: $eachSegment. " +
                        s"Skipping this segment from loading.")
@@ -321,6 +323,18 @@ object SecondaryIndexCreator {
       segmentLocks.foreach(segmentLock => {
         segmentLock.unlock()
       })
+      // if some segments are skipped, disable the SI table so that
+      // SILoadEventListenerForFailedSegments will take care to load to these segments in next
+      // consecutive load to main table.
+      if (!skippedSegments.isEmpty) {
+        secondaryIndexModel.sqlContext.sparkSession.sql(
+          s"""ALTER TABLE ${
+            secondaryIndexModel
+              .carbonLoadModel
+              .getDatabaseName
+          }.${ secondaryIndexModel.secondaryIndex.indexName } SET
+             |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin).collect()
+      }
       try {
         if (!isCompactionCall) {
           SegmentStatusManager