You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@carbondata.apache.org by GitBox <gi...@apache.org> on 2020/08/20 15:38:01 UTC

[GitHub] [carbondata] akashrn5 commented on a change in pull request #3873: [CARBONDATA-3956] Reindex command on SI table

akashrn5 commented on a change in pull request #3873:
URL: https://github.com/apache/carbondata/pull/3873#discussion_r474030501



##########
File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.secondaryindex
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.test.util.QueryTest
+
+/**
+ * test cases for testing create index table
+ */
+class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("drop table if exists maintable")
+    sql("drop table if exists indextable1")
+    sql("drop table if exists indextable2")
+  }
+
+  test("reindex command after deleting segments from SI table") {
+    sql("drop table if exists maintable")
+    sql("CREATE TABLE maintable(a INT, b STRING, c STRING) stored as carbondata")
+    sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1)")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+    val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments!=postDeleteSegments)
+    sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE")
+    val postRepairSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments == postRepairSegments)

Review comment:
       You should also consider adding a query and checking that it hits the SI after reindex. Please do this for all test cases.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString
+      }
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)

Review comment:
       Why do we need both variables, `indexname.isEmpty` and `indexname`? You can just send indexName, check whether it is defined or not, and take the decision inside the function implementation.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level

Review comment:
       comment is not clear

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString
+      }
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)

Review comment:
       ```suggestion
         triggerIndexRepair(tableNameOp.table, databaseName, indexname.isDefined, indexname, segments)
   ```

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString
+      }
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)
+    } else {
+      // for all tables in the db
+        sparkSession.sessionState.catalog.listTables(dbName).foreach {
+          tableIdent =>
+            triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments)
+        }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean,
+                    indexName: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val carbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager

Review comment:
       Before reading the table status file, acquire a lock and then read it, to avoid concurrency issues.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,

Review comment:
       Move all the variables to the next line and correct the formatting. Refer to the other case classes.

##########
File path: docs/index/secondary-index-guide.md
##########
@@ -188,4 +188,13 @@ where we have old stores.
 Syntax
   ```
   REGISTER INDEX TABLE index_name ON [TABLE] [db_name.]table_name
+  ```
+
+### Repair index Command
+This command is used to reload segments in the SI table in case when there is some mismatch in the number
+of segments in main table and the SI table

Review comment:
       ```suggestion
   of segments with main table.
   ```

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table

Review comment:
       The comment is not correct; please update it to match the functionality.

##########
File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.secondaryindex
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.test.util.QueryTest
+
+/**
+ * test cases for testing create index table

Review comment:
       This should be "test cases for testing repair index table", right?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString
+      }
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)
+    } else {
+      // for all tables in the db
+        sparkSession.sessionState.catalog.listTables(dbName).foreach {
+          tableIdent =>
+            triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments)
+        }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean,
+                    indexName: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val carbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+      .readTableStatusFile(tableStatusFilePath).toList.asJava)
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+
+    val indexMetadata = carbonTable.getIndexMetadata
+    val secondaryIndexProvider = IndexType.SI.getIndexProviderName
+    if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
+      null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
+      val indexTables = indexMetadata.getIndexesMap
+        .get(secondaryIndexProvider).keySet().asScala
+      // if there are no index tables for a given fact table do not perform any action
+      if (indexTables.nonEmpty) {
+        val mainTableDetails = if (segments.isEmpty) {
+          SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+        } else {
+          // get segments for main table
+          val tempLoadMetaData = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)

Review comment:
       Rename this to a proper name.

##########
File path: index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
##########
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.secondaryindex
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.test.util.QueryTest
+
+/**
+ * test cases for testing create index table
+ */
+class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("drop table if exists maintable")
+    sql("drop table if exists indextable1")
+    sql("drop table if exists indextable2")
+  }
+
+  test("reindex command after deleting segments from SI table") {
+    sql("drop table if exists maintable")
+    sql("CREATE TABLE maintable(a INT, b STRING, c STRING) stored as carbondata")
+    sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1)")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+    val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments!=postDeleteSegments)
+    sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE")
+    val postRepairSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments == postRepairSegments)
+    sql("drop table if exists maintable")
+  }
+
+
+  test("reindex command after deleting segments from SI table on other database without use") {
+    sql("drop table if exists test.maintable")
+    sql("drop database if exists test cascade")
+    sql("create database test")
+    sql("CREATE TABLE test.maintable(a INT, b STRING, c STRING) stored as carbondata")
+    sql("CREATE INDEX indextable1 on table test.maintable(c) as 'carbondata'")
+    sql("INSERT INTO test.maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO test.maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO test.maintable SELECT 1,'string1', 'string2'")
+
+    val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
+    sql("DELETE FROM TABLE test.INDEXTABLE1 WHERE SEGMENT.ID IN(0,1,2)")
+    sql("CLEAN FILES FOR TABLE test.INDEXTABLE1")
+    val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
+    assert(preDeleteSegments!=postDeleteSegments)
+    sql("REINDEX INDEX TABLE indextable1 ON test.MAINTABLE")
+    val postRepairSegments = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
+    assert(preDeleteSegments == postRepairSegments)
+    sql("drop table if exists test.maintable")
+    sql("drop database if exists test cascade")
+  }
+
+  test("reindex command using segment.id after deleting segments from SI table") {
+    sql("drop table if exists maintable")
+    sql("CREATE TABLE maintable(a INT, b STRING, c STRING) stored as carbondata")
+    sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+    sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+
+    val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1,2)")
+    sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+    val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(preDeleteSegments!=postDeleteSegments)
+    sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE WHERE SEGMENT.ID IN (0,1)")
+    val postFirstRepair = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+    assert(postDeleteSegments + 2 == postFirstRepair)
+    sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE WHERE SEGMENT.ID IN (2)")

Review comment:
       Add a query before reindex; it should not hit the SI table.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,

Review comment:
       ```suggestion
   case class IndexRepairCommand(indexNameOp: Option[String], tableIdentifier: TableIdentifier,
   ```

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{

Review comment:
       ```suggestion
                                 segments: Option[List[String]]) extends DataCommand {
   ```
   
   Please correct the formatting: add a space before the opening brace (`DataCommand {`).

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {

Review comment:
       `tableNameOp.database` returns an `Option`, right? You can use `isDefined` here.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString

Review comment:
       It's redundant to call `toString` on a value that is already a `String`.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString
+      }
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)
+    } else {
+      // for all tables in the db

Review comment:
       Please rewrite the comment in a more meaningful way.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString
+      }
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)
+    } else {
+      // for all tables in the db
+        sparkSession.sessionState.catalog.listTables(dbName).foreach {
+          tableIdent =>
+            triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments)
+        }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean,
+                    indexName: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val carbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+      .readTableStatusFile(tableStatusFilePath).toList.asJava)
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+
+    val indexMetadata = carbonTable.getIndexMetadata
+    val secondaryIndexProvider = IndexType.SI.getIndexProviderName
+    if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
+      null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
+      val indexTables = indexMetadata.getIndexesMap
+        .get(secondaryIndexProvider).keySet().asScala
+      // if there are no index tables for a given fact table do not perform any action
+      if (indexTables.nonEmpty) {
+        val mainTableDetails = if (segments.isEmpty) {
+          SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+        } else {
+          // get segments for main table
+          val tempLoadMetaData = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+          val listLoadMetaDataDetails = new util.ArrayList[LoadMetadataDetails]()
+          segments.get.foreach(segmentNo => {
+            tempLoadMetaData.foreach(metadata =>
+              if (metadata.getLoadName.equals(segmentNo)) {
+                listLoadMetaDataDetails.add(metadata)
+              })
+          })
+          listLoadMetaDataDetails.asScala.toArray
+        }
+        if (allIndex) {
+          indexTables.foreach {
+            indexTableName =>
+              CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel,

Review comment:
       ```suggestion
                 CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel,
   ```

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString
+      }
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)
+    } else {
+      // for all tables in the db
+        sparkSession.sessionState.catalog.listTables(dbName).foreach {
+          tableIdent =>
+            triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments)
+        }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean,
+                    indexName: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val carbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+      .readTableStatusFile(tableStatusFilePath).toList.asJava)
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+
+    val indexMetadata = carbonTable.getIndexMetadata
+    val secondaryIndexProvider = IndexType.SI.getIndexProviderName
+    if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
+      null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
+      val indexTables = indexMetadata.getIndexesMap
+        .get(secondaryIndexProvider).keySet().asScala
+      // if there are no index tables for a given fact table do not perform any action
+      if (indexTables.nonEmpty) {
+        val mainTableDetails = if (segments.isEmpty) {
+          SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+        } else {
+          // get segments for main table
+          val tempLoadMetaData = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+          val listLoadMetaDataDetails = new util.ArrayList[LoadMetadataDetails]()
+          segments.get.foreach(segmentNo => {
+            tempLoadMetaData.foreach(metadata =>
+              if (metadata.getLoadName.equals(segmentNo)) {
+                listLoadMetaDataDetails.add(metadata)
+              })
+          })
+          listLoadMetaDataDetails.asScala.toArray
+        }
+        if (allIndex) {
+          indexTables.foreach {
+            indexTableName =>
+              CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel,
+                indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)
+          }
+        } else {
+          var isPresent : Boolean = false

Review comment:
       Why is this boolean required?

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString
+      }
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)
+    } else {
+      // for all tables in the db
+        sparkSession.sessionState.catalog.listTables(dbName).foreach {
+          tableIdent =>
+            triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments)
+        }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean,
+                    indexName: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val carbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+      .readTableStatusFile(tableStatusFilePath).toList.asJava)
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+
+    val indexMetadata = carbonTable.getIndexMetadata
+    val secondaryIndexProvider = IndexType.SI.getIndexProviderName
+    if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
+      null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
+      val indexTables = indexMetadata.getIndexesMap
+        .get(secondaryIndexProvider).keySet().asScala
+      // if there are no index tables for a given fact table do not perform any action
+      if (indexTables.nonEmpty) {
+        val mainTableDetails = if (segments.isEmpty) {

Review comment:
       ```suggestion
           val mainTableDetails = if (segments.isDefined) {
   ```

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString
+      }
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)
+    } else {
+      // for all tables in the db
+        sparkSession.sessionState.catalog.listTables(dbName).foreach {
+          tableIdent =>
+            triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments)
+        }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean,
+                    indexName: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val carbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+      .readTableStatusFile(tableStatusFilePath).toList.asJava)
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+
+    val indexMetadata = carbonTable.getIndexMetadata
+    val secondaryIndexProvider = IndexType.SI.getIndexProviderName
+    if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
+      null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
+      val indexTables = indexMetadata.getIndexesMap
+        .get(secondaryIndexProvider).keySet().asScala
+      // if there are no index tables for a given fact table do not perform any action
+      if (indexTables.nonEmpty) {
+        val mainTableDetails = if (segments.isEmpty) {
+          SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+        } else {
+          // get segments for main table
+          val tempLoadMetaData = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+          val listLoadMetaDataDetails = new util.ArrayList[LoadMetadataDetails]()
+          segments.get.foreach(segmentNo => {
+            tempLoadMetaData.foreach(metadata =>
+              if (metadata.getLoadName.equals(segmentNo)) {
+                listLoadMetaDataDetails.add(metadata)
+              })
+          })
+          listLoadMetaDataDetails.asScala.toArray
+        }
+        if (allIndex) {
+          indexTables.foreach {
+            indexTableName =>
+              CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel,
+                indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)
+          }
+        } else {
+          var isPresent : Boolean = false
+          indexTables.foreach {
+            indexTableName =>
+              if (indexTableName == indexName.get) {

Review comment:
       You can rename `indexName` to something more meaningful, like `indexTableToRepair`.

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString
+      }
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)
+    } else {
+      // for all tables in the db
+        sparkSession.sessionState.catalog.listTables(dbName).foreach {
+          tableIdent =>
+            triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments)
+        }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean,
+                    indexName: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val carbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+      .readTableStatusFile(tableStatusFilePath).toList.asJava)
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+
+    val indexMetadata = carbonTable.getIndexMetadata
+    val secondaryIndexProvider = IndexType.SI.getIndexProviderName
+    if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
+      null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
+      val indexTables = indexMetadata.getIndexesMap
+        .get(secondaryIndexProvider).keySet().asScala
+      // if there are no index tables for a given fact table do not perform any action
+      if (indexTables.nonEmpty) {
+        val mainTableDetails = if (segments.isEmpty) {
+          SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+        } else {
+          // get segments for main table
+          val tempLoadMetaData = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+          val listLoadMetaDataDetails = new util.ArrayList[LoadMetadataDetails]()
+          segments.get.foreach(segmentNo => {
+            tempLoadMetaData.foreach(metadata =>
+              if (metadata.getLoadName.equals(segmentNo)) {
+                listLoadMetaDataDetails.add(metadata)
+              })
+          })
+          listLoadMetaDataDetails.asScala.toArray
+        }
+        if (allIndex) {
+          indexTables.foreach {
+            indexTableName =>
+              CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel,
+                indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)
+          }
+        } else {
+          var isPresent : Boolean = false
+          indexTables.foreach {
+            indexTableName =>
+              if (indexTableName == indexName.get) {
+                isPresent = true
+                CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel,
+                  indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)
+              }
+          }

Review comment:
       ```
   indexTables.foreach {
               indexTableName =>
                 if (indexTableName == indexName.get) {
                   isPresent = true
                   CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel,
                     indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)
                 }
             }
   ```
   
   remove this code and do it in a simpler way:
   
   `indexTables.filter(indexTable => indexTable.equals(indexName.get))`, then pass the output to the `processSIRepair` method

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString
+      }
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)
+    } else {
+      // for all tables in the db
+        sparkSession.sessionState.catalog.listTables(dbName).foreach {
+          tableIdent =>
+            triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments)
+        }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean,
+                    indexName: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val carbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+      .readTableStatusFile(tableStatusFilePath).toList.asJava)
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+
+    val indexMetadata = carbonTable.getIndexMetadata
+    val secondaryIndexProvider = IndexType.SI.getIndexProviderName
+    if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
+      null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
+      val indexTables = indexMetadata.getIndexesMap
+        .get(secondaryIndexProvider).keySet().asScala
+      // if there are no index tables for a given fact table do not perform any action
+      if (indexTables.nonEmpty) {
+        val mainTableDetails = if (segments.isEmpty) {
+          SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+        } else {
+          // get segments for main table
+          val tempLoadMetaData = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+          val listLoadMetaDataDetails = new util.ArrayList[LoadMetadataDetails]()
+          segments.get.foreach(segmentNo => {
+            tempLoadMetaData.foreach(metadata =>
+              if (metadata.getLoadName.equals(segmentNo)) {
+                listLoadMetaDataDetails.add(metadata)
+              })
+          })
+          listLoadMetaDataDetails.asScala.toArray
+        }
+        if (allIndex) {
+          indexTables.foreach {
+            indexTableName =>
+              CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel,
+                indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)

Review comment:
       no need to send `secondaryIndexProvider`

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString
+      }
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)
+    } else {
+      // for all tables in the db
+        sparkSession.sessionState.catalog.listTables(dbName).foreach {
+          tableIdent =>
+            triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments)
+        }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean,
+                    indexName: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val carbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+      .readTableStatusFile(tableStatusFilePath).toList.asJava)
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+
+    val indexMetadata = carbonTable.getIndexMetadata
+    val secondaryIndexProvider = IndexType.SI.getIndexProviderName
+    if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
+      null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
+      val indexTables = indexMetadata.getIndexesMap
+        .get(secondaryIndexProvider).keySet().asScala
+      // if there are no index tables for a given fact table do not perform any action
+      if (indexTables.nonEmpty) {
+        val mainTableDetails = if (segments.isEmpty) {
+          SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+        } else {
+          // get segments for main table
+          val tempLoadMetaData = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+          val listLoadMetaDataDetails = new util.ArrayList[LoadMetadataDetails]()

Review comment:
       remove this temp list; use Scala collection operations directly instead

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString
+      }
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)
+    } else {
+      // for all tables in the db
+        sparkSession.sessionState.catalog.listTables(dbName).foreach {
+          tableIdent =>
+            triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments)
+        }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean,
+                    indexName: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val carbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+      .readTableStatusFile(tableStatusFilePath).toList.asJava)
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+
+    val indexMetadata = carbonTable.getIndexMetadata
+    val secondaryIndexProvider = IndexType.SI.getIndexProviderName
+    if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
+      null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
+      val indexTables = indexMetadata.getIndexesMap
+        .get(secondaryIndexProvider).keySet().asScala
+      // if there are no index tables for a given fact table do not perform any action
+      if (indexTables.nonEmpty) {
+        val mainTableDetails = if (segments.isEmpty) {
+          SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)

Review comment:
       we are already reading the table status file at line 80, so there is no need to read it again; avoid the reads at lines 92 and 95

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Show indexes on the table
+ */
+case class IndexRepairCommand(indexname: Option[String], tableNameOp: TableIdentifier,
+                              dbName: String,
+                              segments: Option[List[String]]) extends DataCommand{
+
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+  def processData(sparkSession: SparkSession): Seq[Row] = {
+    if (dbName == null) {
+      // table level and index level
+      val databaseName = if (tableNameOp.database.isEmpty) {
+        SparkSession.getActiveSession.get.catalog.currentDatabase
+      } else {
+        tableNameOp.database.get.toString
+      }
+      triggerRepair(tableNameOp.table, databaseName, indexname.isEmpty, indexname, segments)
+    } else {
+      // for all tables in the db
+        sparkSession.sessionState.catalog.listTables(dbName).foreach {
+          tableIdent =>
+            triggerRepair(tableIdent.table, dbName, indexname.isEmpty, indexname, segments)
+        }
+    }
+    Seq.empty
+  }
+
+  def triggerRepair(tableNameOp: String, databaseName: String, allIndex: Boolean,
+                    indexName: Option[String], segments: Option[List[String]]): Unit = {
+    val sparkSession = SparkSession.getActiveSession.get
+    // when Si creation and load to main table are parallel, get the carbonTable from the
+    // metastore which will have the latest index Info
+    val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+    val carbonTable = metaStore
+      .lookupRelation(Some(databaseName), tableNameOp)(sparkSession)
+      .asInstanceOf[CarbonRelation].carbonTable
+
+    val carbonLoadModel = new CarbonLoadModel
+    carbonLoadModel.setDatabaseName(databaseName)
+    carbonLoadModel.setTableName(tableNameOp)
+    carbonLoadModel.setTablePath(carbonTable.getTablePath)
+    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath)
+    carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+      .readTableStatusFile(tableStatusFilePath).toList.asJava)
+    carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(carbonTable))
+
+    val indexMetadata = carbonTable.getIndexMetadata
+    val secondaryIndexProvider = IndexType.SI.getIndexProviderName
+    if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
+      null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
+      val indexTables = indexMetadata.getIndexesMap
+        .get(secondaryIndexProvider).keySet().asScala
+      // if there are no index tables for a given fact table do not perform any action
+      if (indexTables.nonEmpty) {
+        val mainTableDetails = if (segments.isEmpty) {
+          SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+        } else {
+          // get segments for main table
+          val tempLoadMetaData = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+          val listLoadMetaDataDetails = new util.ArrayList[LoadMetadataDetails]()
+          segments.get.foreach(segmentNo => {
+            tempLoadMetaData.foreach(metadata =>
+              if (metadata.getLoadName.equals(segmentNo)) {
+                listLoadMetaDataDetails.add(metadata)
+              })
+          })
+          listLoadMetaDataDetails.asScala.toArray
+        }

Review comment:
       ```
         segments.get.foreach(segmentNo => {
               tempLoadMetaData.foreach(metadata =>
                 if (metadata.getLoadName.equals(segmentNo)) {
                   listLoadMetaDataDetails.add(metadata)
                 })
             })
             listLoadMetaDataDetails.asScala.toArray
   ```
   
   remove this code and use a Scala filter, like below
   
   `tempLoadMetaData.filter(loadMetadataDetails => segments.get.contains(loadMetadataDetails.getLoadName))`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org