You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/09/01 11:39:09 UTC
[carbondata] branch master updated: [CARBONDATA-3956] Reindex
command on SI table
This is an automated email from the ASF dual-hosted git repository.
kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 0afbef1 [CARBONDATA-3956] Reindex command on SI table
0afbef1 is described below
commit 0afbef198d9142acec153f7da7f613ce7bfc25db
Author: Vikram Ahuja <vi...@gmail.com>
AuthorDate: Wed Jul 29 18:01:04 2020 +0530
[CARBONDATA-3956] Reindex command on SI table
Why is this PR needed?
In the main table with SI tables, after every
load/insert command, SILoadEventListenerForFailedSegments.scala
checks for missing segments or segments mismatch in SI table
and loads the missing/deleted segments to the SI table.
This functionality is dependent on the load/insert command.
A new functionality/command is required so that this repair
logic can be made independent to the load/insert logic.
What changes were proposed in this PR?
Added a separate SQL reindex command (reindex [index_table]
on table maintable) to call the SI repair logic without load/insert.
This closes #3873
---
docs/index/secondary-index-guide.md | 23 +-
.../testsuite/secondaryindex/TestIndexRepair.scala | 253 ++++++++++++++++++++
.../spark/sql/catalyst/CarbonDDLSqlParser.scala | 1 +
.../command/index/IndexRepairCommand.scala | 134 +++++++++++
.../apache/spark/sql/index/CarbonIndexUtil.scala | 214 ++++++++++++++++-
.../spark/sql/parser/CarbonSpark2SqlParser.scala | 35 ++-
.../SILoadEventListenerForFailedSegments.scala | 258 +++------------------
7 files changed, 679 insertions(+), 239 deletions(-)
diff --git a/docs/index/secondary-index-guide.md b/docs/index/secondary-index-guide.md
index cf22097..2eaaed9 100644
--- a/docs/index/secondary-index-guide.md
+++ b/docs/index/secondary-index-guide.md
@@ -188,4 +188,25 @@ where we have old stores.
Syntax
```
REGISTER INDEX TABLE index_name ON [TABLE] [db_name.]table_name
- ```
\ No newline at end of file
+ ```
+
+### Reindex Command
+This command is used to reload segments in the SI table when there is a mismatch in the number
+of segments with the main table.
+
+Syntax
+
+Reindex on all the secondary Indexes of the main table
+ ```
+ REINDEX ON TABLE [db_name.]main_table_name [WHERE SEGMENT.ID IN(0,1)]
+ ```
+Reindexing at index table level
+
+ ```
+ REINDEX INDEX TABLE index_table ON [db_name.]main_table_name [WHERE SEGMENT.ID IN (1)]
+ ```
+Reindex on Database level
+ ```
+ REINDEX DATABASE db_name [WHERE SEGMENT.ID IN (1,2,5)]
+ ```
+Note: This command is not supported with other concurrent operations.
\ No newline at end of file
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
new file mode 100644
index 0000000..1a041ad
--- /dev/null
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestIndexRepair.scala
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.secondaryindex
+
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.spark.testsuite.secondaryindex.TestSecondaryIndexUtils
+.isFilterPushedDownToSI;
+import org.apache.spark.sql.test.util.QueryTest
+
+/**
+ * Test cases for the reindex command at index table/main table/DB level
+ */
+class TestIndexRepair extends QueryTest with BeforeAndAfterAll {
+
+ override def beforeAll {
+ sql("drop table if exists maintable")
+ sql("drop table if exists indextable1")
+ sql("drop table if exists indextable2")
+ }
+
+ test("reindex command after deleting segments from SI table") {
+ sql("drop table if exists maintable")
+ sql("CREATE TABLE maintable(a INT, b STRING, c STRING) stored as carbondata")
+ sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+ sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+ sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+ val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+ sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1)")
+ sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+ sql(s"""ALTER TABLE default.indextable1 SET
+ |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
+ val df1 = sql("select * from maintable where c = 'string2'").queryExecution.sparkPlan
+ assert(!isFilterPushedDownToSI(df1))
+ val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+ assert(preDeleteSegments!=postDeleteSegments)
+ sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE")
+ val df2 = sql("select * from maintable where c = 'string2'").queryExecution.sparkPlan
+ val postRepairSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+ assert(preDeleteSegments == postRepairSegments)
+ assert(isFilterPushedDownToSI(df2))
+ sql("drop table if exists maintable")
+ }
+
+
+ test("reindex command after deleting segments from SI table on other database without use") {
+ sql("drop table if exists test.maintable")
+ sql("drop database if exists test cascade")
+ sql("create database test")
+ sql("CREATE TABLE test.maintable(a INT, b STRING, c STRING) stored as carbondata")
+ sql("CREATE INDEX indextable1 on table test.maintable(c) as 'carbondata'")
+ sql("INSERT INTO test.maintable SELECT 1,'string1', 'string2'")
+ sql("INSERT INTO test.maintable SELECT 1,'string1', 'string2'")
+ sql("INSERT INTO test.maintable SELECT 1,'string1', 'string2'")
+
+ val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
+ sql("DELETE FROM TABLE test.INDEXTABLE1 WHERE SEGMENT.ID IN(0,1,2)")
+ sql("CLEAN FILES FOR TABLE test.INDEXTABLE1")
+ sql(s"""ALTER TABLE test.indextable1 SET
+ |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
+ val df1 = sql("select * from test.maintable where c = 'string2'").queryExecution.sparkPlan
+ assert(!isFilterPushedDownToSI(df1))
+ val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
+ assert(preDeleteSegments!=postDeleteSegments)
+ sql("REINDEX INDEX TABLE indextable1 ON test.MAINTABLE")
+ val df2 = sql("select * from test.maintable where c = 'string2'").queryExecution.sparkPlan
+ val postRepairSegments = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
+ assert(preDeleteSegments == postRepairSegments)
+ assert(isFilterPushedDownToSI(df2))
+ sql("drop table if exists test.maintable")
+ sql("drop database if exists test cascade")
+ }
+
+ test("reindex command using segment.id after deleting segments from SI table") {
+ sql("drop table if exists maintable")
+ sql("CREATE TABLE maintable(a INT, b STRING, c STRING) stored as carbondata")
+ sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+ sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+ sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+ sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+
+ val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+ sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0,1,2)")
+ sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+ val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+ assert(preDeleteSegments!=postDeleteSegments)
+ sql(s"""ALTER TABLE default.indextable1 SET
+ |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
+ val df1 = sql("select * from maintable where c = 'string2'").queryExecution.sparkPlan
+ assert(!isFilterPushedDownToSI(df1))
+ sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE WHERE SEGMENT.ID IN (0,1)")
+ val postFirstRepair = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+ assert(postDeleteSegments + 2 == postFirstRepair)
+ val df2 = sql("select * from maintable where c = 'string2'").queryExecution.sparkPlan
+ assert(!isFilterPushedDownToSI(df2))
+ sql("REINDEX INDEX TABLE indextable1 ON MAINTABLE WHERE SEGMENT.ID IN (2)")
+ val postRepairSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+ assert(preDeleteSegments == postRepairSegments)
+ val df3 = sql("select * from maintable where c = 'string2'").queryExecution.sparkPlan
+ assert(isFilterPushedDownToSI(df3))
+ sql("drop table if exists maintable")
+ }
+
+ test("insert command after deleting segments from SI table") {
+ sql("drop table if exists maintable")
+ sql("CREATE TABLE maintable(a INT, b STRING, c STRING) stored as carbondata")
+ sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+ sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+ sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+ sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+ sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+
+ val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+ sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(1,2,3)")
+ sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+ val postDeleteSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+ assert(preDeleteSegments!=postDeleteSegments)
+ sql(s"""ALTER TABLE default.indextable1 SET
+ |SERDEPROPERTIES ('isSITableEnabled' = 'false')""".stripMargin)
+ val df1 = sql("select * from maintable where c = 'string2'").queryExecution.sparkPlan
+ assert(!isFilterPushedDownToSI(df1))
+ sql("INSERT INTO maintable SELECT 1,'string1', 'string2'")
+ val postLoadSegments = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+ assert(preDeleteSegments + 1 == postLoadSegments)
+ val df2 = sql("select * from maintable where c = 'string2'").queryExecution.sparkPlan
+ assert(isFilterPushedDownToSI(df2))
+ sql("drop table if exists maintable")
+ }
+
+ test("reindex command on main table") {
+ sql("drop table if exists maintable")
+ sql("CREATE TABLE maintable(a INT, b STRING, c STRING, d STRING) stored as carbondata")
+ sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+ sql("CREATE INDEX indextable2 on table maintable(d) as 'carbondata'")
+ sql("INSERT INTO maintable SELECT 1,'string1', 'string2', 'string3'")
+ sql("INSERT INTO maintable SELECT 1,'string1', 'string2', 'string3'")
+ val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE MAINTABLE").count()
+ sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0)")
+ sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+ sql("DELETE FROM TABLE INDEXTABLE2 WHERE SEGMENT.ID IN(0,1)")
+ sql("CLEAN FILES FOR TABLE INDEXTABLE2")
+ val postDeleteSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+ val postDeleteSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE2").count()
+ assert(preDeleteSegments!=postDeleteSegmentsIndexOne)
+ assert(preDeleteSegments!=postDeleteSegmentsIndexTwo)
+ sql("REINDEX ON TABLE MAINTABLE")
+ val postRepairSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+ val postRepairSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE2").count()
+ assert(preDeleteSegments == postRepairSegmentsIndexOne)
+ assert(preDeleteSegments == postRepairSegmentsIndexTwo)
+ sql("drop table if exists maintable")
+ }
+
+ test("reindex command on main table with delete command") {
+ sql("drop table if exists maintable")
+ sql("CREATE TABLE maintable(a INT, b STRING, c STRING, d STRING) stored as carbondata")
+ sql("CREATE INDEX indextable1 on table maintable(c) as 'carbondata'")
+ sql("CREATE INDEX indextable2 on table maintable(d) as 'carbondata'")
+ sql("INSERT INTO maintable SELECT 1,'string1', 'string2', 'string3'")
+ sql("INSERT INTO maintable SELECT 1,'string1', 'string2', 'string3'")
+ val preDeleteSegments = sql("SHOW SEGMENTS FOR TABLE MAINTABLE").count()
+ sql("DELETE FROM TABLE INDEXTABLE1 WHERE SEGMENT.ID IN(0)")
+ sql("CLEAN FILES FOR TABLE INDEXTABLE1")
+ sql("DELETE FROM TABLE INDEXTABLE2 WHERE SEGMENT.ID IN(1)")
+ sql("CLEAN FILES FOR TABLE INDEXTABLE2")
+ val postDeleteSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+ val postDeleteSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE2").count()
+ assert(preDeleteSegments != postDeleteSegmentsIndexOne)
+ assert(preDeleteSegments != postDeleteSegmentsIndexTwo)
+ sql("REINDEX ON TABLE MAINTABLE WHERE SEGMENT.ID IN(0,1)")
+ val postRepairSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE1").count()
+ val postRepairSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE INDEXTABLE2").count()
+ assert(preDeleteSegments == postRepairSegmentsIndexOne)
+ assert(preDeleteSegments == postRepairSegmentsIndexTwo)
+ sql("drop table if exists maintable")
+ }
+
+
+ test("reindex command on database") {
+ sql("drop database if exists test cascade")
+ sql("create database test")
+ sql("drop table if exists maintable1")
+
+ //table 1
+ sql("CREATE TABLE test.maintable1(a INT, b STRING, c STRING, d STRING) stored as carbondata")
+ sql("CREATE INDEX indextable1 on table test.maintable1(c) as 'carbondata'")
+ sql("CREATE INDEX indextable2 on table test.maintable1(d) as 'carbondata'")
+ sql("INSERT INTO test.maintable1 SELECT 1,'string1', 'string2', 'string3'")
+ sql("INSERT INTO test.maintable1 SELECT 1,'string1', 'string2', 'string3'")
+
+ val preDeleteSegmentsTableOne = sql("SHOW SEGMENTS FOR TABLE test.MAINTABLE1").count()
+ sql("DELETE FROM TABLE test.INDEXTABLE1 WHERE SEGMENT.ID IN(0)")
+ sql("CLEAN FILES FOR TABLE test.INDEXTABLE1")
+ sql("DELETE FROM TABLE test.INDEXTABLE2 WHERE SEGMENT.ID IN(0,1)")
+ sql("CLEAN FILES FOR TABLE test.INDEXTABLE2")
+ val postDeleteSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
+ val postDeleteSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE2").count()
+
+ // table 2
+ sql("CREATE TABLE test.maintable2(a INT, b STRING, c STRING, d STRING) stored as carbondata")
+ sql("CREATE INDEX indextable3 on table test.maintable2(c) as 'carbondata'")
+ sql("CREATE INDEX indextable4 on table test.maintable2(d) as 'carbondata'")
+ sql("INSERT INTO test.maintable2 SELECT 1,'string1', 'string2', 'string3'")
+ sql("INSERT INTO test.maintable2 SELECT 1,'string1', 'string2', 'string3'")
+
+ val preDeleteSegmentsTableTwo = sql("SHOW SEGMENTS FOR TABLE test.MAINTABLE2").count()
+ sql("DELETE FROM TABLE test.INDEXTABLE3 WHERE SEGMENT.ID IN(1)")
+ sql("CLEAN FILES FOR TABLE test.INDEXTABLE3")
+ sql("DELETE FROM TABLE test.INDEXTABLE4 WHERE SEGMENT.ID IN(0,1)")
+ sql("CLEAN FILES FOR TABLE test.INDEXTABLE4")
+ val postDeleteSegmentsIndexThree = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE3").count()
+ val postDeleteSegmentsIndexFour = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE4").count()
+
+ assert(preDeleteSegmentsTableOne!=postDeleteSegmentsIndexOne)
+ assert(preDeleteSegmentsTableOne!=postDeleteSegmentsIndexTwo)
+ assert(preDeleteSegmentsTableTwo!=postDeleteSegmentsIndexThree)
+ assert(preDeleteSegmentsTableTwo!=postDeleteSegmentsIndexFour)
+ sql("REINDEX DATABASE TEST")
+ val postRepairSegmentsIndexOne = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE1").count()
+ val postRepairSegmentsIndexTwo = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE2").count()
+ val postRepairSegmentsIndexThree = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE3").count()
+ val postRepairSegmentsIndexFour = sql("SHOW SEGMENTS FOR TABLE test.INDEXTABLE4").count()
+ assert(preDeleteSegmentsTableOne == postRepairSegmentsIndexOne)
+ assert(preDeleteSegmentsTableOne == postRepairSegmentsIndexTwo)
+ assert(preDeleteSegmentsTableTwo == postRepairSegmentsIndexThree)
+ assert(preDeleteSegmentsTableTwo == postRepairSegmentsIndexFour)
+ sql("drop table if exists test.maintable1")
+ sql("drop table if exists test.maintable2")
+ sql("drop database if exists test cascade")
+ }
+
+
+ override def afterAll {
+ sql("drop table if exists maintable")
+ sql("drop table if exists indextable1")
+ sql("drop table if exists indextable2")
+ }
+
+}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 9928f48..155e63d 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -174,6 +174,7 @@ abstract class CarbonDDLSqlParser extends AbstractCarbonSparkSQLParser {
protected val INSERT = carbonKeyWord("INSERT")
protected val STAGE = carbonKeyWord("STAGE")
protected val INDEX = carbonKeyWord("INDEX")
+ protected val REINDEX = carbonKeyWord("REINDEX")
protected val INDEXES = carbonKeyWord("INDEXES")
protected val REGISTER = carbonKeyWord("REGISTER")
protected val PROPERTIES = carbonKeyWord("PROPERTIES")
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
new file mode 100644
index 0000000..6a3e4ec
--- /dev/null
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/index/IndexRepairCommand.scala
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command.index
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.execution.command.DataCommand
+import org.apache.spark.sql.hive.CarbonRelation
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage}
+import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel}
+
+/**
+ * Repair logic for reindex command on maintable/indextable
+ */
+case class IndexRepairCommand(
+ indexnameOp: Option[String], tableIdentifier: TableIdentifier,
+ dbName: String,
+ segments: Option[List[String]])
+extends DataCommand {
+
+ private val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+
+ def processData(sparkSession: SparkSession): Seq[Row] = {
+ if (dbName == null) {
+ // dbName is null, repair for index table or all the index table in main table
+ val databaseName = if (tableIdentifier.database.isEmpty) {
+ sparkSession.sessionState.catalog.getCurrentDatabase
+ } else {
+ tableIdentifier.database.get
+ }
+ triggerRepair(tableIdentifier.table, databaseName, indexnameOp, segments, sparkSession)
+ } else {
+ // repairing si for all index tables in the mentioned database in the repair command
+ sparkSession.sessionState.catalog.listTables(dbName).foreach {
+ tableIdent =>
+ triggerRepair(tableIdent.table, dbName, indexnameOp, segments, sparkSession)
+ }
+ }
+ Seq.empty
+ }
+
+ def triggerRepair(tableName: String, databaseName: String,
+ indexTableToRepair: Option[String], segments: Option[List[String]],
+ sparkSession: SparkSession): Unit = {
+ // when Si creation and load to main table are parallel, get the carbonTable from the
+ // metastore which will have the latest index Info
+ val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+ val mainCarbonTable = metaStore
+ .lookupRelation(Some(databaseName), tableName)(sparkSession)
+ .asInstanceOf[CarbonRelation].carbonTable
+
+ val tableStatusLock = CarbonLockFactory
+ .getCarbonLockObj(mainCarbonTable.getAbsoluteTableIdentifier, LockUsage.TABLE_STATUS_LOCK)
+ val carbonLoadModel = new CarbonLoadModel
+ carbonLoadModel.setDatabaseName(databaseName)
+ carbonLoadModel.setTableName(tableName)
+ carbonLoadModel.setTablePath(mainCarbonTable.getTablePath)
+ try {
+ if (tableStatusLock.lockWithRetries()) {
+ val tableStatusFilePath = CarbonTablePath
+ .getTableStatusFilePath(mainCarbonTable.getTablePath)
+ carbonLoadModel.setLoadMetadataDetails(SegmentStatusManager
+ .readTableStatusFile(tableStatusFilePath).toList.asJava)
+ carbonLoadModel.setCarbonDataLoadSchema(new CarbonDataLoadSchema(mainCarbonTable))
+ } else {
+ throw new ConcurrentOperationException(mainCarbonTable.getDatabaseName,
+ mainCarbonTable.getTableName, "table status read", "reindex command")
+ }
+ } finally {
+ tableStatusLock.unlock()
+ }
+ val indexMetadata = mainCarbonTable.getIndexMetadata
+ val secondaryIndexProvider = IndexType.SI.getIndexProviderName
+ if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
+ null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
+ val indexTables = indexMetadata.getIndexesMap
+ .get(secondaryIndexProvider).keySet().asScala
+ // if there are no index tables for a given fact table do not perform any action
+ if (indexTables.nonEmpty) {
+ val mainTableDetails = if (segments.isEmpty) {
+ carbonLoadModel.getLoadMetadataDetails.asScala.toList
+ } else {
+ // get segments for main table
+ carbonLoadModel.getLoadMetadataDetails.asScala.toList.filter(
+ loadMetaDataDetails => segments.get.contains(loadMetaDataDetails.getLoadName))
+ }
+ if (indexTableToRepair.isEmpty) {
+ indexTables.foreach {
+ indexTableName =>
+ CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel,
+ indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)
+ }
+ } else {
+ val indexTablesToRepair = indexTables.filter(indexTable => indexTable
+ .equals(indexTableToRepair.get))
+ indexTablesToRepair.foreach {
+ indexTableName =>
+ CarbonIndexUtil.processSIRepair(indexTableName, mainCarbonTable, carbonLoadModel,
+ indexMetadata, mainTableDetails, secondaryIndexProvider)(sparkSession)
+ }
+ if (indexTablesToRepair.isEmpty) {
+ throw new Exception("Unable to find index table" + indexTableToRepair.get)
+ }
+ }
+ }
+ }
+ }
+
+ override protected def opName: String = "REINDEX COMMAND"
+}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index 9bb7eb6..c2bb9cf 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -21,11 +21,12 @@ import java.util
import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.CarbonSessionCatalogUtil
+import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalogUtil}
import org.apache.spark.sql.secondaryindex.command.{IndexModel, SecondaryIndexModel}
import org.apache.spark.sql.secondaryindex.hive.CarbonInternalMetastore
import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
@@ -35,13 +36,16 @@ import org.apache.spark.util.AlterTableUtil
import org.apache.carbondata.common.logging.LogServiceFactory
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.compression.CompressorFactory
-import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.converter.ThriftWrapperSchemaConverterImpl
import org.apache.carbondata.core.metadata.index.IndexType
+import org.apache.carbondata.core.metadata.schema.indextable.IndexMetadata
import org.apache.carbondata.core.metadata.schema.indextable.IndexTableInfo
import org.apache.carbondata.core.metadata.schema.table.CarbonTable
import org.apache.carbondata.core.mutate.CarbonUpdateUtil
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.carbondata.format.TableInfo
import org.apache.carbondata.processing.loading.model.CarbonLoadModel
@@ -376,4 +380,208 @@ object CarbonIndexUtil {
AlterTableUtil.releaseLocks(locks.asScala.toList)
}
}
+
+ def processSIRepair(indexTableName: String, carbonTable: CarbonTable,
+ carbonLoadModel: CarbonLoadModel, indexMetadata: IndexMetadata,
+ mainTableDetails: List[LoadMetadataDetails], secondaryIndexProvider: String)
+ (sparkSession: SparkSession) : Unit = {
+ // when Si creation and load to main table are parallel, get the carbonTable from the
+ // metastore which will have the latest index Info
+ val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+ val indexTable = metaStore
+ .lookupRelation(Some(carbonLoadModel.getDatabaseName), indexTableName)(
+ sparkSession)
+ .asInstanceOf[CarbonRelation]
+ .carbonTable
+
+ val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+ SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+ var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
+ if (!CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
+ mainTableDetails.toArray,
+ siTblLoadMetadataDetails)) {
+ val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
+ indexTableName)
+ val indexModel = IndexModel(Some(carbonTable.getDatabaseName),
+ indexMetadata.getParentTableName,
+ indexColumns.split(",").toList,
+ indexTableName)
+
+ // var details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+ // If it empty, then no need to do further computations because the
+ // tabletstatus might not have been created and hence next load will take care
+ if (siTblLoadMetadataDetails.isEmpty) {
+ Seq.empty
+ }
+
+ val failedLoadMetadataDetails: java.util.List[LoadMetadataDetails] = new util
+ .ArrayList[LoadMetadataDetails]()
+
+ // read the details of SI table and get all the failed segments during SI
+ // creation which are MARKED_FOR_DELETE or invalid INSERT_IN_PROGRESS
+ siTblLoadMetadataDetails.foreach {
+ case loadMetaDetail: LoadMetadataDetails =>
+ if (loadMetaDetail.getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE &&
+ checkIfMainTableLoadIsValid(mainTableDetails.toArray,
+ loadMetaDetail.getLoadName)) {
+ failedLoadMetadataDetails.add(loadMetaDetail)
+ } else if ((loadMetaDetail.getSegmentStatus ==
+ SegmentStatus.INSERT_IN_PROGRESS ||
+ loadMetaDetail.getSegmentStatus ==
+ SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) &&
+ checkIfMainTableLoadIsValid(mainTableDetails.toArray,
+ loadMetaDetail.getLoadName)) {
+ val segmentLock = CarbonLockFactory
+ .getCarbonLockObj(indexTable.getAbsoluteTableIdentifier,
+ CarbonTablePath.addSegmentPrefix(loadMetaDetail.getLoadName) +
+ LockUsage.LOCK)
+ try {
+ if (segmentLock.lockWithRetries(1, 0)) {
+ LOGGER
+ .info("SIFailedLoadListener: Acquired segment lock on segment:" +
+ loadMetaDetail.getLoadName)
+ failedLoadMetadataDetails.add(loadMetaDetail)
+ }
+ } finally {
+ segmentLock.unlock()
+ LOGGER
+ .info("SIFailedLoadListener: Released segment lock on segment:" +
+ loadMetaDetail.getLoadName)
+ }
+ }
+ }
+ // check for the skipped segments. compare the main table and SI table table
+ // status file and get the skipped segments if any
+ CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails.toArray).asScala
+ .foreach(metadataDetail => {
+ val detail = siTblLoadMetadataDetails
+ .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+ val mainTableDetail = mainTableDetails
+ .filter(metadata => metadata.getLoadName.equals(metadataDetail))
+ if (null == detail || detail.length == 0) {
+ val newDetails = new LoadMetadataDetails
+ newDetails.setLoadName(metadataDetail)
+ LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName + " for SI" +
+ " table " + indexTableName + "." + carbonTable.getTableName )
+ failedLoadMetadataDetails.add(newDetails)
+ } else if (detail != null && detail.length != 0 && metadataDetail != null
+ && metadataDetail.length != 0) {
+ // If SI table has compacted segments and main table does not have
+ // compacted segments due to some failure while compaction, need to
+ // reload the original segments in this case.
+ if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
+ mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
+ detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
+ // in concurrent scenario, if a compaction is going on table, then SI
+ // segments are updated first in table status and then the main table
+ // segment, so in any load runs parallel this listener shouldn't consider
+ // those segments accidentally. So try to take the segment lock.
+ val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
+ .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
+ CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
+ LockUsage.LOCK)
+ if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
+ segmentLocks += segmentLockOfProbableOnCompactionSeg
+ LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName + " for SI " +
+ "table " + indexTableName + "." + carbonTable.getTableName )
+ failedLoadMetadataDetails.add(detail(0))
+ }
+ }
+ }
+ })
+ try {
+ if (!failedLoadMetadataDetails.isEmpty) {
+ // in the case when in SI table a segment is deleted and it's entry is
+ // deleted from the tablestatus file, the corresponding .segment file from
+ // the metadata folder should also be deleted as it contains the
+ // mergefilename which does not exist anymore as the segment is deleted.
+ deleteStaleSegmentFileIfPresent(carbonLoadModel,
+ indexTable,
+ failedLoadMetadataDetails)
+ CarbonIndexUtil
+ .LoadToSITable(sparkSession,
+ carbonLoadModel,
+ indexTableName,
+ isLoadToFailedSISegments = true,
+ indexModel,
+ carbonTable, indexTable, failedLoadMetadataDetails)
+
+ // get the current load metadata details of the index table
+ // details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+ }
+
+ // get updated main table segments and si table segments
+ val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+ SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+ val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
+ SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
+
+ // check if main table has load in progress and SI table has no load
+ // in progress entry, then no need to enable the SI table
+ // Only if the valid segments of maintable match the valid segments of SI
+ // table then we can enable the SI for query
+ if (CarbonInternalLoaderUtil
+ .checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
+ siTblLoadMetadataDetails)
+ && CarbonInternalLoaderUtil.checkInProgLoadInMainTableAndSI(carbonTable,
+ mainTblLoadMetadataDetails, siTblLoadMetadataDetails)) {
+ // enable the SI table if it was disabled earlier due to failure during SI
+ // creation time
+ sparkSession.sql(
+ s"""ALTER TABLE ${carbonLoadModel.getDatabaseName}.$indexTableName SET
+ |SERDEPROPERTIES ('isSITableEnabled' = 'true')""".stripMargin).collect()
+ }
+ } catch {
+ case ex: Exception =>
+ // in case of SI load only for for failed segments, catch the exception, but
+ // do not fail the main table load, as main table segments should be available
+ // for query
+ LOGGER.error(s"Load to SI table to $indexTableName is failed " +
+ s"or SI table ENABLE is failed. ", ex)
+ Seq.empty
+ } finally {
+ segmentLocks.foreach {
+ segmentLock => segmentLock.unlock()
+ }
+ }
+ }
+ Seq.empty
+ }
+
+ def checkIfMainTableLoadIsValid(mainTableDetails: Array[LoadMetadataDetails],
+ loadName: String): Boolean = {
+ // in concurrent scenarios there can be cases when loadName is not present in the
+ // mainTableDetails array. Added a check to see if the loadName is even present in the
+ // mainTableDetails.
+ val mainTableLoadDetail = mainTableDetails
+ .filter(mainTableDetail => mainTableDetail.getLoadName.equals(loadName))
+ if (mainTableLoadDetail.length == 0) {
+ false
+ } else {
+ if (mainTableLoadDetail.head.getSegmentStatus ==
+ SegmentStatus.MARKED_FOR_DELETE ||
+ mainTableLoadDetail.head.getSegmentStatus == SegmentStatus.COMPACTED) {
+ false
+ } else {
+ true
+ }
+ }
+ }
+
+ def deleteStaleSegmentFileIfPresent(carbonLoadModel: CarbonLoadModel, indexTable: CarbonTable,
+ failedLoadMetaDataDetails: java.util.List[LoadMetadataDetails]): Unit = {
+ failedLoadMetaDataDetails.asScala.map(failedLoadMetaData => {
+ carbonLoadModel.getLoadMetadataDetails.asScala.map(loadMetaData => {
+ if (failedLoadMetaData.getLoadName == loadMetaData.getLoadName) {
+ val segmentFilePath = CarbonTablePath.getSegmentFilesLocation(indexTable.getTablePath) +
+ CarbonCommonConstants.FILE_SEPARATOR + loadMetaData.getSegmentFile
+ if (FileFactory.isFileExist(segmentFilePath)) {
+ // delete the file if it exists
+ FileFactory.deleteFile(segmentFilePath)
+ }
+ }
+ })
+ })
+ }
+
}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
index 72bb5bc..7b747d8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/parser/CarbonSpark2SqlParser.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, UnresolvedRe
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.execution.command._
import org.apache.spark.sql.execution.command.cache.{CarbonDropCacheCommand, CarbonShowCacheCommand}
-import org.apache.spark.sql.execution.command.index.{CarbonCreateIndexCommand, CarbonRefreshIndexCommand, DropIndexCommand, ShowIndexesCommand}
+import org.apache.spark.sql.execution.command.index.{CarbonCreateIndexCommand, CarbonRefreshIndexCommand, DropIndexCommand, IndexRepairCommand, ShowIndexesCommand}
import org.apache.spark.sql.execution.command.management._
import org.apache.spark.sql.execution.command.schema.CarbonAlterTableDropColumnCommand
import org.apache.spark.sql.execution.command.stream.{CarbonCreateStreamCommand, CarbonDropStreamCommand, CarbonShowStreamsCommand}
@@ -97,7 +97,8 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
createMV | dropMV | showMV | refreshMV
protected lazy val indexCommands: Parser[LogicalPlan] =
- createIndex | dropIndex | showIndexes | registerIndexes | refreshIndex
+ createIndex | dropIndex | showIndexes | registerIndexes | refreshIndex | repairIndex |
+ repairIndexDatabase
protected lazy val alterTable: Parser[LogicalPlan] =
ALTER ~> TABLE ~> (ident <~ ".").? ~ ident ~ (COMPACT ~ stringLit) ~
@@ -668,6 +669,34 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
ShowIndexesCommand(table.database, table.table)
}
+ /**
+ * REINDEX INDEX TABLE index_name
+ * ON [db_name.]table_name
+ * [WHERE SEGMENT.ID IN (segment_id, ...)] or
+ *
+ * REINDEX ON [db_name.]table_name
+ * [WHERE SEGMENT.ID IN (segment_id, ...)]
+ */
+ protected lazy val repairIndex: Parser[LogicalPlan] =
+ REINDEX ~> opt(INDEX ~> TABLE ~> ident) ~ ontable ~
+ (WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",") <~ ")").? <~
+ opt(";") ^^ {
+ case indexName ~ tableIdent ~ segments =>
+ IndexRepairCommand(indexName, tableIdent, null, segments)
+ }
+
+ /**
+ * REINDEX DATABASE db_name
+ * [WHERE SEGMENT.ID IN (segment_id, ...)]
+ */
+ protected lazy val repairIndexDatabase: Parser[LogicalPlan] =
+ REINDEX ~> DATABASE ~> ident ~
+ (WHERE ~> (SEGMENT ~ "." ~ ID) ~> IN ~> "(" ~> repsep(segmentId, ",") <~ ")").? <~
+ opt(";") ^^ {
+ case dbName ~ segments =>
+ IndexRepairCommand(None, null, dbName, segments)
+ }
+
protected lazy val registerIndexes: Parser[LogicalPlan] =
REGISTER ~> INDEX ~> TABLE ~> ident ~ ontable <~ opt(";") ^^ {
case indexTable ~ table =>
@@ -676,7 +705,7 @@ class CarbonSpark2SqlParser extends CarbonDDLSqlParser {
/**
* REFRESH INDEX index_name
- * ON [db_name.]table_ame
+ * ON [db_name.]table_name
* [WHERE SEGMENT.ID IN (segment_id, ...)]
*/
protected lazy val refreshIndex: Parser[LogicalPlan] =
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index 89d60f6..cd7294b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -17,31 +17,20 @@
package org.apache.spark.sql.secondaryindex.events
-import java.util
-
import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
import org.apache.log4j.Logger
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{CarbonEnv, SparkSession}
-import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.index.CarbonIndexUtil
-import org.apache.spark.sql.secondaryindex.command.IndexModel
-import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.datastore.impl.FileFactory
-import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
import org.apache.carbondata.core.metadata.index.IndexType
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
-import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.CarbonProperties
import org.apache.carbondata.events.{Event, OperationContext, OperationEventListener}
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePostStatusUpdateEvent
-import org.apache.carbondata.processing.loading.model.CarbonLoadModel
/**
* This Listener is to load the data to failed segments of Secondary index table(s)
@@ -57,228 +46,33 @@ class SILoadEventListenerForFailedSegments extends OperationEventListener with L
event match {
case postStatusUpdateEvent: LoadTablePostStatusUpdateEvent =>
LOGGER.info("Load post status update event-listener called")
- val loadTablePostStatusUpdateEvent = event.asInstanceOf[LoadTablePostStatusUpdateEvent]
- val carbonLoadModel = loadTablePostStatusUpdateEvent.getCarbonLoadModel
- val sparkSession = SparkSession.getActiveSession.get
- // when Si creation and load to main table are parallel, get the carbonTable from the
- // metastore which will have the latest index Info
- val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
- val carbonTable = metaStore
- .lookupRelation(Some(carbonLoadModel.getDatabaseName),
- carbonLoadModel.getTableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
- val indexMetadata = carbonTable.getIndexMetadata
- val secondaryIndexProvider = IndexType.SI.getIndexProviderName
- if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
+ val loadTablePostStatusUpdateEvent = event.asInstanceOf[LoadTablePostStatusUpdateEvent]
+ val carbonLoadModel = loadTablePostStatusUpdateEvent.getCarbonLoadModel
+ val sparkSession = SparkSession.getActiveSession.get
+ // when Si creation and load to main table are parallel, get the carbonTable from the
+ // metastore which will have the latest index Info
+ val metaStore = CarbonEnv.getInstance(sparkSession).carbonMetaStore
+ val carbonTable = metaStore
+ .lookupRelation(Some(carbonLoadModel.getDatabaseName),
+ carbonLoadModel.getTableName)(sparkSession)
+ .asInstanceOf[CarbonRelation].carbonTable
+ val indexMetadata = carbonTable.getIndexMetadata
+ val secondaryIndexProvider = IndexType.SI.getIndexProviderName
+ if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
- val indexTables = indexMetadata.getIndexesMap
- .get(secondaryIndexProvider).keySet().asScala
- // if there are no index tables for a given fact table do not perform any action
- if (indexTables.nonEmpty) {
- val mainTableDetails =
- SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
- indexTables.foreach {
- indexTableName =>
- val isLoadSIForFailedSegments = sparkSession.sessionState.catalog
- .getTableMetadata(TableIdentifier(indexTableName,
- Some(carbonLoadModel.getDatabaseName))).storage.properties
- .getOrElse("isSITableEnabled", "true").toBoolean
- val indexTable = metaStore
- .lookupRelation(Some(carbonLoadModel.getDatabaseName), indexTableName)(
- sparkSession)
- .asInstanceOf[CarbonRelation]
- .carbonTable
-
- val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
- SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
- val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
- SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
- var segmentLocks: ListBuffer[ICarbonLock] = ListBuffer.empty
- if (!isLoadSIForFailedSegments
- || !CarbonInternalLoaderUtil.checkMainTableSegEqualToSISeg(
- mainTblLoadMetadataDetails,
- siTblLoadMetadataDetails)) {
- val indexColumns = indexMetadata.getIndexColumns(secondaryIndexProvider,
- indexTableName)
- val secondaryIndex = IndexModel(Some(carbonTable.getDatabaseName),
- indexMetadata.getParentTableName,
- indexColumns.split(",").toList,
- indexTableName)
-
- var details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
- // If it empty, then no need to do further computations because the
- // tablet status might not have been created and hence next load will take care
- if (details.isEmpty) {
- return
- }
-
- val failedLoadMetadataDetails: java.util.List[LoadMetadataDetails] = new util
- .ArrayList[LoadMetadataDetails]()
-
- // read the details of SI table and get all the failed segments during SI
- // creation which are MARKED_FOR_DELETE or invalid INSERT_IN_PROGRESS
- details.collect {
- case loadMetaDetail: LoadMetadataDetails =>
- if (loadMetaDetail.getSegmentStatus == SegmentStatus.MARKED_FOR_DELETE &&
- checkIfMainTableLoadIsValid(mainTableDetails,
- loadMetaDetail.getLoadName)) {
- failedLoadMetadataDetails.add(loadMetaDetail)
- } else if ((loadMetaDetail.getSegmentStatus ==
- SegmentStatus.INSERT_IN_PROGRESS ||
- loadMetaDetail.getSegmentStatus ==
- SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS) &&
- checkIfMainTableLoadIsValid(mainTableDetails,
- loadMetaDetail.getLoadName)) {
- val segmentLock = CarbonLockFactory
- .getCarbonLockObj(indexTable.getAbsoluteTableIdentifier,
- CarbonTablePath.addSegmentPrefix(loadMetaDetail.getLoadName) +
- LockUsage.LOCK)
- try {
- if (segmentLock.lockWithRetries(1, 0)) {
- LOGGER
- .info("SIFailedLoadListener: Acquired segment lock on segment:" +
- loadMetaDetail.getLoadName)
- failedLoadMetadataDetails.add(loadMetaDetail)
- }
- } finally {
- segmentLock.unlock()
- LOGGER
- .info("SIFailedLoadListener: Released segment lock on segment:" +
- loadMetaDetail.getLoadName)
- }
- }
- }
- // check for the skipped segments. compare the main table and SI table table
- // status file and get the skipped segments if any
- CarbonInternalLoaderUtil.getListOfValidSlices(mainTableDetails).asScala
- .foreach(metadataDetail => {
- val detail = details
- .filter(metadata => metadata.getLoadName.equals(metadataDetail))
- val mainTableDetail = mainTableDetails
- .filter(metadata => metadata.getLoadName.equals(metadataDetail))
- if (null == detail || detail.length == 0) {
- val newDetails = new LoadMetadataDetails
- newDetails.setLoadName(metadataDetail)
- LOGGER.error("Added in SILoadFailedSegment " + newDetails.getLoadName)
- failedLoadMetadataDetails.add(newDetails)
- } else if (detail != null && detail.length !=0 && metadataDetail != null
- && metadataDetail.length != 0) {
- // If SI table has compacted segments and main table does not have
- // compacted segments due to some failure while compaction, need to
- // reload the original segments in this case.
- if (detail(0).getSegmentStatus == SegmentStatus.COMPACTED &&
- mainTableDetail(0).getSegmentStatus == SegmentStatus.SUCCESS) {
- detail(0).setSegmentStatus(SegmentStatus.SUCCESS)
- // in concurrent scenario, if a compaction is going on table, then SI
- // segments are updated first in table status and then the main table
- // segment, so in any load runs parallel this listener shouldn't consider
- // those segments accidentally. So try to take the segment lock.
- val segmentLockOfProbableOnCompactionSeg = CarbonLockFactory
- .getCarbonLockObj(carbonTable.getAbsoluteTableIdentifier,
- CarbonTablePath.addSegmentPrefix(mainTableDetail(0).getLoadName) +
- LockUsage.LOCK)
- if (segmentLockOfProbableOnCompactionSeg.lockWithRetries()) {
- segmentLocks += segmentLockOfProbableOnCompactionSeg
- LOGGER.error("Added in SILoadFailedSegment " + detail(0).getLoadName)
- failedLoadMetadataDetails.add(detail(0))
- }
- }
- }
- })
- try {
- if (!failedLoadMetadataDetails.isEmpty) {
- // in the case when in SI table a segment is deleted and it's entry is
- // deleted from the tablestatus file, the corresponding .segment file from
- // the metadata folder should also be deleted as it contains the
- // merge file name which does not exist anymore as the segment is deleted.
- deleteStaleSegmentFileIfPresent(carbonLoadModel,
- indexTable,
- failedLoadMetadataDetails)
- CarbonIndexUtil
- .LoadToSITable(sparkSession,
- carbonLoadModel,
- indexTableName,
- isLoadToFailedSISegments = true,
- secondaryIndex,
- carbonTable, indexTable, failedLoadMetadataDetails)
-
- // get the current load metadata details of the index table
- details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
- }
-
- // get updated main table segments and si table segments
- val mainTblLoadMetadataDetails: Array[LoadMetadataDetails] =
- SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
- val siTblLoadMetadataDetails: Array[LoadMetadataDetails] =
- SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
-
- // check if main table has load in progress and SI table has no load
- // in progress entry, then no need to enable the SI table
- // Only if the valid segments of main table match the valid segments of SI
- // table then we can enable the SI for query
- if (CarbonInternalLoaderUtil
- .checkMainTableSegEqualToSISeg(mainTblLoadMetadataDetails,
- siTblLoadMetadataDetails)
- && CarbonInternalLoaderUtil.checkInProgLoadInMainTableAndSI(carbonTable,
- mainTblLoadMetadataDetails, siTblLoadMetadataDetails)) {
- // enable the SI table if it was disabled earlier due to failure during SI
- // creation time
- sparkSession.sql(
- s"""ALTER TABLE ${carbonLoadModel.getDatabaseName}.$indexTableName SET
- |SERDEPROPERTIES ('isSITableEnabled' = 'true')""".stripMargin).collect()
- }
- } catch {
- case ex: Exception =>
- // in case of SI load only for for failed segments, catch the exception, but
- // do not fail the main table load, as main table segments should be available
- // for query
- LOGGER.error(s"Load to SI table to $indexTableName is failed " +
- s"or SI table ENABLE is failed. ", ex)
- return
- } finally {
- segmentLocks.foreach {
- segmentLock => segmentLock.unlock()
- }
- }
- }
+ val indexTables = indexMetadata.getIndexesMap
+ .get(secondaryIndexProvider).keySet().asScala
+ // if there are no index tables for a given fact table do not perform any action
+ if (indexTables.nonEmpty) {
+ val mainTableDetails =
+ SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+ indexTables.foreach {
+ indexTableName =>
+ CarbonIndexUtil.processSIRepair(indexTableName, carbonTable, carbonLoadModel,
+ indexMetadata, mainTableDetails.toList, secondaryIndexProvider)(sparkSession)
+ }
}
}
- }
- }
- }
-
- def checkIfMainTableLoadIsValid(mainTableDetails: Array[LoadMetadataDetails],
- loadName: String): Boolean = {
- // in concurrent scenarios there can be cases when loadName is not present in the
- // mainTableDetails array. Added a check to see if the loadName is even present in the
- // mainTableDetails.
- val mainTableLoadDetail = mainTableDetails
- .filter(mainTableDetail => mainTableDetail.getLoadName.equals(loadName))
- if (mainTableLoadDetail.length == 0) {
- false
- } else {
- if (mainTableLoadDetail.head.getSegmentStatus ==
- SegmentStatus.MARKED_FOR_DELETE ||
- mainTableLoadDetail.head.getSegmentStatus == SegmentStatus.COMPACTED) {
- false
- } else {
- true
- }
}
}
-
- def deleteStaleSegmentFileIfPresent(carbonLoadModel: CarbonLoadModel, indexTable: CarbonTable,
- failedLoadMetaDataDetails: java.util.List[LoadMetadataDetails]): Unit = {
- failedLoadMetaDataDetails.asScala.map(failedLoadMetaData => {
- carbonLoadModel.getLoadMetadataDetails.asScala.map(loadMetaData => {
- if (failedLoadMetaData.getLoadName == loadMetaData.getLoadName) {
- val segmentFilePath = CarbonTablePath.getSegmentFilesLocation(indexTable.getTablePath) +
- CarbonCommonConstants.FILE_SEPARATOR + loadMetaData.getSegmentFile
- if (FileFactory.isFileExist(segmentFilePath)) {
- // delete the file if it exists
- FileFactory.deleteFile(segmentFilePath)
- }
- }
- })
- })
- }
}