Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/04/08 06:26:20 UTC

[carbondata] branch master updated: [CARBONDATA-3738]: Delete segment by ID fails with invalid ID upon deleting an added parquet segment

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8936961  [CARBONDATA-3738]: Delete segment by ID fails with invalid ID upon deleting an added parquet segment
8936961 is described below

commit 89369613e1374489ed63e0ab358244c348b63918
Author: Vikram Ahuja <vi...@gmail.com>
AuthorDate: Thu Mar 5 15:54:20 2020 +0530

    [CARBONDATA-3738]: Delete segment by ID fails with invalid ID upon
    deleting an added parquet segment
    
    Why is this PR needed?
    Unable to delete a segment in the case of SI (secondary index) when the table status file is not present.
    
    What changes were proposed in this PR?
    Check for the existence of the table status file before triggering the delete for that segment.
    
    This closes #3659
---
 .../command/management/CarbonAddLoadCommand.scala  |  6 +++-
 .../events/DeleteSegmentByIdListener.scala         | 11 +++++--
 .../testsuite/addsegment/AddSegmentTestCase.scala  | 35 ++++++++++++++++++++++
 3 files changed, 49 insertions(+), 3 deletions(-)
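
For context, a minimal sketch of the scenario this fix targets, condensed from the new test added below (table and index names follow that test; the segment path is illustrative):

    // A parquet segment is added to a carbon table that has a secondary index,
    // then deleted by segment id. Before this fix, the delete failed with
    // "invalid ID" because the SI table has no table status file for the
    // externally added segment.
    sql("create index one_one on table addsegment3(designation) as 'carbondata'")
    sql("alter table addsegment3 add segment options('path'='/tmp/addsegtest', 'format'='parquet')")
    sql("delete from table addsegment3 where segment.id in(0)")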

diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index f54f3ae..9db450b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -95,7 +95,11 @@ case class CarbonAddLoadCommand(
 
     // If a path is already added then we should block the adding of the same path again.
     val allSegments = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
-    if (allSegments.exists(a => a.getPath != null && a.getPath.equalsIgnoreCase(inputPath))) {
+    // If a segment has already been loaded from the same path and its status is SUCCESS or
+    // LOAD_PARTIAL_SUCCESS, throw an exception, as adding the same path again should be blocked.
+    if (allSegments.exists(a => a.getPath != null && a.getPath.equalsIgnoreCase(inputPath) &&
+      (a.getSegmentStatus == SegmentStatus.SUCCESS ||
+        a.getSegmentStatus == SegmentStatus.LOAD_PARTIAL_SUCCESS))) {
       throw new AnalysisException(s"path already exists in table status file, can not add same " +
                                   s"segment path repeatedly: $inputPath")
     }
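
The practical effect of the relaxed check above, sketched under the assumption that a deleted segment's status is no longer SUCCESS or LOAD_PARTIAL_SUCCESS (newPath is a directory holding parquet files, as in the tests below):

    // A second add of the same path is rejected while the earlier segment
    // still has a success status.
    sql(s"alter table addsegment3 add segment options('path'='$newPath', 'format'='parquet')")
    sql(s"alter table addsegment3 add segment options('path'='$newPath', 'format'='parquet')") // AnalysisException
    // Once that segment is deleted, re-adding the same path is allowed again.
    sql("delete from table addsegment3 where segment.id in(0)")
    sql(s"alter table addsegment3 add segment options('path'='$newPath', 'format'='parquet')") // succeeds
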
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/DeleteSegmentByIdListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/DeleteSegmentByIdListener.scala
index 3b1f69c..cfda2f6 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/DeleteSegmentByIdListener.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/DeleteSegmentByIdListener.scala
@@ -27,6 +27,8 @@ import org.apache.spark.sql.secondaryindex.util.CarbonInternalScalaUtil
 
 import org.apache.carbondata.api.CarbonStore
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, Event, OperationContext, OperationEventListener}
 
 class DeleteSegmentByIdListener extends OperationEventListener with Logging {
@@ -49,8 +51,13 @@ class DeleteSegmentByIdListener extends OperationEventListener with Logging {
           val table = metastore
             .lookupRelation(Some(carbonTable.getDatabaseName), tableName)(sparkSession)
             .asInstanceOf[CarbonRelation].carbonTable
-          CarbonStore
-            .deleteLoadById(loadIds, carbonTable.getDatabaseName, table.getTableName, table)
+          val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(table.getTablePath)
+          // This check verifies that the table status file for the index table exists.
+          // Delete on index tables should only be triggered if the table status file exists.
+          if (FileFactory.isFileExist(tableStatusFilePath)) {
+            CarbonStore
+              .deleteLoadById(loadIds, carbonTable.getDatabaseName, table.getTableName, table)
+          }
         }
     }
   }
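
In short, the listener now guards the index-table delete behind an existence check; a condensed sketch of the hunk above, using the same calls:

    import org.apache.carbondata.core.datastore.impl.FileFactory
    import org.apache.carbondata.core.util.path.CarbonTablePath

    // Skip deleteLoadById for the index table when its table status file was
    // never written (for example, when the only segment was added externally).
    val tableStatusFilePath = CarbonTablePath.getTableStatusFilePath(table.getTablePath)
    if (FileFactory.isFileExist(tableStatusFilePath)) {
      CarbonStore.deleteLoadById(loadIds, carbonTable.getDatabaseName, table.getTableName, table)
    }
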
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
index 6c41abd..3e9f5de 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/addsegment/AddSegmentTestCase.scala
@@ -262,6 +262,39 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     FileFactory.deleteAllFilesOfDir(new File(newPath))
   }
 
+
+  test("Test delete by id for added parquet segment") {
+    sql("drop table if exists addsegment1")
+    sql("drop table if exists addsegment2")
+    sql("drop table if exists addsegment3")
+    createCarbonTable()
+    createParquetTable
+    sql("select * from addsegment2").show()
+    val table = SparkSQLUtil.sessionState(sqlContext.sparkSession).catalog
+      .getTableMetadata(TableIdentifier("addsegment2"))
+    val path = table.location
+    val newPath = storeLocation + "/" + "addsegtest"
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+    copy(path.toString, newPath)
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(10)))
+    sql(
+      """
+        | CREATE TABLE addsegment3 (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Date,attendance int,
+        |  utilization int,salary int, empno int)
+        | STORED AS carbondata
+      """.stripMargin)
+    sql("create index one_one on table addsegment3(designation) as 'carbondata'")
+    sql(s"alter table addsegment3 add segment options('path'='$newPath', 'format'='parquet')").show()
+    sql("show segments for table addsegment3").show(100, false)
+    sql("delete from table addsegment1 where segment.id in(0)")
+    checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(0)))
+    sql("clean files for table addsegment1")
+    FileFactory.deleteAllFilesOfDir(new File(newPath))
+  }
+
+
   test("Test delete by id for added segment") {
     createCarbonTable()
     createParquetTable
@@ -288,6 +321,8 @@ class AddSegmentTestCase extends QueryTest with BeforeAndAfterAll {
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(20)))
     sql("show segments for table addsegment1").show(100, false)
     sql("delete from table addsegment1 where segment.id in(0,1)")
+    sql("show segments for table addsegment1").show(100, false)
+
     checkAnswer(sql("select count(*) from addsegment1"), Seq(Row(0)))
     sql("clean files for table addsegment1")
     FileFactory.deleteAllFilesOfDir(new File(newPath))