You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/07/08 07:26:56 UTC

[carbondata] branch master updated: [CARBONDATA-3874] segment mismatch between maintable and SI table when load with concurrency

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 797963f  [CARBONDATA-3874] segment mismatch between maintable and SI table when load with concurrency
797963f is described below

commit 797963f51038bec8f1cd8cbc9a94d9f39e335bb0
Author: Mahesh Raju Somalaraju <ma...@huawei.com>
AuthorDate: Fri Jun 26 21:47:50 2020 +0530

    [CARBONDATA-3874] segment mismatch between maintable and SI table when load with concurrency
    
    Why is this PR needed?
    1. Segment mismatch between the main table and the SI table with concurrent loads.
    2. In concurrent loads, if one of the loads fails for the SI table, 'isSITableEnabled' is
    set to false (isSITableEnabled = false). The failed-segments SI event listener only checked
    whether the SI is enabled (isSITableEnabled == true) and, if so, did not load the current
    load into the SI table. In concurrent scenarios the SI can still be enabled
    (isSITableEnabled == true) while a segment difference exists.
    
    What changes were proposed in this PR?
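    Instead of only checking whether the SI is enabled (isSITableEnabled == true), also check
    whether there is any segment difference between the main table and the SI table. The
    resulting condition for loading the failed segments into the SI table is:
    if (isSITableEnabled == false || mainTblAndSIDiff == true) { --- }
    In addition, the SI table is re-enabled only when the valid segments match and every
    in-progress main-table load already has an entry in the SI table.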
    
    This closes #3811
---
 .../secondaryindex/command/SICreationCommand.scala |  8 ++++-
 .../SILoadEventListenerForFailedSegments.scala     | 34 +++++++++++++++++-----
 .../load/CarbonInternalLoaderUtil.java             | 32 ++++++++++++++++----
 3 files changed, 60 insertions(+), 14 deletions(-)

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
index be0c614..2e979d6 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/command/SICreationCommand.scala
@@ -49,6 +49,7 @@ import org.apache.carbondata.core.metadata.schema.indextable.{IndexMetadata, Ind
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo, TableSchema}
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{CreateTablePostExecutionEvent, CreateTablePreExecutionEvent, OperationContext, OperationListenerBus}
@@ -390,8 +391,13 @@ private[sql] case class CarbonCreateSecondaryIndexCommand(
       }
       val indexTablePath = CarbonTablePath
         .getMetadataPath(tableInfo.getOrCreateAbsoluteTableIdentifier.getTablePath)
+      val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+        SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+      val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+        SegmentStatusManager.readLoadMetadata(indexTablePath)
       val isMaintableSegEqualToSISegs = CarbonInternalLoaderUtil
-        .checkMainTableSegEqualToSISeg(carbonTable.getMetadataPath, indexTablePath)
+        .checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
+          siTblLoadMetadataDetails)
       if (isMaintableSegEqualToSISegs) {
         // enable the SI table
         sparkSession.sql(
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index 2071385..74f2f53 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -79,8 +79,20 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
                   .getTableMetadata(TableIdentifier(indexTableName,
                     Some(carbonLoadModel.getDatabaseName))).storage.properties
                   .getOrElse("isSITableEnabled", "true").toBoolean
+                val indexTable = metaStore
+                  .lookupRelation(Some(carbonLoadModel.getDatabaseName), indexTableName)(
+                    sparkSession)
+                  .asInstanceOf[CarbonRelation]
+                  .carbonTable
 
-                if (!isLoadSIForFailedSegments) {
+                val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+                  SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+                val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+                  SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+                if (!isLoadSIForFailedSegments
+                    || !CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
+                  mainTblLoadMetadataDetails,
+                  siTblLoadMetadataDetails)) {
                   val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
                     indexTableName)
                   val secondaryIndex = IndexModel(Some(carbonTable.getDatabaseName),
@@ -88,11 +100,6 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
                     indexColumns.split(",").toList,
                     indexTableName)
 
-                  val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
-                  val indexTable = metaStore
-                    .lookupRelation(Some(carbonLoadModel.getDatabaseName),
-                      indexTableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
-
                   var details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
                   // If it empty, then no need to do further computations because the
                   // tabletstatus might not have been created and hence next load will take care
@@ -162,11 +169,22 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
                       // get the current load metadata details of the index table
                       details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
                     }
+
+                    // get updated main table segments and si table segments
+                    val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+                      SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+                    val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+                      SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+
+                    // check if main table has load in progress and SI table has no load
+                    // in progress entry, then no need to enable the SI table
                     // Only if the valid segments of maintable match the valid segments of SI
                     // table then we can enable the SI for query
                     if (CarbonInternalLoaderUtil
-                      .checkMainTableSegEqualToSISeg(carbonTable.getMetadataPath,
-                        indexTable.getMetadataPath)) {
+                          .checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
+                            siTblLoadMetadataDetails)
+                        && CarbonInternalLoaderUtil.checkInProgLoadInMainTableAndSI(carbonTable,
+                      mainTblLoadMetadataDetails, siTblLoadMetadataDetails)) {
                       // enable the SI table if it was disabled earlier due to failure during SI
                       // creation time
                       sparkSession.sql(
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
index 6d6ad28..b4066fc 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
@@ -304,14 +304,13 @@ public class CarbonInternalLoaderUtil {
 
   /**
    * Method to check if main table and SI have same number of valid segments or not
-   *
    */
-  public static boolean checkMainTableSegEqualToSISeg(String carbonTablePath,
-      String indexTablePath) {
+  public static boolean checkMainTableSegEqualToSISeg(LoadMetadataDetails[] mainTableLoadMetadataDetails,
+                                                      LoadMetadataDetails[] siTableLoadMetadataDetails) {
     List<String> mainTableSegmentsList =
-        getListOfValidSlices(SegmentStatusManager.readCarbonMetaData(carbonTablePath));
+        getListOfValidSlices(mainTableLoadMetadataDetails);
     List<String> indexList =
-        getListOfValidSlices(SegmentStatusManager.readLoadMetadata(indexTablePath));
+        getListOfValidSlices(siTableLoadMetadataDetails);
     Collections.sort(mainTableSegmentsList);
     Collections.sort(indexList);
     if (indexList.size() != mainTableSegmentsList.size()) {
@@ -325,4 +324,27 @@ public class CarbonInternalLoaderUtil {
     return true;
   }
 
+  /**
+   * Returns false if a main-table load still in progress has no matching entry in the SI table.
+   */
+  public static boolean checkInProgLoadInMainTableAndSI(CarbonTable carbonTable,
+                                                      LoadMetadataDetails[] mainTableLoadMetadataDetails,
+                                                      LoadMetadataDetails[] siTableLoadMetadataDetails) {
+    List<String> allSiSlices = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+    for (LoadMetadataDetails oneLoad : siTableLoadMetadataDetails) {
+      allSiSlices.add(oneLoad.getLoadName());
+    }
+    // allSiSlices holds every SI load name regardless of segment status (not only valid ones).
+    if (mainTableLoadMetadataDetails.length != 0) {
+      for (LoadMetadataDetails loadDetail : mainTableLoadMetadataDetails) {
+        // Fail if a load reported in progress by isLoadInProgress has no SI entry of that name.
+        if (SegmentStatusManager.isLoadInProgress(carbonTable.getAbsoluteTableIdentifier(), loadDetail.getLoadName())) {
+          if (!allSiSlices.contains(loadDetail.getLoadName())) {
+            return false;
+          }
+        }
+      }
+    }
+    return true;
+  }
 }