You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/12/02 06:37:56 UTC
[carbondata] branch master updated: [CARBONDATA-4052] Handled
insert overwrite scenario for SI
This is an automated email from the ASF dual-hosted git repository.
akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new 2add61e [CARBONDATA-4052] Handled insert overwrite scenario for SI
2add61e is described below
commit 2add61e63575b02c51d3070df71be042012d746d
Author: Nihal ojha <ni...@gmail.com>
AuthorDate: Fri Nov 20 15:38:03 2020 +0530
[CARBONDATA-4052] Handled insert overwrite scenario for SI
Why is this PR needed?
Currently, for an insert overwrite query, the existing SI segments
are not marked for delete. Because of this, a select query
on the SI table gives wrong results.
What changes were proposed in this PR?
Handled insert overwrite for the SI table and changed the status of
the existing segments to MARKED_FOR_DELETE.
Does this PR introduce any user interface change?
No
Is any new testcase added?
Yes
This closes #4015
---
.../secondaryindex/TestSIWithInsertOverwrite.scala | 84 ++++++++++++++++++++++
.../apache/spark/sql/index/CarbonIndexUtil.scala | 14 +++-
.../events/SILoadEventListener.scala | 8 ++-
.../secondaryindex/rdd/SecondaryIndexCreator.scala | 31 ++++++--
4 files changed, 127 insertions(+), 10 deletions(-)
diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithInsertOverwrite.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithInsertOverwrite.scala
new file mode 100644
index 0000000..d97e536
--- /dev/null
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithInsertOverwrite.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.secondaryindex;
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterEach
+
+class TestSIWithInsertOverwrite extends QueryTest with BeforeAndAfterEach { // checks that INSERT OVERWRITE on a main table replaces the rows visible through its SI table
+
+ override protected def beforeEach(): Unit = { // fresh maintable plus an SI on its address column for every test
+ sql("drop table if exists maintable")
+ sql("create table maintable(name string, Id int, address string) stored as carbondata")
+ sql("drop index if exists maintable_si on maintable")
+ sql("CREATE INDEX maintable_si on table maintable (address) as 'carbondata'")
+ }
+
+ test("test insert overwrite with SI") { // after the overwrite only the new address may remain in the SI
+ sql("insert into maintable select 'nihal',1,'nko'")
+ sql("insert into maintable select 'brinjal',2,'valid'")
+ sql("insert overwrite table maintable select 'nihal', 1, 'asdfa'")
+ checkAnswer(sql("select count(*) from maintable_si WHERE address='nko'"), Seq(Row(0))) // pre-overwrite value must be gone
+ checkAnswer(sql("select address from maintable_si"), Seq(Row("asdfa")))
+ }
+
+ test("test insert overwrite with CTAS and SI") { // SI on a table that was created through CTAS
+ sql("insert into maintable select 'nihal',1,'nko'")
+ sql("drop table if exists ctas_maintable")
+ sql("CREATE TABLE ctas_maintable " +
+ "STORED AS carbondata as select * from maintable")
+ sql("CREATE INDEX ctas_maintable_si on table ctas_maintable (address) as 'carbondata'")
+ checkAnswer(sql("select address from ctas_maintable_si"), Seq(Row("nko"))) // SI initially built from the CTAS data
+ sql("insert overwrite table ctas_maintable select 'nihal', 1, 'asdfa'")
+ checkAnswer(sql("select count(*) from ctas_maintable_si WHERE address='nko'"), Seq(Row(0)))
+ checkAnswer(sql("select address from ctas_maintable_si"), Seq(Row("asdfa")))
+ sql("drop index if exists ctas_maintable_si on ctas_maintable") // NOTE(review): skipped if a check above fails; afterEach does not drop these objects
+ sql("drop table if exists ctas_maintable")
+ }
+
+ test("test insert overwrite with table having partition with SI") { // both rows below target partition day=1
+ sql("drop table if exists partitionwithSI")
+ sql(
+ """
+ | CREATE TABLE if not exists partitionwithSI (empname String, age int, address string,
+ | year int, month int) PARTITIONED BY (day int)
+ | STORED AS carbondata
+ """.stripMargin)
+ sql("CREATE INDEX partition_si on table partitionwithSI (address) as 'carbondata'")
+ sql("insert into partitionwithSI values('k',2,'some add',2014,1,1)")
+ sql("insert overwrite table partitionwithSI values('v',3,'changed add',2014,1,1)")
+ checkAnswer(sql("select address from partition_si"), Seq(Row("changed add"))) // only the overwritten row is visible in the SI
+ sql("drop table if exists partitionwithSI") // NOTE(review): skipped if checkAnswer fails; afterEach does not drop this table
+ }
+
+ test("test insert overwrite with SI global sort") { // same scenario as the first test, with a global_sort SI (3 partitions)
+ sql("drop index if exists maintable_si on maintable") // replace the default SI created in beforeEach
+ sql("CREATE INDEX maintable_si on table maintable (address) as 'carbondata' " +
+ "properties('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
+ sql("insert into maintable select 'nihal',1,'nko'")
+ sql("insert into maintable select 'brinjal',2,'valid'")
+ sql("insert overwrite table maintable select 'nihal', 1, 'asdfa'")
+ checkAnswer(sql("select count(*) from maintable_si WHERE address='nko'"), Seq(Row(0)))
+ checkAnswer(sql("select address from maintable_si"), Seq(Row("asdfa")))
+ }
+
+ override protected def afterEach(): Unit = { // drops only the SI and main table created in beforeEach
+ sql("drop index if exists maintable_si on maintable")
+ sql("drop table if exists maintable")
+ }
+}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index c7f61e4..85d8ffb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.ListBuffer
import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalogUtil}
+import org.apache.spark.sql.hive.CarbonRelation
import org.apache.spark.sql.secondaryindex.command.{IndexModel, SecondaryIndexModel}
import org.apache.spark.sql.secondaryindex.hive.CarbonInternalMetastore
import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
@@ -267,6 +267,7 @@ object CarbonIndexUtil {
secondaryIndex: IndexModel,
carbonTable: CarbonTable,
indexTable: CarbonTable,
+ isInsertOverWrite: Boolean = false,
failedLoadMetaDataDetils: java.util.List[LoadMetadataDetails] = null): Unit = {
var segmentIdToLoadStartTimeMapping: scala.collection.mutable.Map[String, java.lang.Long] =
@@ -299,6 +300,13 @@ object CarbonIndexUtil {
.Map((carbonLoadModel.getSegmentId, carbonLoadModel.getFactTimeStamp))
}
val header = indexTable.getCreateOrderColumn.asScala.map(_.getColName).toArray
+ if (isInsertOverWrite) {
+ val loadMetadataDetails = carbonLoadModel.getLoadMetadataDetails.asScala
+ loadMetadataDetails.foreach { loadMetadata =>
+ segmentIdToLoadStartTimeMapping.put(loadMetadata.getLoadName,
+ loadMetadata.getLoadStartTime)
+ }
+ }
initializeSILoadModel(carbonLoadModel, header)
val secondaryIndexModel = if (isLoadToFailedSISegments) {
SecondaryIndexModel(
@@ -324,7 +332,7 @@ object CarbonIndexUtil {
indexTable,
forceAccessSegment = true,
isCompactionCall = false,
- isLoadToFailedSISegments)
+ isLoadToFailedSISegments, isInsertOverWrite)
}
/**
@@ -510,7 +518,7 @@ object CarbonIndexUtil {
indexTableName,
isLoadToFailedSISegments = true,
indexModel,
- carbonTable, indexTable, failedLoadMetadataDetails)
+ carbonTable, indexTable, false, failedLoadMetadataDetails)
// get the current load metadata details of the index table
// details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala
index 3958746..0b40ed8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala
@@ -18,6 +18,7 @@
package org.apache.spark.sql.secondaryindex.events
import scala.collection.JavaConverters._
+import scala.util.Try
import org.apache.log4j.Logger
import org.apache.spark.internal.Logging
@@ -27,9 +28,7 @@ import org.apache.spark.sql.index.CarbonIndexUtil
import org.apache.spark.sql.secondaryindex.command.IndexModel
import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.metadata.index.IndexType
-import org.apache.carbondata.core.metadata.schema.indextable.IndexMetadata
import org.apache.carbondata.events._
import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
@@ -79,13 +78,16 @@ class SILoadEventListener extends OperationEventListener with Logging {
.lookupRelation(Some(carbonLoadModel.getDatabaseName),
indexTableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
+ val isInsertOverwrite = operationContext.getProperties
+ .containsKey("isOverwrite") && Try(operationContext
+ .getProperty("isOverwrite").toString.toBoolean).getOrElse(false)
CarbonIndexUtil
.LoadToSITable(sparkSession,
carbonLoadModel,
indexTableName,
isLoadToFailedSISegments = false,
secondaryIndex,
- carbonTable, indexTable)
+ carbonTable, indexTable, isInsertOverwrite)
}
} else {
logInfo(s"No index tables found for table: ${carbonTable.getTableName}")
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index 2b2f7c0..82c92d2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -65,7 +65,9 @@ object SecondaryIndexCreator {
indexTable: CarbonTable,
forceAccessSegment: Boolean = false,
isCompactionCall: Boolean,
- isLoadToFailedSISegments: Boolean): (CarbonTable, ListBuffer[ICarbonLock], OperationContext) = {
+ isLoadToFailedSISegments: Boolean,
+ isInsertOverwrite: Boolean = false):
+ (CarbonTable, ListBuffer[ICarbonLock], OperationContext) = {
var indexCarbonTable = indexTable
val sc = secondaryIndexModel.sqlContext
// get the thread pool size for secondary index creation
@@ -131,12 +133,16 @@ object SecondaryIndexCreator {
LOGGER.info(s"${indexCarbonTable.getTableUniqueName}: SI loading is started " +
s"for segments: $validSegmentList")
-
+ var segmentStatus = if (isInsertOverwrite) {
+ SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
+ } else {
+ SegmentStatus.INSERT_IN_PROGRESS
+ }
FileInternalUtil
.updateTableStatus(validSegmentList,
secondaryIndexModel.carbonLoadModel.getDatabaseName,
secondaryIndexModel.secondaryIndex.indexName,
- SegmentStatus.INSERT_IN_PROGRESS,
+ segmentStatus,
secondaryIndexModel.segmentIdToLoadStartTimeMapping,
new java.util
.HashMap[String,
@@ -342,7 +348,7 @@ object SecondaryIndexCreator {
successSISegments,
secondaryIndexModel.carbonLoadModel.getDatabaseName,
secondaryIndexModel.secondaryIndex.indexName,
- SegmentStatus.INSERT_IN_PROGRESS,
+ segmentStatus,
secondaryIndexModel.segmentIdToLoadStartTimeMapping,
segmentToLoadStartTimeMap,
indexCarbonTable,
@@ -384,6 +390,23 @@ object SecondaryIndexCreator {
secondaryIndexModel.sqlContext.sparkSession,
carbonLoadModelForMergeDataFiles.getFactTimeStamp,
rebuiltSegments)
+
+ if (isInsertOverwrite) {
+ val overriddenSegments = SegmentStatusManager
+ .readLoadMetadata(indexCarbonTable.getMetadataPath)
+ .filter(loadMetadata => !successSISegments.contains(loadMetadata.getLoadName))
+ .map(_.getLoadName).toList
+ FileInternalUtil
+ .updateTableStatus(
+ overriddenSegments,
+ secondaryIndexModel.carbonLoadModel.getDatabaseName,
+ secondaryIndexModel.secondaryIndex.indexName,
+ SegmentStatus.MARKED_FOR_DELETE,
+ secondaryIndexModel.segmentIdToLoadStartTimeMapping,
+ segmentToLoadStartTimeMap,
+ indexTable,
+ secondaryIndexModel.sqlContext.sparkSession)
+ }
}
if (!isCompactionCall) {