Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/09/01 11:39:09 UTC

[carbondata] branch master updated: [CARBONDATA-3956] Reindex command on SI table

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 0afbef1  [CARBONDATA-3956] Reindex command on SI table
0afbef1 is described below

commit 0afbef198d9142acec153f7da7f613ce7bfc25db
Author: Vikram Ahuja <vi...@gmail.com>
AuthorDate: Wed Jul 29 18:01:04 2020 +0530

    [CARBONDATA-3956] Reindex command on SI table
    
    Why is this PR needed?
    For a main table with SI tables, after every
    load/insert command, SILoadEventListenerForFailedSegments.scala
    checks for missing segments or a segment mismatch in the SI table
    and loads the missing/deleted segments into the SI table.
    This repair logic is therefore tied to the load/insert command.
    A new command is required so that the repair logic can be
    invoked independently of load/insert.
    
    What changes were proposed in this PR?
    Added a separate SQL REINDEX command (REINDEX [INDEX TABLE index_table]
    ON TABLE maintable) to invoke the SI repair logic without a load/insert,
    as illustrated below.
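    
    Example usage (illustrative; table, index and database names are placeholders):
      REINDEX ON TABLE maintable
      REINDEX INDEX TABLE index_table ON maintable
      REINDEX DATABASE db_name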
    
    This closes #3873
---
 docs/index/secondary-index-guide.md                |  23 +-
 .../testsuite/secondaryindex/TestIndexRepair.scala | 253 ++++++++++++++++++++
 .../spark/sql/catalyst/CarbonDDLSqlParser.scala    |   1 +
 .../command/index/IndexRepairCommand.scala         | 134 +++++++++++
 .../apache/spark/sql/index/CarbonIndexUtil.scala   | 214 ++++++++++++++++-
 .../spark/sql/parser/CarbonSpark2SqlParser.scala   |  35 ++-
 .../SILoadEventListenerForFailedSegments.scala     | 258 +++------------------
 7 files changed, 679 insertions(+), 239 deletions(-)

diff --git a/docs/index/secondary-index-guide.md b/docs/index/secondary-index-guide.md
index cf22097..2eaaed9 100644
--- a/docs/index/secondary-index-guide.md
+++ b/docs/index/secondary-index-guide.md
@@ -188,4 +188,25 @@ where we have old stores.
 Syntax
   ```
   REGISTER INDEX TABLE index_name ON [TABLE] [db_name.]table_name
-  ```
\ No newline at end of file
+  ```
+
+### Reindex Command
+This command is used to reload segments in the SI table when there is a mismatch in the number
+of segments between the SI table and the main table.
+
+Syntax
+
+Reindex all the secondary indexes of the main table
+  ```
+  REINDEX ON TABLE [db_name.]main_table_name [WHERE SEGMENT.ID IN(0,1)]
+  ```
+Reindex at the index table level
+
+  ```
+  REINDEX INDEX TABLE index_table ON [db_name.]main_table_name [WHERE SEGMENT.ID IN (1)]
+  ```
+Reindex at the database level
+  ```
+  REINDEX DATABASE db_name [WHERE SEGMENT.ID IN (1,2,5)]
+  ```
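+
+Example (illustrative; assumes a main table `sales` with a secondary index `idx_city`):
+  ```
+  REINDEX INDEX TABLE idx_city ON sales WHERE SEGMENT.ID IN (0,1)
+  ```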
+Note: Running this command concurrently with other operations is not supported.
\ No newline at end of file
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
new file mode 100644
index 0000000..1a041ad
--- /dev/null
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.secondaryindex
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils.isFilterPushedDownToSI
+import org.apache.spark.sql.test.util.QueryTest
+
+/**
+ * test cases for testing reindex command on index table/main table/DB level
+ */
+class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("drop table if exists maintable")
+    sql("drop table if exists indextable1")
+    sql("drop table if exists indextable2")
+  }
+
+  test("reindex command after deleting segments from SI table") {
+    sql("drop table if exists maintable")
+    sql("CREATE TABLE maintable(a INT, b STRING, c STRING) stored as carbondata")
+    sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1)")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+    sql(s"""ALTER TABLE default.indextable1 SET
+           |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
+    val df1 = sql("select * from maintable where c = 'string2'").queryExecution.sparkPlan
+    assert(!isFilterPushedDownToSI(df1))
+    val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments!=postDeleteSegments)
+    sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE")
+    val df2 = sql("select * from maintable where c = 'string2'").queryExecution.sparkPlan
+    val postRepairSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments == postRepairSegments)
+    assert(isFilterPushedDownToSI(df2))
+    sql("drop table if exists maintable")
+  }
+
+
+  test("reindex command after deleting segments from SI table on other database without use") {
+    sql("drop table if exists test.maintable")
+    sql("drop database if exists test cascade")
+    sql("create database test")
+    sql("CREATE TABLE test.maintable(a INT, b STRING, c STRING) stored as carbondata")
+    sql("CREATE INDEX indextable1 on table test.maintable(c) as 'carbondata'")
+    sql("INSERT INTO test.maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO test.maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO test.maintable SELECT 1,'string1', 'string2'")
+
+    val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
+    sql("DELETE FROM TABLE test.INDEXTABLE1 WHERE SEGMENT.ID IN(0,1,2)")
+    sql("CLEAN FILES FOR TABLE test.INDEXTABLE1")
+    sql(s"""ALTER TABLE test.indextable1 SET
+           |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
+    val df1 = sql("select * from test.maintable where c = 'string2'").queryExecution.sparkPlan
+    assert(!isFilterPushedDownToSI(df1))
+    val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
+    assert(preDeleteSegments!=postDeleteSegments)
+    sql("REINDEX INDEX TABLE indextable1 ON test.MAINTABLE")
+    val df2 = sql("select * from test.maintable where c = 'string2'").queryExecution.sparkPlan
+    val postRepairSegments = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
+    assert(preDeleteSegments == postRepairSegments)
+    assert(isFilterPushedDownToSI(df2))
+    sql("drop table if exists test.maintable")
+    sql("drop database if exists test cascade")
+  }
+
+  test("reindex command using segment.id after deleting segments from SI table") {
+    sql("drop table if exists maintable")
+    sql("CREATE TABLE maintable(a INT, b STRING, c STRING) stored as carbondata")
+    sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+
+    val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1,2)")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+    val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments!=postDeleteSegments)
+    sql(s"""ALTER TABLE default.indextable1 SET
+           |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
+    val df1 = sql("select * from maintable where c = 'string2'").queryExecution.sparkPlan
+    assert(!isFilterPushedDownToSI(df1))
+    sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE WHERE SEGMENT.ID IN (0,1)")
+    val postFirstRepair = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(postDeleteSegments + 2 == postFirstRepair)
+    val df2 = sql("select * from maintable where c = 'string2'").queryExecution.sparkPlan
+    assert(!isFilterPushedDownToSI(df2))
+    sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE WHERE SEGMENT.ID IN (2)")
+    val postRepairSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments == postRepairSegments)
+    val df3 = sql("select * from maintable where c = 'string2'").queryExecution.sparkPlan
+    assert(isFilterPushedDownToSI(df3))
+    sql("drop table if exists maintable")
+  }
+
+  test("insert command after deleting segments from SI table") {
+    sql("drop table if exists maintable")
+    sql("CREATE TABLE maintable(a INT, b STRING, c STRING) stored as carbondata")
+    sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+
+    val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(1,2,3)")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+    val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments!=postDeleteSegments)
+    sql(s"""ALTER TABLE default.indextable1 SET
+           |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
+    val df1 = sql("select * from maintable where c = 'string2'").queryExecution.sparkPlan
+    assert(!isFilterPushedDownToSI(df1))
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    val postLoadSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments + 1 == postLoadSegments)
+    val df2 = sql("select * from maintable where c = 'string2'").queryExecution.sparkPlan
+    assert(isFilterPushedDownToSI(df2))
+    sql("drop table if exists maintable")
+  }
+
+  test("reindex command on main table") {
+    sql("drop table if exists maintable")
+    sql("CREATE TABLE maintable(a INT, b STRING, c STRING, d STRING) stored as carbondata")
+    sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+    sql("CREATE INDEX indextable2 on table maintable(d) as 'carbondata'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2', 'string3'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2', 'string3'")
+    val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE MAINTABLE").count()
+    sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0)")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+    sql("DELETE FROM TABLE INDEXTABLE2 WHERE SEGMENT.ID IN(0,1)")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE2")
+    val postDeleteSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    val postDeleteSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE2").count()
+    assert(preDeleteSegments!=postDeleteSegmentsIndexOne)
+    assert(preDeleteSegments!=postDeleteSegmentsIndexTwo)
+    sql("REINDEX ON TABLE MAINTABLE")
+    val postRepairSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    val postRepairSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE2").count()
+    assert(preDeleteSegments == postRepairSegmentsIndexOne)
+    assert(preDeleteSegments == postRepairSegmentsIndexTwo)
+    sql("drop table if exists maintable")
+  }
+
+  test("reindex command on main table with delete command") {
+    sql("drop table if exists maintable")
+    sql("CREATE TABLE maintable(a INT, b STRING, c STRING, d STRING) stored as carbondata")
+    sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+    sql("CREATE INDEX indextable2 on table maintable(d) as 'carbondata'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2', 'string3'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2', 'string3'")
+    val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE MAINTABLE").count()
+    sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0)")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+    sql("DELETE FROM TABLE INDEXTABLE2 WHERE SEGMENT.ID IN(1)")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE2")
+    val postDeleteSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    val postDeleteSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE2").count()
+    assert(preDeleteSegments != postDeleteSegmentsIndexOne)
+    assert(preDeleteSegments != postDeleteSegmentsIndexTwo)
+    sql("REINDEX ON TABLE MAINTABLE WHERE SEGMENT.ID IN(0,1)")
+    val postRepairSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    val postRepairSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE2").count()
+    assert(preDeleteSegments == postRepairSegmentsIndexOne)
+    assert(preDeleteSegments == postRepairSegmentsIndexTwo)
+    sql("drop table if exists maintable")
+  }
+
+
+    test("reindex command on database") {
+    sql("drop database if exists test cascade")
+    sql("create database test")
+    sql("drop table if exists maintable1")
+
+    // table 1
+    sql("CREATE TABLE test.maintable1(a INT, b STRING, c STRING, d STRING) stored as carbondata")
+    sql("CREATE INDEX indextable1 on table test.maintable1(c) as 'carbondata'")
+    sql("CREATE INDEX indextable2 on table test.maintable1(d) as 'carbondata'")
+    sql("INSERT INTO test.maintable1 SELECT 1,'string1', 'string2', 'string3'")
+    sql("INSERT INTO test.maintable1 SELECT 1,'string1', 'string2', 'string3'")
+
+    val preDeleteSegmentsTableOne = sql("SHOW SEGMENTS FOR TABLE test.MAINTABLE1").count()
+    sql("DELETE FROM TABLE test.INDEXTABLE1 WHERE SEGMENT.ID IN(0)")
+    sql("CLEAN FILES FOR TABLE test.INDEXTABLE1")
+    sql("DELETE FROM TABLE test.INDEXTABLE2 WHERE SEGMENT.ID IN(0,1)")
+    sql("CLEAN FILES FOR TABLE test.INDEXTABLE2")
+    val postDeleteSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
+    val postDeleteSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE2").count()
+
+    // table 2
+    sql("CREATE TABLE test.maintable2(a INT, b STRING, c STRING, d STRING) stored as carbondata")
+    sql("CREATE INDEX indextable3 on table test.maintable2(c) as 'carbondata'")
+    sql("CREATE INDEX indextable4 on table test.maintable2(d) as 'carbondata'")
+    sql("INSERT INTO test.maintable2 SELECT 1,'string1', 'string2', 'string3'")
+    sql("INSERT INTO test.maintable2 SELECT 1,'string1', 'string2', 'string3'")
+
+    val preDeleteSegmentsTableTwo = sql("SHOW SEGMENTS FOR TABLE test.MAINTABLE2").count()
+    sql("DELETE FROM TABLE test.INDEXTABLE3 WHERE SEGMENT.ID IN(1)")
+    sql("CLEAN FILES FOR TABLE test.INDEXTABLE3")
+    sql("DELETE FROM TABLE test.INDEXTABLE4 WHERE SEGMENT.ID IN(0,1)")
+    sql("CLEAN FILES FOR TABLE test.INDEXTABLE4")
+    val postDeleteSegmentsIndexThree = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE3").count()
+    val postDeleteSegmentsIndexFour = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE4").count()
+
+    assert(preDeleteSegmentsTableOne!=postDeleteSegmentsIndexOne)
+    assert(preDeleteSegmentsTableOne!=postDeleteSegmentsIndexTwo)
+    assert(preDeleteSegmentsTableTwo!=postDeleteSegmentsIndexThree)
+    assert(preDeleteSegmentsTableTwo!=postDeleteSegmentsIndexFour)
+    sql("REINDEX DATABASE TEST")
+    val postRepairSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
+    val postRepairSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE2").count()
+    val postRepairSegmentsIndexThree = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE3").count()
+    val postRepairSegmentsIndexFour = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE4").count()
+    assert(preDeleteSegmentsTableOne == postRepairSegmentsIndexOne)
+    assert(preDeleteSegmentsTableOne == postRepairSegmentsIndexTwo)
+    assert(preDeleteSegmentsTableTwo == postRepairSegmentsIndexThree)
+    assert(preDeleteSegmentsTableTwo == postRepairSegmentsIndexFour)
+    sql("drop table if exists test.maintable1")
+    sql("drop table if exists test.maintable2")
+    sql("drop database if exists test cascade")
+  }
+
+
+  override def afterAll {
+    sql("drop table if exists maintable")
+    sql("drop table if exists indextable1")
+    sql("drop table if exists indextable2")
+  }
+
+}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 9928f48..155e63d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -174,6 +174,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
   protected val INSERT = carbonKeyWord("INSERT")
   protected val STAGE = carbonKeyWord("STAGE")
   protected val INDEX = carbonKeyWord("INDEX")
+  protected val REINDEX = carbonKeyWord("REINDEX")
   protected val INDEXES = carbonKeyWord("INDEXES")
   protected val REGISTER = carbonKeyWord("REGISTER")
   protected val PROPERTIES = carbonKeyWord("PROPERTIES")
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
new file mode 100644
index 0000000..6a3e4ec
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Repair logic for reindex command on maintable/indextable
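+ *
+ * @param indexnameOp     index table to repair; None repairs all SI tables
+ * @param tableIdentifier main table identifier (null for database-level repair)
+ * @param dbName          database name for database-level repair, null otherwise
+ * @param segments        optional main table segment ids to restrict the repair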
+ */
+case class IndexRepairCommand(
+    indexnameOp: Option[String],
+    tableIdentifier: TableIdentifier,
+    dbName: String,
+    segments: Option[List[String]])
+  extends DataCommand {
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // dbName is null: repair a single index table or all index tables of the main table
+      val databaseName = if (tableIdentifier.database.isEmpty) {
+        sparkSession.sessionState.catalog.getCurrentDatabase
+      } else {
+        tableIdentifier.database.get
+      }
+      triggerRepair(tableIdentifier.table, databaseName, indexnameOp, segments, sparkSession)
+    } else {
+      // repair SI for all index tables in the database specified in the repair command
+      sparkSession.sessionState.catalog.listTables(dbName).foreach {
+        tableIdent =>
+          triggerRepair(tableIdent.table, dbName, indexnameOp, segments, sparkSession)
+      }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableName: String, databaseName: String,
+    indexTableToRepair: Option[String], segments: Option[List[String]],
+    sparkSession: SparkSession): Unit = {
+    // when SI creation and load to the main table run in parallel, get the carbonTable from
+    // the metastore, which will have the latest index info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val mainCarbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableName)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val tableStatusLock = CarbonLockFactory
+      .getCarbonLockObj(mainCarbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableName)
+    carbonLoadModel.setTablePath(mainCarbonTable.getTablePath)
+    try {
+      if (tableStatusLock.lockWithRetries()) {
+        val tableStatusFilePath = CarbonTablePath
+          .getTableStatusFilePath(mainCarbonTable.getTablePath)
+        carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+          .readTableStatusFile(tableStatusFilePath).toList.asJava)
+        carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(mainCarbonTable))
+      } else {
+        throw new ConcurrentOperationException(mainCarbonTable.getDatabaseName,
+          mainCarbonTable.getTableName, "table status read", "reindex command")
+      }
+    } finally {
+      tableStatusLock.unlock()
+    }
+    val indexMetadata = mainCarbonTable.getIndexMetadata
+    val secondaryIndexProvider = IndexType.SI.getIndexProviderName
+    if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
+      null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
+      val indexTables = indexMetadata.getIndexesMap
+        .get(secondaryIndexProvider).keySet().asScala
+      // if there are no index tables for a given fact table do not perform any action
+      if (indexTables.nonEmpty) {
+        val mainTableDetails = if (segments.isEmpty) {
+          carbonLoadModel.getLoadMetadataDetails.asScala.toList
+        } else {
+          // get segments for main table
+          carbonLoadModel.getLoadMetadataDetails.asScala.toList.filter(
+            loadMetaDataDetails => segments.get.contains(loadMetaDataDetails.getLoadName))
+        }
+        if (indexTableToRepair.isEmpty) {
+          indexTables.foreach {
+            indexTableName =>
+              CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel,
+                indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)
+          }
+        } else {
+          val indexTablesToRepair = indexTables.filter(indexTable => indexTable
+            .equals(indexTableToRepair.get))
+          indexTablesToRepair.foreach {
+            indexTableName =>
+              CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel,
+                indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)
+          }
+          if (indexTablesToRepair.isEmpty) {
+            throw new Exception("Unable to find index table" + indexTableToRepair.get)
+          }
+        }
+      }
+    }
+  }
+
+  override protected def opName: String = "REINDEX COMMAND"
+}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index 9bb7eb6..c2bb9cf 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -21,11 +21,12 @@ import java.util
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
 
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.CarbonSessionCatalogUtil
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalogUtil}
 import org.apache.spark.sql.secondaryindex.command.{IndexModel, SecondaryIndexModel}
 import org.apache.spark.sql.secondaryindex.hive.CarbonInternalMetastore
 import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
@@ -35,13 +36,16 @@ import org.apache.spark.util.AlterTableUtil
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.compression.CompressorFactory
-import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
 import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.metadata.schema.indextable.IndexMetadata
 import org.apache.carbondata.core.metadata.schema.indextable.IndexTableInfo
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.format.TableInfo
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 
@@ -376,4 +380,208 @@ object CarbonIndexUtil {
       AlterTableUtil.releaseLocks(locks.asScala.toList)
     }
   }
+
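+  /**
+   * Repairs a single SI table: collects SI segments that are missing, marked
+   * for delete, or left in progress relative to the main table status, reloads
+   * them, and re-enables the SI table once its valid segments match the main
+   * table again.
+   */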
+  def processSIRepair(indexTableName: String, carbonTable: CarbonTable,
+      carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata,
+      mainTableDetails: List[LoadMetadataDetails], secondaryIndexProvider: String)
+    (sparkSession: SparkSession): Unit = {
+    // when SI creation and load to the main table run in parallel, get the carbonTable from
+    // the metastore, which will have the latest index info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val indexTable = metaStore
+      .lookupRelation(Some(carbonLoadModel.getDatabaseName), indexTableName)(
+        sparkSession)
+      .asInstanceOf[CarbonRelation]
+      .carbonTable
+
+    val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+      SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+    val segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
+    if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
+      mainTableDetails.toArray,
+      siTblLoadMetadataDetails)) {
+      val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
+        indexTableName)
+      val indexModel = IndexModel(Some(carbonTable.getDatabaseName),
+        indexMetadata.getParentTableName,
+        indexColumns.split(",").toList,
+        indexTableName)
+
+      // if the SI table status is empty, no further computation is needed because
+      // the table status file might not have been created yet, and the next load
+      // will take care of the repair
+      if (siTblLoadMetadataDetails.isEmpty) {
+        return
+      }
+
+      val failedLoadMetadataDetails: java.util.List[LoadMetadataDetails] =
+        new util.ArrayList[LoadMetadataDetails]()
+
+      // read the details of SI table and get all the failed segments during SI
+      // creation which are MARKED_FOR_DELETE or invalid INSERT_IN_PROGRESS
+      siTblLoadMetadataDetails.foreach {
+        case loadMetaDetail: LoadMetadataDetails =>
+          if (loadMetaDetail.getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE &&
+            checkIfMainTableLoadIsValid(mainTableDetails.toArray,
+              loadMetaDetail.getLoadName)) {
+            failedLoadMetadataDetails.add(loadMetaDetail)
+          } else if ((loadMetaDetail.getSegmentStatus ==
+            SegmentStatus.INSERT_IN_PROGRESS ||
+            loadMetaDetail.getSegmentStatus ==
+              SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) &&
+            checkIfMainTableLoadIsValid(mainTableDetails.toArray,
+              loadMetaDetail.getLoadName)) {
+            val segmentLock = CarbonLockFactory
+              .getCarbonLockObj(indexTable.getAbsoluteTableIdentifier,
+                CarbonTablePath.addSegmentPrefix(loadMetaDetail.getLoadName) +
+                  LockUsage.LOCK)
+            try {
+              if (segmentLock.lockWithRetries(1, 0)) {
+                LOGGER
+                  .info("SIFailedLoadListener: Acquired segment lock on segment:" +
+                    loadMetaDetail.getLoadName)
+                failedLoadMetadataDetails.add(loadMetaDetail)
+              }
+            } finally {
+              segmentLock.unlock()
+              LOGGER
+                .info("SIFailedLoadListener: Released segment lock on segment:" +
+                  loadMetaDetail.getLoadName)
+            }
+          }
+      }
+      // check for the skipped segments: compare the main table and SI table
+      // status files and get the skipped segments, if any
+      CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
+        .foreach(metadataDetail => {
+          val detail = siTblLoadMetadataDetails
+            .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+          val mainTableDetail = mainTableDetails
+            .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+          if (null == detail || detail.length == 0) {
+            val newDetails = new LoadMetadataDetails
+            newDetails.setLoadName(metadataDetail)
+            LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName  + " for SI" +
+              " table " + indexTableName + "." + carbonTable.getTableName )
+            failedLoadMetadataDetails.add(newDetails)
+          } else if (detail != null && detail.length != 0 && metadataDetail != null
+            && metadataDetail.length != 0) {
+            // If SI table has compacted segments and main table does not have
+            // compacted segments due to some failure while compaction, need to
+            // reload the original segments in this case.
+            if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
+              mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
+              detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
+              // in concurrent scenario, if a compaction is going on table, then SI
+              // segments are updated first in table status and then the main table
+              // segment, so in any load runs parallel this listener shouldn't consider
+              // those segments accidentally. So try to take the segment lock.
+              val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
+                .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+                  CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
+                    LockUsage.LOCK)
+              if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
+                segmentLocks += segmentLockOfProbableOnCompactionSeg
+                LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName  + " for SI " +
+                  "table " + indexTableName + "." + carbonTable.getTableName )
+                failedLoadMetadataDetails.add(detail(0))
+              }
+            }
+          }
+        })
+      try {
+        if (!failedLoadMetadataDetails.isEmpty) {
+          // in the case when a segment of the SI table is deleted and its entry is
+          // removed from the table status file, the corresponding .segment file in
+          // the metadata folder should also be deleted, as it refers to a merge
+          // file name that no longer exists once the segment is deleted.
+          deleteStaleSegmentFileIfPresent(carbonLoadModel,
+            indexTable,
+            failedLoadMetadataDetails)
+          CarbonIndexUtil
+            .LoadToSITable(sparkSession,
+              carbonLoadModel,
+              indexTableName,
+              isLoadToFailedSISegments = true,
+              indexModel,
+              carbonTable, indexTable, failedLoadMetadataDetails)
+
+          // the load metadata details of the index table are re-read below,
+          // after the load to the failed SI segments completes
+        }
+
+        // get updated main table segments and si table segments
+        val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+          SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+        val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+          SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+
+        // if the main table has a load in progress and the SI table has no
+        // in-progress entry, there is no need to enable the SI table.
+        // Only when the valid segments of the main table match the valid
+        // segments of the SI table can the SI be enabled for query
+        if (CarbonInternalLoaderUtil
+          .checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
+            siTblLoadMetadataDetails)
+          && CarbonInternalLoaderUtil.checkInProgLoadInMainTableAndSI(carbonTable,
+          mainTblLoadMetadataDetails, siTblLoadMetadataDetails)) {
+          // enable the SI table if it was disabled earlier due to failure during SI
+          // creation time
+          sparkSession.sql(
+            s"""ALTER TABLE ${carbonLoadModel.getDatabaseName}.$indexTableName SET
+               |SERDEPROPERTIES ('isSITableEnabled' = 'true')""".stripMargin).collect()
+        }
+      } catch {
+        case ex: Exception =>
+          // in the case of an SI load only for failed segments, catch the exception but
+          // do not fail the main table load, as the main table segments should remain
+          // available for query
+          LOGGER.error(s"Load to SI table to $indexTableName is failed " +
+            s"or SI table ENABLE is failed. ", ex)
+          Seq.empty
+      } finally {
+        segmentLocks.foreach {
+          segmentLock => segmentLock.unlock()
+        }
+      }
+    }
+    Seq.empty
+  }
+
+  def checkIfMainTableLoadIsValid(mainTableDetails: Array[LoadMetadataDetails],
+    loadName: String): Boolean = {
+    // in concurrent scenarios there can be cases when loadName is not present in the
+    // mainTableDetails array. Added a check to see if the loadName is even present in the
+    // mainTableDetails.
+    val mainTableLoadDetail = mainTableDetails
+      .filter(mainTableDetail => mainTableDetail.getLoadName.equals(loadName))
+    if (mainTableLoadDetail.length == 0) {
+      false
+    } else {
+      if (mainTableLoadDetail.head.getSegmentStatus ==
+        SegmentStatus.MARKED_FOR_DELETE ||
+        mainTableLoadDetail.head.getSegmentStatus == SegmentStatus.COMPACTED) {
+        false
+      } else {
+        true
+      }
+    }
+  }
+
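+  /**
+   * For each failed load, deletes the stale .segment file of the SI table,
+   * since its entry has been removed from the table status file and the merge
+   * file it refers to no longer exists.
+   */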
+  def deleteStaleSegmentFileIfPresent(carbonLoadModel: CarbonLoadModel, indexTable: CarbonTable,
+    failedLoadMetaDataDetails: java.util.List[LoadMetadataDetails]): Unit = {
+    failedLoadMetaDataDetails.asScala.foreach(failedLoadMetaData => {
+      carbonLoadModel.getLoadMetadataDetails.asScala.foreach(loadMetaData => {
+        if (failedLoadMetaData.getLoadName == loadMetaData.getLoadName) {
+          val segmentFilePath = CarbonTablePath.getSegmentFilesLocation(indexTable.getTablePath) +
+            CarbonCommonConstants.FILE_SEPARATOR + loadMetaData.getSegmentFile
+          if (FileFactory.isFileExist(segmentFilePath)) {
+            // delete the file if it exists
+            FileFactory.deleteFile(segmentFilePath)
+          }
+        }
+      })
+    })
+  }
+
 }
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 72bb5bc..7b747d8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRe
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.cache.{CarbonDropCacheCommand, CarbonShowCacheCommand}
-import org.apache.spark.sql.execution.command.index.{CarbonCreateIndexCommand, CarbonRefreshIndexCommand, DropIndexCommand, ShowIndexesCommand}
+import org.apache.spark.sql.execution.command.index.{CarbonCreateIndexCommand, CarbonRefreshIndexCommand, DropIndexCommand, IndexRepairCommand, ShowIndexesCommand}
 import org.apache.spark.sql.execution.command.management._
 import org.apache.spark.sql.execution.command.schema.CarbonAlterTableDropColumnCommand
 import org.apache.spark.sql.execution.command.stream.{CarbonCreateStreamCommand, CarbonDropStreamCommand, CarbonShowStreamsCommand}
@@ -97,7 +97,8 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
     createMV | dropMV | showMV | refreshMV
 
   protected lazy val indexCommands: Parser[LogicalPlan] =
-    createIndex | dropIndex | showIndexes | registerIndexes | refreshIndex
+    createIndex | dropIndex | showIndexes | registerIndexes | refreshIndex | repairIndex |
+      repairIndexDatabase
 
   protected lazy val alterTable: Parser[LogicalPlan] =
     ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (COMPACT ~ stringLit) ~
@@ -668,6 +669,34 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
         ShowIndexesCommand(table.database, table.table)
     }
 
+  /**
+   * REINDEX INDEX TABLE index_name
+   * ON [db_name.]table_name
+   * [WHERE SEGMENT.ID IN (segment_id, ...)] or
+   *
+   * REINDEX ON [db_name.]table_name
+   * [WHERE SEGMENT.ID IN (segment_id, ...)]
+   */
+  protected lazy val repairIndex: Parser[LogicalPlan] =
+    REINDEX ~> opt(INDEX ~> TABLE ~> ident) ~ ontable ~
+      (WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",") <~ ")").? <~
+    opt(";") ^^ {
+      case indexName ~ tableIdent ~ segments =>
+        IndexRepairCommand(indexName, tableIdent, null, segments)
+    }
+
+  /**
+   * REINDEX DATABASE db_name
+   * [WHERE SEGMENT.ID IN (segment_id, ...)]
+   */
+  protected lazy val repairIndexDatabase: Parser[LogicalPlan] =
+    REINDEX ~> DATABASE ~> ident ~
+      (WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",") <~ ")").? <~
+      opt(";") ^^ {
+      case dbName ~ segments =>
+        IndexRepairCommand(None, null, dbName, segments)
+    }
+
   protected lazy val registerIndexes: Parser[LogicalPlan] =
     REGISTER ~> INDEX ~> TABLE ~> ident ~ ontable <~ opt(";") ^^ {
       case indexTable ~ table =>
@@ -676,7 +705,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
 
   /**
    * REFRESH INDEX index_name
-   * ON [db_name.]table_ame
+   * ON [db_name.]table_name
    * [WHERE SEGMENT.ID IN (segment_id, ...)]
    */
   protected lazy val refreshIndex: Parser[LogicalPlan] =
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index 89d60f6..cd7294b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -17,31 +17,20 @@
 
 package org.apache.spark.sql.secondaryindex.events
 
-import java.util
-
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
 
 import org.apache.log4j.Logger
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.index.CarbonIndexUtil
-import org.apache.spark.sql.secondaryindex.command.IndexModel
-import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.index.IndexType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.events.{Event, OperationContext, OperationEventListener}
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostStatusUpdateEvent
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 
 /**
  * This Listener is to load the data to failed segments of Secondary index table(s)
@@ -57,228 +46,33 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
     event match {
       case postStatusUpdateEvent: LoadTablePostStatusUpdateEvent =>
         LOGGER.info("Load post status update event-listener called")
-        val loadTablePostStatusUpdateEvent = event.asInstanceOf[LoadTablePostStatusUpdateEvent]
-        val carbonLoadModel = loadTablePostStatusUpdateEvent.getCarbonLoadModel
-        val sparkSession = SparkSession.getActiveSession.get
-        // when Si creation and load to main table are parallel, get the carbonTable from the
-        // metastore which will have the latest index Info
-        val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
-        val carbonTable = metaStore
-          .lookupRelation(Some(carbonLoadModel.getDatabaseName),
-            carbonLoadModel.getTableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
-        val indexMetadata = carbonTable.getIndexMetadata
-        val secondaryIndexProvider = IndexType.SI.getIndexProviderName
-        if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
+          val loadTablePostStatusUpdateEvent = event.asInstanceOf[LoadTablePostStatusUpdateEvent]
+          val carbonLoadModel = loadTablePostStatusUpdateEvent.getCarbonLoadModel
+          val sparkSession = SparkSession.getActiveSession.get
+          // when Si creation and load to main table are parallel, get the carbonTable from the
+          // metastore which will have the latest index Info
+          val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+          val carbonTable = metaStore
+            .lookupRelation(Some(carbonLoadModel.getDatabaseName),
+              carbonLoadModel.getTableName)(sparkSession)
+            .asInstanceOf[CarbonRelation].carbonTable
+          val indexMetadata = carbonTable.getIndexMetadata
+          val secondaryIndexProvider = IndexType.SI.getIndexProviderName
+          if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
             null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
-          val indexTables = indexMetadata.getIndexesMap
-            .get(secondaryIndexProvider).keySet().asScala
-          // if there are no index tables for a given fact table do not perform any action
-          if (indexTables.nonEmpty) {
-            val mainTableDetails =
-              SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
-            indexTables.foreach {
-              indexTableName =>
-                val isLoadSIForFailedSegments = sparkSession.sessionState.catalog
-                  .getTableMetadata(TableIdentifier(indexTableName,
-                    Some(carbonLoadModel.getDatabaseName))).storage.properties
-                  .getOrElse("isSITableEnabled", "true").toBoolean
-                val indexTable = metaStore
-                  .lookupRelation(Some(carbonLoadModel.getDatabaseName), indexTableName)(
-                    sparkSession)
-                  .asInstanceOf[CarbonRelation]
-                  .carbonTable
-
-                val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
-                  SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
-                val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
-                  SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
-                var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
-                if (!isLoadSIForFailedSegments
-                    || !CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
-                  mainTblLoadMetadataDetails,
-                  siTblLoadMetadataDetails)) {
-                  val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
-                    indexTableName)
-                  val secondaryIndex = IndexModel(Some(carbonTable.getDatabaseName),
-                    indexMetadata.getParentTableName,
-                    indexColumns.split(",").toList,
-                    indexTableName)
-
-                  var details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
-                  // If it empty, then no need to do further computations because the
-                  // tablet status might not have been created and hence next load will take care
-                  if (details.isEmpty) {
-                    return
-                  }
-
-                  val failedLoadMetadataDetails: java.util.List[LoadMetadataDetails] = new util
-                  .ArrayList[LoadMetadataDetails]()
-
-                  // read the details of SI table and get all the failed segments during SI
-                  // creation which are MARKED_FOR_DELETE or invalid INSERT_IN_PROGRESS
-                  details.collect {
-                    case loadMetaDetail: LoadMetadataDetails =>
-                      if (loadMetaDetail.getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE &&
-                          checkIfMainTableLoadIsValid(mainTableDetails,
-                            loadMetaDetail.getLoadName)) {
-                          failedLoadMetadataDetails.add(loadMetaDetail)
-                      } else if ((loadMetaDetail.getSegmentStatus ==
-                                  SegmentStatus.INSERT_IN_PROGRESS ||
-                                  loadMetaDetail.getSegmentStatus ==
-                                  SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) &&
-                                 checkIfMainTableLoadIsValid(mainTableDetails,
-                                   loadMetaDetail.getLoadName)) {
-                        val segmentLock = CarbonLockFactory
-                          .getCarbonLockObj(indexTable.getAbsoluteTableIdentifier,
-                            CarbonTablePath.addSegmentPrefix(loadMetaDetail.getLoadName) +
-                            LockUsage.LOCK)
-                        try {
-                          if (segmentLock.lockWithRetries(1, 0)) {
-                            LOGGER
-                              .info("SIFailedLoadListener: Acquired segment lock on segment:" +
-                                    loadMetaDetail.getLoadName)
-                            failedLoadMetadataDetails.add(loadMetaDetail)
-                          }
-                        } finally {
-                          segmentLock.unlock()
-                          LOGGER
-                            .info("SIFailedLoadListener: Released segment lock on segment:" +
-                                  loadMetaDetail.getLoadName)
-                        }
-                      }
-                  }
-                  // check for the skipped segments. compare the main table and SI table table
-                  // status file and get the skipped segments if any
-                  CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails).asScala
-                    .foreach(metadataDetail => {
-                      val detail = details
-                        .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-                      val mainTableDetail = mainTableDetails
-                        .filter(metadata => metadata.getLoadName.equals(metadataDetail))
-                      if (null == detail || detail.length == 0) {
-                        val newDetails = new LoadMetadataDetails
-                        newDetails.setLoadName(metadataDetail)
-                        LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName)
-                        failedLoadMetadataDetails.add(newDetails)
-                      } else if (detail != null && detail.length !=0 && metadataDetail != null
-                                 && metadataDetail.length != 0) {
-                        // If SI table has compacted segments and main table does not have
-                        // compacted segments due to some failure while compaction, need to
-                        // reload the original segments in this case.
-                        if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
-                            mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
-                          detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
-                          // in concurrent scenario, if a compaction is going on table, then SI
-                          // segments are updated first in table status and then the main table
-                          // segment, so in any load runs parallel this listener shouldn't consider
-                          // those segments accidentally. So try to take the segment lock.
-                          val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
-                            .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
-                              CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
-                              LockUsage.LOCK)
-                          if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
-                            segmentLocks += segmentLockOfProbableOnCompactionSeg
-                            LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName)
-                            failedLoadMetadataDetails.add(detail(0))
-                          }
-                        }
-                      }
-                    })
-                  try {
-                    if (!failedLoadMetadataDetails.isEmpty) {
-                      // in the case when in SI table a segment is deleted and it's entry is
-                      // deleted from the tablestatus file, the corresponding .segment file from
-                      // the metadata folder should also be deleted as it contains the
-                      // merge file name which does not exist anymore as the segment is deleted.
-                      deleteStaleSegmentFileIfPresent(carbonLoadModel,
-                        indexTable,
-                        failedLoadMetadataDetails)
-                      CarbonIndexUtil
-                        .LoadToSITable(sparkSession,
-                          carbonLoadModel,
-                          indexTableName,
-                          isLoadToFailedSISegments = true,
-                          secondaryIndex,
-                          carbonTable, indexTable, failedLoadMetadataDetails)
-
-                      // get the current load metadata details of the index table
-                      details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
-                    }
-
-                    // get updated main table segments and si table segments
-                    val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
-                      SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
-                    val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
-                      SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
-
-                    // check if main table has load in progress and SI table has no load
-                    // in progress entry, then no need to enable the SI table
-                    // Only if the valid segments of main table match the valid segments of SI
-                    // table then we can enable the SI for query
-                    if (CarbonInternalLoaderUtil
-                          .checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
-                            siTblLoadMetadataDetails)
-                        && CarbonInternalLoaderUtil.checkInProgLoadInMainTableAndSI(carbonTable,
-                      mainTblLoadMetadataDetails, siTblLoadMetadataDetails)) {
-                      // enable the SI table if it was disabled earlier due to failure during SI
-                      // creation time
-                      sparkSession.sql(
-                        s"""ALTER TABLE ${carbonLoadModel.getDatabaseName}.$indexTableName SET
-                           |SERDEPROPERTIES ('isSITableEnabled' = 'true')""".stripMargin).collect()
-                    }
-                  } catch {
-                    case ex: Exception =>
-                      // in case of SI load only for for failed segments, catch the exception, but
-                      // do not fail the main table load, as main table segments should be available
-                      // for query
-                      LOGGER.error(s"Load to SI table to $indexTableName is failed " +
-                               s"or SI table ENABLE is failed. ", ex)
-                      return
-                  } finally {
-                    segmentLocks.foreach {
-                      segmentLock => segmentLock.unlock()
-                    }
-                  }
-                }
+            val indexTables = indexMetadata.getIndexesMap
+              .get(secondaryIndexProvider).keySet().asScala
+            // if there are no index tables for a given fact table do not perform any action
+            if (indexTables.nonEmpty) {
+              val mainTableDetails =
+                SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
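+              // delegate the repair of each SI table to the shared
+              // CarbonIndexUtil.processSIRepair, which the REINDEX command also uses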
+              indexTables.foreach {
+                indexTableName =>
+                  CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel,
+                    indexMetadata, mainTableDetails.toList, secondaryIndexProvider)(sparkSession)
+              }
             }
           }
-        }
-    }
-  }
-
-  def checkIfMainTableLoadIsValid(mainTableDetails: Array[LoadMetadataDetails],
-    loadName: String): Boolean = {
-    // in concurrent scenarios there can be cases when loadName is not present in the
-    // mainTableDetails array. Added a check to see if the loadName is even present in the
-    // mainTableDetails.
-    val mainTableLoadDetail = mainTableDetails
-      .filter(mainTableDetail => mainTableDetail.getLoadName.equals(loadName))
-    if (mainTableLoadDetail.length == 0) {
-      false
-    } else {
-      if (mainTableLoadDetail.head.getSegmentStatus ==
-        SegmentStatus.MARKED_FOR_DELETE ||
-        mainTableLoadDetail.head.getSegmentStatus == SegmentStatus.COMPACTED) {
-        false
-      } else {
-        true
-      }
     }
   }
-
-  def deleteStaleSegmentFileIfPresent(carbonLoadModel: CarbonLoadModel, indexTable: CarbonTable,
-    failedLoadMetaDataDetails: java.util.List[LoadMetadataDetails]): Unit = {
-    failedLoadMetaDataDetails.asScala.map(failedLoadMetaData => {
-      carbonLoadModel.getLoadMetadataDetails.asScala.map(loadMetaData => {
-        if (failedLoadMetaData.getLoadName == loadMetaData.getLoadName) {
-          val segmentFilePath = CarbonTablePath.getSegmentFilesLocation(indexTable.getTablePath) +
-            CarbonCommonConstants.FILE_SEPARATOR + loadMetaData.getSegmentFile
-          if (FileFactory.isFileExist(segmentFilePath)) {
-            // delete the file if it exists
-            FileFactory.deleteFile(segmentFilePath)
-          }
-        }
-      })
-    })
-  }
 }