Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/07/10 17:28:57 UTC

[carbondata] branch master updated: [CARBONDATA-3885] [CARBONDATA-3884] Fix for load failures with isSIenabled = false and fix for concurrent load failure

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new d4b0709  [CARBONDATA-3885] [CARBONDATA-3884] Fix for load failures with isSIenabled = false and fix for concurrent load failure
d4b0709 is described below

commit d4b0709cf8c4f20ad61114d69022f15663b84915
Author: Vikram Ahuja <vi...@gmail.com>
AuthorDate: Wed Jun 24 12:16:42 2020 +0530

    [CARBONDATA-3885] [CARBONDATA-3884] Fix for load failures with isSIenabled = false and fix for concurrent load failure
    
    Why is this PR needed?
    Issue 1:
    In some concurrent scenarios, certain segment folders of the SI table were found to be deleted. When such a segment
    was inserted back into the SI table during a new load to the main table (with isSIenabled = false), the load to
    the SI table failed.
    
    Root Cause: When a segment folder is deleted and its entry is manually removed from the table status file, the stale
    .segment file is left behind. During a new load to the SI table, while the previously deleted segments are being
    loaded and the table status is changed to InsertInProgress, that stale .segment file still points to a mergeindex
    file which does not exist anymore (since the segment folder is deleted). Because the mergeindex file is missing,
    the load to the SI table failed.
    
    Issue 2:
    In some concurrent load scenarios on a main table with an SI table and isSITableEnabled = false, the load to the SI
    table fails.
    
    Root Cause: When concurrent loading is done with isSITableEnabled = false, the main table details are read earlier in
    the code and the SI table details are read later. In some scenarios another concurrent load to the main table
    finishes in between and changes the number of segments in the main table, so the number of segments in the SI
    table no longer matches the number in the main table. Because of this mismatch, the load can fail when the
    validmaintableload method is called.
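
    The mismatch described above can be illustrated with a small, self-contained Java sketch. It is not CarbonData code:
    the class and method names are hypothetical, and segments are reduced to plain strings. It contrasts a strict
    positional comparison of the two sorted segment lists with a looser check that only requires every main table
    segment to be present in the SI table, which is roughly the direction the diff below takes.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration only; not part of CarbonData.
final class SegmentListCheck {

  // Strict check: both sorted lists must have the same size and match position by position.
  static boolean positionalMatch(List<String> mainSegments, List<String> indexSegments) {
    if (mainSegments.size() != indexSegments.size()) {
      return false;
    }
    for (int i = 0; i < indexSegments.size(); i++) {
      if (!indexSegments.get(i).equalsIgnoreCase(mainSegments.get(i))) {
        return false;
      }
    }
    return true;
  }

  // Looser check: every main table segment must exist in the SI table;
  // extra SI segments are tolerated.
  static boolean mainCoveredByIndex(List<String> mainSegments, List<String> indexSegments) {
    Set<String> indexSet = new HashSet<>(indexSegments);
    return indexSet.containsAll(mainSegments);
  }
}
```

    With a main table snapshot of segments 0 and 1 and an SI table that already holds 0, 1 and 2, the strict check
    rejects the load while the looser one accepts it.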
    
    What changes were proposed in this PR?
    Solution for Issue 1: Whenever isSIenabled is set to false and previously deleted SI segments are loaded again into
    the SI table, delete the stale .segment file from the metadata folder if it exists. The load then creates a new
    .segment file with a new index file path.
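
    A minimal sketch of the "delete the stale file if it exists" step, written against java.nio.file rather than
    CarbonData's FileFactory so that it runs on its own. The class name, the example file name and the null/empty
    guard are assumptions made for illustration; the actual change is in the Scala diff below.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper for illustration only; not part of CarbonData.
final class StaleSegmentFileCleaner {

  // Deletes the named segment file from the given directory if it is present.
  // Returns true only when a file was actually removed.
  static boolean deleteIfPresent(Path segmentFilesDir, String segmentFileName) {
    if (segmentFileName == null || segmentFileName.isEmpty()) {
      return false; // nothing recorded for this segment (guard added for this sketch)
    }
    try {
      return Files.deleteIfExists(segmentFilesDir.resolve(segmentFileName));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Round trip on a temporary directory: the first call removes the file,
  // the second call finds nothing left to delete.
  static boolean selfCheck() {
    try {
      Path dir = Files.createTempDirectory("segments");
      String name = "0_1593000000000.segment"; // made-up file name
      Files.createFile(dir.resolve(name));
      boolean first = deleteIfPresent(dir, name);
      boolean second = deleteIfPresent(dir, name);
      Files.delete(dir);
      return first && !second;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```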
    
    Solution for Issue 2: Whenever isSIenabled is set to false and previously deleted SI segments are loaded again, check
    once more, just before adding a segment to SIFailedSegmentList, whether the loadname is present in the main table.
    Add it to SIFailedSegmentList only if the loadname is present in the main table and is in SUCCESS state.
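
    The presence check can be sketched in plain Java as follows. The Load and Status types are stand-ins invented for
    this example (the real code works on LoadMetadataDetails and SegmentStatus), and the sketch follows the diff
    below in rejecting only MARKED_FOR_DELETE and COMPACTED loads rather than testing for SUCCESS explicitly.

```java
import java.util.List;

// Hypothetical stand-in types for illustration only; not part of CarbonData.
final class MainTableLoadCheck {

  enum Status { SUCCESS, MARKED_FOR_DELETE, COMPACTED }

  static final class Load {
    final String name;
    final Status status;

    Load(String name, Status status) {
      this.name = name;
      this.status = status;
    }
  }

  // A load is usable only if it is present in the main table snapshot
  // and has not been deleted or compacted away.
  static boolean isValid(List<Load> mainTableLoads, String loadName) {
    for (Load load : mainTableLoads) {
      if (load.name.equals(loadName)) {
        return load.status != Status.MARKED_FOR_DELETE && load.status != Status.COMPACTED;
      }
    }
    // In concurrent scenarios the load name may be absent from the snapshot.
    return false;
  }
}
```

    Returning false for an absent load name avoids calling head on an empty filter result, which is the failure mode
    the original Scala code was exposed to.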
    
    This closes #3802
---
 .../SILoadEventListenerForFailedSegments.scala     | 57 +++++++++++++++++++---
 .../load/CarbonInternalLoaderUtil.java             | 33 ++++++++++++-
 2 files changed, 82 insertions(+), 8 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index 74f2f53..cae33ed 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -32,13 +32,15 @@ import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.core.metadata.index.IndexType
-import org.apache.carbondata.core.metadata.schema.indextable.IndexMetadata
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{Event, OperationContext, OperationEventListener}
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostStatusUpdateEvent
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 
 /**
  * This Listener is to load the data to failed segments of Secondary index table(s)
@@ -149,15 +151,35 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
                     .foreach(metadataDetail => {
                       val detail = details
                         .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+                      val mainTableDetail = mainTableDetails
+                        .filter(metadata => metadata.getLoadName.equals(metadataDetail))
                       if (null == detail || detail.length == 0) {
                         val newDetails = new LoadMetadataDetails
                         newDetails.setLoadName(metadataDetail)
                         LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName)
                         failedLoadMetadataDetails.add(newDetails)
+                      } else if (detail != null && detail.length !=0 && metadataDetail != null
+                                 && metadataDetail.length != 0) {
+                        // If SI table has compacted segments and main table does not have
+                        // compacted segments due to some failure while compaction, need to
+                        // reload the original segments in this case.
+                        if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
+                            mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
+                          detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
+                          LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName)
+                          failedLoadMetadataDetails.add(detail(0))
+                        }
                       }
                     })
                   try {
                     if (!failedLoadMetadataDetails.isEmpty) {
+                      // in the case when in SI table a segment is deleted and it's entry is
+                      // deleted from the tablestatus file, the corresponding .segment file from
+                      // the metadata folder should also be deleted as it contains the
+                      // mergefilename which does not exist anymore as the segment is deleted.
+                      deleteStaleSegmentFileIfPresent(carbonLoadModel,
+                        indexTable,
+                        failedLoadMetadataDetails)
                       CarbonIndexUtil
                         .LoadToSITable(sparkSession,
                           carbonLoadModel,
@@ -209,14 +231,37 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
 
   def checkIfMainTableLoadIsValid(mainTableDetails: Array[LoadMetadataDetails],
     loadName: String): Boolean = {
+    // in concurrent scenarios there can be cases when loadName is not present in the
+    // mainTableDetails array. Added a check to see if the loadName is even present in the
+    // mainTableDetails.
     val mainTableLoadDetail = mainTableDetails
-      .filter(mainTableDetail => mainTableDetail.getLoadName.equals(loadName)).head
-    if (mainTableLoadDetail.getSegmentStatus ==
-        SegmentStatus.MARKED_FOR_DELETE ||
-        mainTableLoadDetail.getSegmentStatus == SegmentStatus.COMPACTED) {
+      .filter(mainTableDetail => mainTableDetail.getLoadName.equals(loadName))
+    if (mainTableLoadDetail.length == 0) {
       false
     } else {
-      true
+      if (mainTableLoadDetail.head.getSegmentStatus ==
+        SegmentStatus.MARKED_FOR_DELETE ||
+        mainTableLoadDetail.head.getSegmentStatus == SegmentStatus.COMPACTED) {
+        false
+      } else {
+        true
+      }
     }
   }
+
+  def deleteStaleSegmentFileIfPresent(carbonLoadModel: CarbonLoadModel, indexTable: CarbonTable,
+    failedLoadMetaDataDetails: java.util.List[LoadMetadataDetails]): Unit = {
+    failedLoadMetaDataDetails.asScala.map(failedLoadMetaData => {
+      carbonLoadModel.getLoadMetadataDetails.asScala.map(loadMetaData => {
+        if (failedLoadMetaData.getLoadName == loadMetaData.getLoadName) {
+          val segmentFilePath = CarbonTablePath.getSegmentFilesLocation(indexTable.getTablePath) +
+            CarbonCommonConstants.FILE_SEPARATOR + loadMetaData.getSegmentFile
+          if (FileFactory.isFileExist(segmentFilePath)) {
+            // delete the file if it exists
+            FileFactory.deleteFile(segmentFilePath)
+          }
+        }
+      })
+    })
+  }
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
index b4066fc..9a16c4a 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 
@@ -119,6 +120,24 @@ public class CarbonInternalLoaderUtil {
           updatedLoadMetadataDetails.add(currentLoadMetadataDetails[i]);
         }
 
+        // check if newLoadMetadataDetail has segments which are not in  currentLoadMetaDetails
+        // and add them to the updatedLoadMetadataDetails
+        boolean foundNext = false;
+        for (int i = 0; i < newLoadMetadataDetails.size(); i++) {
+          foundNext = false;
+          for (int j = 0; j < currentLoadMetadataDetails.length; j++) {
+            if (newLoadMetadataDetails.get(i).getLoadName().equals(currentLoadMetadataDetails[j].getLoadName())) {
+              foundNext = true;
+              break;
+            }
+            if (j == currentLoadMetadataDetails.length - 1 && !foundNext) {
+              // if not found in the list then add it
+              updatedLoadMetadataDetails.add(newLoadMetadataDetails.get(i));
+              found = true;
+            }
+          }
+        }
+
         // when data load is done for first time, add all the details
         if (currentLoadMetadataDetails.length == 0 || !found) {
           updatedLoadMetadataDetails.addAll(newLoadMetadataDetails);
@@ -313,11 +332,21 @@ public class CarbonInternalLoaderUtil {
         getListOfValidSlices(siTableLoadMetadataDetails);
     Collections.sort(mainTableSegmentsList);
     Collections.sort(indexList);
-    if (indexList.size() != mainTableSegmentsList.size()) {
+    // In the case when number of SI segments are more than the maintable segments do nothing
+    // and proceed to process the segments. Return False in case if maintable segments are more
+    // than SI Segments
+    if (indexList.size() < mainTableSegmentsList.size()) {
       return false;
     }
+    // There can be cases when the number of segments in the main table are less than the index
+    // table. In this case mapping all the segments in main table to SI table.
+    // Return False if a segment in maintable is not in indextable
+    HashSet<String> indexTableSet = new HashSet<String>();
     for (int i = 0; i < indexList.size(); i++) {
-      if (!indexList.get(i).equalsIgnoreCase(mainTableSegmentsList.get(i))) {
+      indexTableSet.add(indexList.get(i));
+    }
+    for (int i = 0; i < mainTableSegmentsList.size(); i++) {
+      if (!indexTableSet.contains(mainTableSegmentsList.get(i))) {
         return false;
       }
     }