Posted to commits@carbondata.apache.org by ak...@apache.org on 2020/12/02 06:37:56 UTC

[carbondata] branch master updated: [CARBONDATA-4052] Handled insert overwrite scenario for SI

This is an automated email from the ASF dual-hosted git repository.

akashrn5 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 2add61e  [CARBONDATA-4052] Handled insert overwrite scenario for SI
2add61e is described below

commit 2add61e63575b02c51d3070df71be042012d746d
Author: Nihal ojha <ni...@gmail.com>
AuthorDate: Fri Nov 20 15:38:03 2020 +0530

    [CARBONDATA-4052] Handled insert overwrite scenario for SI
    
    Why is this PR needed?
    Currently, in the case of an insert overwrite query, the existing SI
    segments are not marked for delete. Because of this, a select query
    on the SI table gives wrong results.
    
    What changes were proposed in this PR?
    Handled insert overwrite for the SI table and changed the status of the
    existing segments to MARKED_FOR_DELETE.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #4015
---
 .../secondaryindex/TestSIWithInsertOverwrite.scala | 84 ++++++++++++++++++++++
 .../apache/spark/sql/index/CarbonIndexUtil.scala   | 14 +++-
 .../events/SILoadEventListener.scala               |  8 ++-
 .../secondaryindex/rdd/SecondaryIndexCreator.scala | 31 ++++++--
 4 files changed, 127 insertions(+), 10 deletions(-)
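
For context, a minimal sketch of the scenario this commit fixes, in the
style of the new test below (table and index names are illustrative):

    sql("create table maintable(name string, id int, address string) stored as carbondata")
    sql("CREATE INDEX maintable_si on table maintable (address) as 'carbondata'")
    sql("insert into maintable select 'nihal',1,'nko'")
    sql("insert overwrite table maintable select 'nihal', 1, 'asdfa'")
    // Before this fix the overwritten SI segment stayed visible, so this
    // count could wrongly return 1; with the fix it returns 0.
    checkAnswer(sql("select count(*) from maintable_si WHERE address='nko'"), Seq(Row(0)))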

diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithInsertOverwrite.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithInsertOverwrite.scala
new file mode 100644
index 0000000..d97e536
--- /dev/null
+++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithInsertOverwrite.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.secondaryindex;
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+import org.scalatest.BeforeAndAfterEach
+
+class TestSIWithInsertOverwrite extends QueryTest with BeforeAndAfterEach {
+
+  override protected def beforeEach(): Unit = {
+    sql("drop table if exists maintable")
+    sql("create table maintable(name string, Id int, address string) stored as carbondata")
+    sql("drop index if exists maintable_si on maintable")
+    sql("CREATE INDEX maintable_si  on table maintable (address) as 'carbondata'")
+  }
+
+  test("test insert overwrite with SI") {
+    sql("insert into maintable select 'nihal',1,'nko'")
+    sql("insert into maintable select 'brinjal',2,'valid'")
+    sql("insert overwrite table maintable select 'nihal', 1, 'asdfa'")
+    checkAnswer(sql("select count(*) from maintable_si WHERE address='nko'"), Seq(Row(0)))
+    checkAnswer(sql("select address from maintable_si"), Seq(Row("asdfa")))
+  }
+
+  test("test insert overwrite with CTAS and SI") {
+    sql("insert into maintable select 'nihal',1,'nko'")
+    sql("drop table if exists ctas_maintable")
+    sql("CREATE TABLE ctas_maintable " +
+      "STORED AS carbondata as select * from maintable")
+    sql("CREATE INDEX ctas_maintable_si  on table ctas_maintable (address) as 'carbondata'")
+    checkAnswer(sql("select address from ctas_maintable_si"), Seq(Row("nko")))
+    sql("insert overwrite table ctas_maintable select 'nihal', 1, 'asdfa'")
+    checkAnswer(sql("select count(*) from ctas_maintable_si WHERE address='nko'"), Seq(Row(0)))
+    checkAnswer(sql("select address from ctas_maintable_si"), Seq(Row("asdfa")))
+    sql("drop index if exists ctas_maintable_si on ctas_maintable")
+    sql("drop table if exists ctas_maintable")
+  }
+
+  test("test insert overwrite with table having partition with SI") {
+    sql("drop table if exists partitionwithSI")
+    sql(
+      """
+        | CREATE TABLE if not exists partitionwithSI (empname String, age int, address string,
+        |  year int, month int) PARTITIONED BY (day int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql("CREATE INDEX partition_si  on table partitionwithSI (address) as 'carbondata'")
+    sql("insert into partitionwithSI values('k',2,'some add',2014,1,1)")
+    sql("insert overwrite table partitionwithSI values('v',3,'changed add',2014,1,1)")
+    checkAnswer(sql("select address from partition_si"), Seq(Row("changed add")))
+    sql("drop table if exists partitionwithSI")
+  }
+
+  test("test insert overwrite with SI global sort") {
+    sql("drop index if exists maintable_si on maintable")
+    sql("CREATE INDEX maintable_si  on table maintable (address) as 'carbondata' " +
+      "properties('sort_scope'='global_sort', 'Global_sort_partitions'='3')")
+    sql("insert into maintable select 'nihal',1,'nko'")
+    sql("insert into maintable select 'brinjal',2,'valid'")
+    sql("insert overwrite table maintable select 'nihal', 1, 'asdfa'")
+    checkAnswer(sql("select count(*) from maintable_si WHERE address='nko'"), Seq(Row(0)))
+    checkAnswer(sql("select address from maintable_si"), Seq(Row("asdfa")))
+  }
+
+  override protected def afterEach(): Unit = {
+    sql("drop index if exists maintable_si on maintable")
+    sql("drop table if exists maintable")
+  }
+}
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
index c7f61e4..85d8ffb 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/index/CarbonIndexUtil.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.ListBuffer
 import org.apache.spark.sql.{CarbonDatasourceHadoopRelation, CarbonEnv, SparkSession}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalogUtil}
+import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.secondaryindex.command.{IndexModel, SecondaryIndexModel}
 import org.apache.spark.sql.secondaryindex.hive.CarbonInternalMetastore
 import org.apache.spark.sql.secondaryindex.load.CarbonInternalLoaderUtil
@@ -267,6 +267,7 @@ object CarbonIndexUtil {
     secondaryIndex: IndexModel,
     carbonTable: CarbonTable,
     indexTable: CarbonTable,
+    isInsertOverWrite: Boolean = false,
     failedLoadMetaDataDetils: java.util.List[LoadMetadataDetails] = null): Unit = {
 
     var segmentIdToLoadStartTimeMapping: scala.collection.mutable.Map[String, java.lang.Long] =
@@ -299,6 +300,13 @@ object CarbonIndexUtil {
         .Map((carbonLoadModel.getSegmentId, carbonLoadModel.getFactTimeStamp))
     }
     val header = indexTable.getCreateOrderColumn.asScala.map(_.getColName).toArray
+    if (isInsertOverWrite) {
+      val loadMetadataDetails = carbonLoadModel.getLoadMetadataDetails.asScala
+      loadMetadataDetails.foreach { loadMetadata =>
+        segmentIdToLoadStartTimeMapping.put(loadMetadata.getLoadName,
+          loadMetadata.getLoadStartTime)
+      }
+    }
     initializeSILoadModel(carbonLoadModel, header)
     val secondaryIndexModel = if (isLoadToFailedSISegments) {
       SecondaryIndexModel(
@@ -324,7 +332,7 @@ object CarbonIndexUtil {
         indexTable,
         forceAccessSegment = true,
         isCompactionCall = false,
-        isLoadToFailedSISegments)
+        isLoadToFailedSISegments, isInsertOverWrite)
   }
 
   /**
@@ -510,7 +518,7 @@ object CarbonIndexUtil {
               indexTableName,
               isLoadToFailedSISegments = true,
               indexModel,
-              carbonTable, indexTable, failedLoadMetadataDetails)
+              carbonTable, indexTable, false, failedLoadMetadataDetails)
 
           // get the current load metadata details of the index table
           // details = SegmentStatusManager.readLoadMetadata(indexTable.getMetadataPath)
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala
index 3958746..0b40ed8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.secondaryindex.events
 
 import scala.collection.JavaConverters._
+import scala.util.Try
 
 import org.apache.log4j.Logger
 import org.apache.spark.internal.Logging
@@ -27,9 +28,7 @@ import org.apache.spark.sql.index.CarbonIndexUtil
 import org.apache.spark.sql.secondaryindex.command.IndexModel
 
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.index.IndexType
-import org.apache.carbondata.core.metadata.schema.indextable.IndexMetadata
 import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.events.LoadEvents.LoadTablePreStatusUpdateEvent
 
@@ -79,13 +78,16 @@ class SILoadEventListener extends OperationEventListener with Logging {
                   .lookupRelation(Some(carbonLoadModel.getDatabaseName),
                     indexTableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
 
+                val isInsertOverwrite = operationContext.getProperties
+                  .containsKey("isOverwrite") && Try(operationContext
+                  .getProperty("isOverwrite").toString.toBoolean).getOrElse(false)
                 CarbonIndexUtil
                   .LoadToSITable(sparkSession,
                     carbonLoadModel,
                     indexTableName,
                     isLoadToFailedSISegments = false,
                     secondaryIndex,
-                    carbonTable, indexTable)
+                    carbonTable, indexTable, isInsertOverwrite)
             }
           } else {
             logInfo(s"No index tables found for table: ${carbonTable.getTableName}")
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
index 2b2f7c0..82c92d2 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/rdd/SecondaryIndexCreator.scala
@@ -65,7 +65,9 @@ object SecondaryIndexCreator {
     indexTable: CarbonTable,
     forceAccessSegment: Boolean = false,
     isCompactionCall: Boolean,
-    isLoadToFailedSISegments: Boolean): (CarbonTable, ListBuffer[ICarbonLock], OperationContext) = {
+    isLoadToFailedSISegments: Boolean,
+    isInsertOverwrite: Boolean = false):
+  (CarbonTable, ListBuffer[ICarbonLock], OperationContext) = {
     var indexCarbonTable = indexTable
     val sc = secondaryIndexModel.sqlContext
     // get the thread pool size for secondary index creation
@@ -131,12 +133,16 @@ object SecondaryIndexCreator {
 
       LOGGER.info(s"${indexCarbonTable.getTableUniqueName}: SI loading is started " +
               s"for segments: $validSegmentList")
-
+      var segmentStatus = if (isInsertOverwrite) {
+        SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS
+      } else {
+        SegmentStatus.INSERT_IN_PROGRESS
+      }
       FileInternalUtil
         .updateTableStatus(validSegmentList,
           secondaryIndexModel.carbonLoadModel.getDatabaseName,
           secondaryIndexModel.secondaryIndex.indexName,
-          SegmentStatus.INSERT_IN_PROGRESS,
+          segmentStatus,
           secondaryIndexModel.segmentIdToLoadStartTimeMapping,
           new java.util
           .HashMap[String,
@@ -342,7 +348,7 @@ object SecondaryIndexCreator {
           successSISegments,
           secondaryIndexModel.carbonLoadModel.getDatabaseName,
           secondaryIndexModel.secondaryIndex.indexName,
-          SegmentStatus.INSERT_IN_PROGRESS,
+          segmentStatus,
           secondaryIndexModel.segmentIdToLoadStartTimeMapping,
           segmentToLoadStartTimeMap,
           indexCarbonTable,
@@ -384,6 +390,23 @@ object SecondaryIndexCreator {
           secondaryIndexModel.sqlContext.sparkSession,
           carbonLoadModelForMergeDataFiles.getFactTimeStamp,
           rebuiltSegments)
+
+        if (isInsertOverwrite) {
+          val overriddenSegments = SegmentStatusManager
+          .readLoadMetadata(indexCarbonTable.getMetadataPath)
+            .filter(loadMetadata => !successSISegments.contains(loadMetadata.getLoadName))
+            .map(_.getLoadName).toList
+          FileInternalUtil
+            .updateTableStatus(
+              overriddenSegments,
+              secondaryIndexModel.carbonLoadModel.getDatabaseName,
+              secondaryIndexModel.secondaryIndex.indexName,
+              SegmentStatus.MARKED_FOR_DELETE,
+              secondaryIndexModel.segmentIdToLoadStartTimeMapping,
+              segmentToLoadStartTimeMap,
+              indexTable,
+              secondaryIndexModel.sqlContext.sparkSession)
+        }
       }
 
       if (!isCompactionCall) {
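
Taken together, the flow of the change is: SILoadEventListener reads the
"isOverwrite" flag from the operation context and forwards it through
CarbonIndexUtil.LoadToSITable into SecondaryIndexCreator, which loads the
new SI segments as INSERT_OVERWRITE_IN_PROGRESS and, once they succeed,
marks every pre-existing SI segment as MARKED_FOR_DELETE. A standalone
sketch of the defensive flag parsing used in the listener (the parameter
type and method name here are illustrative, not the actual
OperationContext API):

    import scala.util.Try

    // Parse an optional "isOverwrite" property defensively, defaulting to
    // false when the key is absent or its value is not a valid boolean.
    def isInsertOverwrite(props: java.util.Map[String, AnyRef]): Boolean =
      props.containsKey("isOverwrite") &&
        Try(props.get("isOverwrite").toString.toBoolean).getOrElse(false)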