You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/07/07 11:19:16 UTC

[carbondata] branch master updated: [CARBONDATA-3873] Secondary index compaction with maintable clean files causing exception

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bfbc01  [CARBONDATA-3873] Secondary index compaction with maintable clean files causing exception
2bfbc01 is described below

commit 2bfbc01ce8251ba498611ce39fcd0ef8df69b311
Author: Mahesh Raju Somalaraju <ma...@huawei.com>
AuthorDate: Fri Jun 26 16:38:00 2020 +0530

    [CARBONDATA-3873] Secondary index compaction with maintable clean files causing exception
    
    Why is this PR needed?
    When compaction is run on a table with a secondary index and the main table's files are cleaned
    after the compaction fails, an exception is thrown.
    If compaction fails for one of the SI tables and that table is not disabled, it is still
    considered for query execution. A query executed against an SI table whose compaction failed
    does not find correct data, hence the exception.
    
    What changes were proposed in this PR?
    1. If any compaction fails, disable all SI tables whose compaction succeeded. They will be re-enabled after the next load.
    2. Change the error log to a warning in the table creation flow, since this is a success case and an ERROR should not be logged.
    
    This closes #3808
---
 .../core/datastore/filesystem/AbstractDFSCarbonFile.java   |  2 +-
 .../apache/spark/sql/secondaryindex/load/Compactor.scala   | 14 +++++++++++++-
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
index c2d81db..7e12dc9 100644
--- a/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
+++ b/core/src/main/java/org/apache/carbondata/core/datastore/filesystem/AbstractDFSCarbonFile.java
@@ -455,7 +455,7 @@ public abstract class AbstractDFSCarbonFile implements CarbonFile {
     try {
       listStatus = fileSystem.listStatus(path);
     } catch (IOException e) {
-      LOGGER.error("Exception occured: " + e.getMessage(), e);
+      LOGGER.warn("Exception occured: " + e.getMessage(), e);
       return new CarbonFile[0];
     }
     return getFiles(listStatus);
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
index a51aca1..e1f009d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/Compactor.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.secondaryindex.util.SecondaryIndexUtil
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 
@@ -47,6 +48,8 @@ object Compactor {
       forceAccessSegment: Boolean = false): Unit = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
     val carbonMainTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    var siCompactionIndexList = scala.collection.immutable.List[CarbonTable]()
+    val sparkSession = sqlContext.sparkSession
     // get list from carbonTable.getIndexes method
     val indexProviderMap = carbonMainTable.getIndexesMap
     if (null == indexProviderMap) {
@@ -121,10 +124,19 @@ object Compactor {
           segmentIdToLoadStartTimeMapping(validSegments.head),
           SegmentStatus.SUCCESS,
           carbonLoadModelForMergeDataFiles.getFactTimeStamp, rebuiltSegments.toList.asJava)
-
+        siCompactionIndexList ::= indexCarbonTable
       } catch {
         case ex: Exception =>
           LOGGER.error(s"Compaction failed for SI table ${secondaryIndex.indexName}", ex)
+          // If any compaction is failed then make all SI disabled which are success.
+          // They will be enabled in next load
+          siCompactionIndexList.foreach { indexCarbonTable =>
+            sparkSession.sql(
+              s"""
+                 | ALTER TABLE ${carbonLoadModel.getDatabaseName}.${indexCarbonTable.getTableName}
+                 | SET SERDEPROPERTIES ('isSITableEnabled' = 'false')
+               """.stripMargin).collect()
+          }
           throw ex
       }
     }