Posted to issues@carbondata.apache.org by GitBox <gi...@apache.org> on 2020/12/28 16:10:29 UTC

[GitHub] [carbondata] Indhumathi27 opened a new pull request #4067: [WIP] Fix concurrency(Load&Compaction) issue with SI

Indhumathi27 opened a new pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067


    ### Why is this PR needed?
    
    
    ### What changes were proposed in this PR?
   
       
    ### Does this PR introduce any user interface change?
    - No
    - Yes. (please explain the change and update document)
   
    ### Is any new testcase added?
    - No
    - Yes
   
       
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549715981



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -406,158 +407,195 @@ object CarbonIndexUtil {
       .asInstanceOf[CarbonRelation]
       .carbonTable
 
-    val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
-      SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
     var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
-    if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
-      mainTableDetails.toArray,
-      siTblLoadMetadataDetails)) {
-      val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
-        indexTableName)
-      val indexModel = IndexModel(Some(carbonTable.getDatabaseName),
-        indexMetadata.getParentTableName,
-        indexColumns.split(",").toList,
-        indexTableName)
-
-      // var details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
-      // If it empty, then no need to do further computations because the
-      // tabletstatus might not have been created and hence next load will take care
-      if (siTblLoadMetadataDetails.isEmpty) {
-        Seq.empty
-      }
+    val compactionLock = CarbonLockFactory.getCarbonLockObj(
+      carbonTable.getAbsoluteTableIdentifier,
+      LockUsage.COMPACTION_LOCK)
+    try {
+      // In some cases, SI table segment might be in COMPACTED state and main table
+      // compaction might be still in progress. In those cases, we can try to take compaction lock
+      // on main table and then compare and add SI segments to failedLoads, to avoid repair
+      // SI SUCCESS loads.
+      if (compactionLock.lockWithRetries()) {
+        var mainTableDetails = try {
+          SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(
+            carbonTable.getTablePath))
+        } catch {
+          case _: Exception =>
+            if (!isLoadOrCompaction) {
+              throw new ConcurrentOperationException(carbonTable.getDatabaseName,

Review comment:
       I don't think this has to be a ConcurrentOperationException. The read can only throw an IOException, so in that case just rethrow the same exception.
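
A minimal sketch of what this suggestion could look like, written against a stand-in reader so it runs without CarbonData. `readStatus` and `readOrSkip` are hypothetical names for illustration only, and keeping the `isLoadOrCompaction` check from the diff is an assumption about the intent:

```scala
import java.io.IOException

object RethrowSketch {
  // Hypothetical stand-in for the table status read; not the CarbonData API.
  def readStatus(fail: Boolean): Seq[String] =
    if (fail) throw new IOException("table status unreadable") else Seq("0", "1")

  // Propagate the original IOException for the reindex path instead of
  // wrapping it in a different exception type; skip quietly otherwise.
  def readOrSkip(fail: Boolean, isLoadOrCompaction: Boolean): Option[Seq[String]] =
    try {
      Some(readStatus(fail))
    } catch {
      case e: IOException =>
        if (!isLoadOrCompaction) throw e // same exception, cause preserved
        None
    }
}
```

Rethrowing the caught exception keeps the original message and stack trace, which a new ConcurrentOperationException would hide.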







[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549729189



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -406,158 +407,195 @@ object CarbonIndexUtil {
       .asInstanceOf[CarbonRelation]
       .carbonTable
 
-    val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
-      SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
     var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
-    if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
-      mainTableDetails.toArray,
-      siTblLoadMetadataDetails)) {
-      val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
-        indexTableName)
-      val indexModel = IndexModel(Some(carbonTable.getDatabaseName),
-        indexMetadata.getParentTableName,
-        indexColumns.split(",").toList,
-        indexTableName)
-
-      // var details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
-      // If it empty, then no need to do further computations because the
-      // tabletstatus might not have been created and hence next load will take care
-      if (siTblLoadMetadataDetails.isEmpty) {
-        Seq.empty
-      }
+    val compactionLock = CarbonLockFactory.getCarbonLockObj(
+      carbonTable.getAbsoluteTableIdentifier,
+      LockUsage.COMPACTION_LOCK)
+    try {
+      // In some cases, SI table segment might be in COMPACTED state and main table
+      // compaction might be still in progress. In those cases, we can try to take compaction lock
+      // on main table and then compare and add SI segments to failedLoads, to avoid repair
+      // SI SUCCESS loads.
+      if (compactionLock.lockWithRetries()) {
+        var mainTableDetails = try {
+          SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(
+            carbonTable.getTablePath))
+        } catch {
+          case _: Exception =>
+            if (!isLoadOrCompaction) {
+              throw new ConcurrentOperationException(carbonTable.getDatabaseName,
+                carbonTable.getTableName, "table status read", "reindex command")
+            }
+            return;

Review comment:
       No need







[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549630875



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
+      try {
+        // In some cases, SI table segment might be in COMPACTED state and main table
+        // compaction might be still in progress. In those cases, we can try to take compaction lock
+        // on main table and then compare and add SI segments to failedLoads, to avoid repair
+        // SI SUCCESS loads.
+        if (carbonLock.lockWithRetries(3, 0)) {
+          val newMainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable

Review comment:
       If possible, clean up the commented-out readLoadMetadata calls in the same method.







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-752145672


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3500/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-752069900


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3498/
   





[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549618492



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
+      try {
+        // In some cases, SI table segment might be in COMPACTED state and main table
+        // compaction might be still in progress. In those cases, we can try to take compaction lock
+        // on main table and then compare and add SI segments to failedLoads, to avoid repair
+        // SI SUCCESS loads.
+        if (carbonLock.lockWithRetries(3, 0)) {

Review comment:
       Why not use the default carbonLock.lockWithRetries()?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
+      try {
+        // In some cases, SI table segment might be in COMPACTED state and main table
+        // compaction might be still in progress. In those cases, we can try to take compaction lock
+        // on main table and then compare and add SI segments to failedLoads, to avoid repair
+        // SI SUCCESS loads.
+        if (carbonLock.lockWithRetries(3, 0)) {
+          val newMainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable
+            .getMetadataPath)
+          // check for the skipped segments. compare the main table and SI table table
+          // status file and get the skipped segments if any
+          CarbonInternalLoaderUtil.getListOfValidSlices(newMainTableDetails).asScala
+            .foreach(metadataDetail => {
+              if (repairLimit > failedLoadMetadataDetails.size()) {
+                val detail = siTblLoadMetadataDetails
+                  .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+                val mainTableDetail = newMainTableDetails
+                  .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+                if (null == detail || detail.length == 0) {
+                  val newDetails = new LoadMetadataDetails
+                  newDetails.setLoadName(metadataDetail)
+                  LOGGER.error(
+                    "Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
+                    " table " + indexTableName + "." + carbonTable.getTableName)
+                  failedLoadMetadataDetails.add(newDetails)
+                } else if (detail != null && detail.length != 0 && metadataDetail != null
+                           && metadataDetail.length != 0) {
+                  // If SI table has compacted segments and main table does not have
+                  // compacted segments due to some failure while compaction, need to
+                  // reload the original segments in this case.
+                  if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
+                      mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
+                    detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
+                    // in concurrent scenario, if a compaction is going on table, then SI
+                    // segments are updated first in table status and then the main table
+                    // segment, so in any load runs parallel this listener shouldn't consider
+                    // those segments accidentally. So try to take the segment lock.
+                    val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
+                      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+                        CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
+                        LockUsage.LOCK)
+                    if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
+                      segmentLocks += segmentLockOfProbableOnCompactionSeg
+                      LOGGER.error(
+                        "Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
+                        + "table " + indexTableName + "." + carbonTable.getTableName)
+                      failedLoadMetadataDetails.add(detail(0))
+                    }
+                  }
                 }
               }
-            }
-          }
-        })
+            })
+        } else {
+          LOGGER.error(
+            "Unable to obtain compaction lock for table" + carbonTable.getTableUniqueName)

Review comment:
       After your change, if compaction is running in one thread and a load or SI repair is called concurrently, the repair cannot succeed, because the main table segments are not considered while the table is under compaction.
   
   So add a log here: "Didn't check failed segments for SI as main table is under compaction, please call SI repair again"
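
A rough sketch of the else branch with the suggested message, assuming a stand-in `Lock` trait in place of ICarbonLock. `RepairLockSketch`, `checkFailedSegments` and the `messages` buffer are hypothetical names used only so the example is self-contained; including the table name in the message is also an assumption:

```scala
import scala.collection.mutable.ListBuffer

object RepairLockSketch {
  // Hypothetical stand-in for ICarbonLock; only the two calls used here.
  trait Lock {
    def lockWithRetries(): Boolean
    def unlock(): Boolean
  }

  // Messages are collected rather than logged to avoid a logging dependency.
  val messages: ListBuffer[String] = ListBuffer.empty

  // Runs `compare` only while the compaction lock is held. If the lock cannot
  // be taken, records why the check was skipped and returns false.
  def checkFailedSegments(compactionLock: Lock, tableName: String)(compare: => Unit): Boolean =
    if (compactionLock.lockWithRetries()) {
      try {
        compare
        true
      } finally {
        compactionLock.unlock()
      }
    } else {
      messages += s"Didn't check failed segments for SI as main table $tableName " +
        "is under compaction, please call SI repair again"
      false
    }
}
```

Returning a flag lets the caller tell "nothing to repair" apart from "could not check", which the bare error log in the diff does not.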

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
+      try {
+        // In some cases, SI table segment might be in COMPACTED state and main table
+        // compaction might be still in progress. In those cases, we can try to take compaction lock
+        // on main table and then compare and add SI segments to failedLoads, to avoid repair
+        // SI SUCCESS loads.
+        if (carbonLock.lockWithRetries(3, 0)) {
+          val newMainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable

Review comment:
       I can see that, for repair alone, the main table status is read 3 times and the SI table status 2 times. Please optimize.
   
   Also, when we compare the main table status with the SI table status, it is good to acquire the lock for both. Otherwise, after the main table status is read, another thread can modify the SI table status or the main table status, which can cause a wrong comparison result.
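
To illustrate both points, here is a small sketch that takes both status locks, reads each table status exactly once, and compares the two snapshots. It assumes a stand-in `Lock` trait and models a table status as a `Map` from segment name to status; `withLocks`, `missingInSi` and `findMissing` are hypothetical helpers, not CarbonData APIs:

```scala
object SnapshotCompareSketch {
  // Hypothetical stand-in for ICarbonLock; only the two calls used here.
  trait Lock {
    def lockWithRetries(): Boolean
    def unlock(): Boolean
  }

  // Acquires the locks in the given order and runs `body` only if all were
  // taken. Locks that were acquired are always released, newest first.
  def withLocks[A](locks: List[Lock])(body: => A): Option[A] = {
    var held: List[Lock] = Nil
    try {
      val allHeld = locks.forall { l =>
        val ok = l.lockWithRetries()
        if (ok) held = l :: held
        ok
      }
      if (allHeld) Some(body) else None
    } finally {
      held.foreach(_.unlock())
    }
  }

  // Segments present in the main table snapshot but absent from the SI snapshot.
  def missingInSi(main: Map[String, String], si: Map[String, String]): Set[String] =
    main.keySet.diff(si.keySet)

  // Each status is read once, and only while both locks are held.
  def findMissing(
      mainLock: Lock,
      siLock: Lock,
      readMain: () => Map[String, String],
      readSi: () => Map[String, String]): Option[Set[String]] =
    withLocks(List(mainLock, siLock)) {
      missingInSi(readMain(), readSi())
    }
}
```

Taking the locks in a fixed order (main table first, then SI) is a design choice in this sketch to avoid two threads waiting on each other.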

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(

Review comment:
       Please rename this to compactionLock, so that when reading the usage code further down it is easy to tell which lock it is.







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-752044968


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5258/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [WIP] Fix concurrency(Load&Compaction) issue with SI

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751940488


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5252/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [WIP] Fix concurrency(Load&Compaction) issue with SI

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751966472


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3493/
   





[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
Indhumathi27 commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549728719



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -406,158 +407,195 @@ object CarbonIndexUtil {
       .asInstanceOf[CarbonRelation]
       .carbonTable
 
-    val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
-      SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
     var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
-    if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
-      mainTableDetails.toArray,
-      siTblLoadMetadataDetails)) {
-      val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
-        indexTableName)
-      val indexModel = IndexModel(Some(carbonTable.getDatabaseName),
-        indexMetadata.getParentTableName,
-        indexColumns.split(",").toList,
-        indexTableName)
-
-      // var details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
-      // If it empty, then no need to do further computations because the
-      // tabletstatus might not have been created and hence next load will take care
-      if (siTblLoadMetadataDetails.isEmpty) {
-        Seq.empty
-      }
+    val compactionLock = CarbonLockFactory.getCarbonLockObj(
+      carbonTable.getAbsoluteTableIdentifier,
+      LockUsage.COMPACTION_LOCK)
+    try {
+      // In some cases, SI table segment might be in COMPACTED state and main table
+      // compaction might be still in progress. In those cases, we can try to take compaction lock
+      // on main table and then compare and add SI segments to failedLoads, to avoid repair
+      // SI SUCCESS loads.
+      if (compactionLock.lockWithRetries()) {
+        var mainTableDetails = try {
+          SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(
+            carbonTable.getTablePath))
+        } catch {
+          case _: Exception =>
+            if (!isLoadOrCompaction) {
+              throw new ConcurrentOperationException(carbonTable.getDatabaseName,
+                carbonTable.getTableName, "table status read", "reindex command")
+            }
+            return;

Review comment:
       A LOG statement is already present in readTableStatusFile. Do we still need to add one here?







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-752044813


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3497/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-752070020


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5259/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [WIP] Fix concurrency(Load&Compaction) issue with SI

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751821307


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3490/
   





[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
Indhumathi27 commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549669980



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(

Review comment:
       done







[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
Indhumathi27 commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549670126



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
+      try {
+        // In some cases, SI table segment might be in COMPACTED state and main table
+        // compaction might be still in progress. In those cases, we can try to take compaction lock
+        // on main table and then compare and add SI segments to failedLoads, to avoid repair
+        // SI SUCCESS loads.
+        if (carbonLock.lockWithRetries(3, 0)) {
+          val newMainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable

Review comment:
       modified







[GitHub] [carbondata] Indhumathi27 commented on a change in pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
Indhumathi27 commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549670016



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
+      try {
+        // In some cases, SI table segment might be in COMPACTED state and main table
+        // compaction might be still in progress. In those cases, we can try to take compaction lock
+        // on main table and then compare and add SI segments to failedLoads, to avoid repair
+        // SI SUCCESS loads.
+        if (carbonLock.lockWithRetries(3, 0)) {
+          val newMainTableDetails = SegmentStatusManager.readLoadMetadata(carbonTable
+            .getMetadataPath)
+          // check for the skipped segments. compare the main table and SI table table
+          // status file and get the skipped segments if any
+          CarbonInternalLoaderUtil.getListOfValidSlices(newMainTableDetails).asScala
+            .foreach(metadataDetail => {
+              if (repairLimit > failedLoadMetadataDetails.size()) {
+                val detail = siTblLoadMetadataDetails
+                  .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+                val mainTableDetail = newMainTableDetails
+                  .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+                if (null == detail || detail.length == 0) {
+                  val newDetails = new LoadMetadataDetails
+                  newDetails.setLoadName(metadataDetail)
+                  LOGGER.error(
+                    "Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
+                    " table " + indexTableName + "." + carbonTable.getTableName)
+                  failedLoadMetadataDetails.add(newDetails)
+                } else if (detail != null && detail.length != 0 && metadataDetail != null
+                           && metadataDetail.length != 0) {
+                  // If SI table has compacted segments and main table does not have
+                  // compacted segments due to some failure while compaction, need to
+                  // reload the original segments in this case.
+                  if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
+                      mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
+                    detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
+                    // in concurrent scenario, if a compaction is going on table, then SI
+                    // segments are updated first in table status and then the main table
+                    // segment, so in any load runs parallel this listener shouldn't consider
+                    // those segments accidentally. So try to take the segment lock.
+                    val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
+                      .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+                        CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
+                        LockUsage.LOCK)
+                    if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
+                      segmentLocks += segmentLockOfProbableOnCompactionSeg
+                      LOGGER.error(
+                        "Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
+                        + "table " + indexTableName + "." + carbonTable.getTableName)
+                      failedLoadMetadataDetails.add(detail(0))
+                    }
+                  }
                 }
               }
-            }
-          }
-        })
+            })
+        } else {
+          LOGGER.error(
+            "Unable to obtain compaction lock for table" + carbonTable.getTableUniqueName)

Review comment:
       done

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -462,47 +462,67 @@ object CarbonIndexUtil {
             }
           }
       }
-      // check for the skipped segments. compare the main table and SI table table
-      // status file and get the skipped segments if any
-      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
-        .foreach(metadataDetail => {
-          if (repairLimit > failedLoadMetadataDetails.size()) {
-            val detail = siTblLoadMetadataDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            val mainTableDetail = mainTableDetails
-              .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-            if (null == detail || detail.length == 0) {
-              val newDetails = new LoadMetadataDetails
-              newDetails.setLoadName(metadataDetail)
-              LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
-                " table " + indexTableName + "." + carbonTable.getTableName)
-              failedLoadMetadataDetails.add(newDetails)
-            } else if (detail != null && detail.length != 0 && metadataDetail != null
-              && metadataDetail.length != 0) {
-              // If SI table has compacted segments and main table does not have
-              // compacted segments due to some failure while compaction, need to
-              // reload the original segments in this case.
-              if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                // in concurrent scenario, if a compaction is going on table, then SI
-                // segments are updated first in table status and then the main table
-                // segment, so in any load runs parallel this listener shouldn't consider
-                // those segments accidentally. So try to take the segment lock.
-                val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                  .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                    CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                      LockUsage.LOCK)
-                if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                  segmentLocks += segmentLockOfProbableOnCompactionSeg
-                  LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI "
-                    + "table " + indexTableName + "." + carbonTable.getTableName)
-                  failedLoadMetadataDetails.add(detail(0))
+      val carbonLock = CarbonLockFactory.getCarbonLockObj(
+        carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.COMPACTION_LOCK)
+      try {
+        // In some cases, SI table segment might be in COMPACTED state and main table
+        // compaction might be still in progress. In those cases, we can try to take compaction lock
+        // on main table and then compare and add SI segments to failedLoads, to avoid repair
+        // SI SUCCESS loads.
+        if (carbonLock.lockWithRetries(3, 0)) {

Review comment:
       done







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-752106076


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5260/
   





[GitHub] [carbondata] ajantha-bhat commented on pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-752123149


   LGTM. 
   
   However, `processSIRepair` (the base code for this PR) can be optimized further in another PR. We read the table status multiple times and compare the main table and SI table again and again; this can be done in one loop.
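
Editorial illustration, not part of the PR: a rough sketch of what a single-pass comparison could look like. `Detail`, `Status`, and `segmentsToRepair` are hypothetical, simplified stand-ins for the types in the diff. The sketch treats only `Success` segments as valid and leaves out the segment lock and all file I/O, so it shows the shape of the loop rather than a drop-in replacement.

```scala
object SingleLoopRepairSketch {
  // Hypothetical, simplified stand-ins for the segment status and load metadata types.
  sealed trait Status
  case object Success extends Status
  case object Compacted extends Status

  final case class Detail(loadName: String, status: Status)

  // Visits each main table segment once and looks up the matching SI segment
  // by name in a map, instead of filtering the SI details for every segment.
  def segmentsToRepair(
      mainTable: Seq[Detail],
      siTable: Seq[Detail],
      repairLimit: Int): Seq[String] = {
    val siByName: Map[String, Detail] = siTable.map(d => d.loadName -> d).toMap
    mainTable.iterator
      .filter(_.status == Success) // simplification: only SUCCESS counts as valid
      .filter { main =>
        siByName.get(main.loadName) match {
          case None => true                       // segment is missing in the SI table
          case Some(si) => si.status == Compacted // SI is compacted, main table is not
        }
      }
      .map(_.loadName)
      .take(repairLimit) // stop once the repair limit is reached
      .toList
  }
}
```

Building the map once makes the lookup per main table segment constant time, which is the main saving over filtering both detail arrays inside the loop.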





[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
ajantha-bhat commented on a change in pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#discussion_r549716227



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
##########
@@ -406,158 +407,195 @@ object CarbonIndexUtil {
       .asInstanceOf[CarbonRelation]
       .carbonTable
 
-    val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
-      SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
     var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
-    if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
-      mainTableDetails.toArray,
-      siTblLoadMetadataDetails)) {
-      val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
-        indexTableName)
-      val indexModel = IndexModel(Some(carbonTable.getDatabaseName),
-        indexMetadata.getParentTableName,
-        indexColumns.split(",").toList,
-        indexTableName)
-
-      // var details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
-      // If it empty, then no need to do further computations because the
-      // tabletstatus might not have been created and hence next load will take care
-      if (siTblLoadMetadataDetails.isEmpty) {
-        Seq.empty
-      }
+    val compactionLock = CarbonLockFactory.getCarbonLockObj(
+      carbonTable.getAbsoluteTableIdentifier,
+      LockUsage.COMPACTION_LOCK)
+    try {
+      // In some cases, SI table segment might be in COMPACTED state and main table
+      // compaction might be still in progress. In those cases, we can try to take compaction lock
+      // on main table and then compare and add SI segments to failedLoads, to avoid repair
+      // SI SUCCESS loads.
+      if (compactionLock.lockWithRetries()) {
+        var mainTableDetails = try {
+          SegmentStatusManager.readTableStatusFile(CarbonTablePath.getTableStatusFilePath(
+            carbonTable.getTablePath))
+        } catch {
+          case _: Exception =>
+            if (!isLoadOrCompaction) {
+              throw new ConcurrentOperationException(carbonTable.getDatabaseName,
+                carbonTable.getTableName, "table status read", "reindex command")
+            }
+            return;

Review comment:
       In the else case, it would be better to add an error log.
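
Editorial illustration, not part of the PR: a minimal sketch of logging before the silent return. `readOrSkip`, `readStatus`, and `logError` are hypothetical names; `readStatus` stands in for the table status read and `logError` for the logger call. Unlike the diff, which wraps the failure in a `ConcurrentOperationException` for the reindex flow, this sketch simply lets the original exception propagate in that case.

```scala
import scala.util.control.NonFatal

object TableStatusReadSketch {
  // Hypothetical helper; none of these names are the real CarbonData API.
  def readOrSkip[A](
      isLoadOrCompaction: Boolean,
      tableName: String,
      logError: String => Unit)(readStatus: => A): Option[A] = {
    try {
      Some(readStatus)
    } catch {
      case NonFatal(e) if isLoadOrCompaction =>
        // Load or compaction flow: skip the repair, but record why it was skipped.
        logError(s"Skipping SI repair for $tableName: table status read failed: ${e.getMessage}")
        None
      // Reindex flow: the guard does not match, so the exception is rethrown.
    }
  }
}
```

The point of the log line is that a skipped repair during load or compaction is otherwise invisible to whoever later investigates inconsistent SI segments.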







[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [WIP] Fix concurrency(Load&Compaction) issue with SI

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751940591


   Build Failed  with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3491/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751996416


   Build Success with Spark 2.4.5, Please check CI http://121.244.95.60:12444/job/ApacheCarbon_PR_Builder_2.4.5/3496/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-752143730


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5261/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [WIP] Fix concurrency(Load&Compaction) issue with SI

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751816234


   Build Failed  with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5251/
   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [WIP] Fix concurrency(Load&Compaction) issue with SI

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751965233


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5254/
   





[GitHub] [carbondata] asfgit closed pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
asfgit closed pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067


   





[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4067: [CARBONDATA-4100] Fix SI segments are in inconsistent state with maintable after concurrent(Load&Compaction) operation

Posted by GitBox <gi...@apache.org>.
CarbonDataQA2 commented on pull request #4067:
URL: https://github.com/apache/carbondata/pull/4067#issuecomment-751996475


   Build Success with Spark 2.3.4, Please check CI http://121.244.95.60:12444/job/ApacheCarbonPRBuilder2.3/5257/
   

