Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/09/14 15:00:27 UTC

[carbondata] branch master updated: [CARBONDATA-3793]Fix update and delete issue when multiple partition columns are present and clean files issue

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new b9a1398  [CARBONDATA-3793]Fix update and delete issue when multiple partition columns are present and clean files issue
b9a1398 is described below

commit b9a1398233d2e647f6fcab8c7755266c8df86477
Author: akashrn5 <ak...@gmail.com>
AuthorDate: Fri Sep 4 11:22:06 2020 +0530

    [CARBONDATA-3793]Fix update and delete issue when multiple partition columns are present and clean files issue
    
    Why is this PR needed?
    1. After #3837, when multiple partition columns are present in a table, update and
    delete do not work: the tuple ID was treated as an external-segment tuple ID,
    because a TID contains the '#' character when multiple partitions are present
    (see the sketch below).
    2. When multiple segments are present and only some of them are updated, clean
    files deletes the segment files of the non-updated segments, treating them as stale files.
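    
    A minimal, hypothetical sketch of the ambiguity (editorial illustration only; the
    tuple-id values below are made up and do not reflect the exact CarbonData tuple-id format):
    
        public class TupleIdCheckSketch {
          public static void main(String[] args) {
            // Tuple id from a table with multiple partition columns: the encoded
            // partition values can carry '#', so the old contains("#") check
            // wrongly took the external-segment branch.
            String multiPartitionTid = "region=asia#country=in#city=blr/0/0-0_0-0-0-1/0/0";
            // Tuple id from an external (add segment) case: it embeds the external
            // path, which starts with "#/".
            String externalTid = "#/hdfs/external/segment/0-0_0-0-0-1/0/0";
            System.out.println(multiPartitionTid.contains("#"));   // true  -> old check: wrong branch
            System.out.println(multiPartitionTid.contains("#/"));  // false -> new check: correct branch
            System.out.println(externalTid.contains("#/"));        // true  -> still detected as external
          }
        }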
    
    What changes were proposed in this PR?
    1. To double-check for an external segment, verify that the path in the segment
    metadata details is not null (in addition to narrowing the tuple-id check to "#/").
    2. Delete the old segment files of only the updated segments (see the sketch below).
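    
    A self-contained, hypothetical stand-in for the new clean-files filtering (the class,
    values and helper below are illustrative; the real logic lives in CarbonUpdateUtil and
    uses CarbonTablePath.DataFileUtil.getSegmentNoFromSegmentFile, shown in the diff):
    
        import java.util.*;
        import java.util.stream.*;
        
        public class CleanFilesSketch {
          // Mirrors the new helper: segment file names are assumed to look like
          // "<segmentNo>_<timestamp>", e.g. "0_1597411003332".
          static String segmentNoFromSegmentFile(String segmentFileName) {
            return segmentFileName.split("_")[0];
          }
        
          public static void main(String[] args) {
            Set<String> updatedSegmentIDs = new HashSet<>(Collections.singletonList("0"));
            Set<String> segmentFilesNotToDelete =
                new HashSet<>(Collections.singletonList("0_1597411099999"));
            List<String> allSegmentFiles =
                Arrays.asList("0_1597411003332", "0_1597411099999", "1_1597411003333");
        
            // Only stale files that belong to an updated segment are deleted; segment "1",
            // which was never updated, keeps its segment file (this was the bug).
            List<String> toDelete = allSegmentFiles.stream()
                .filter(f -> !segmentFilesNotToDelete.contains(f)
                    && updatedSegmentIDs.contains(segmentNoFromSegmentFile(f)))
                .collect(Collectors.toList());
            System.out.println(toDelete); // [0_1597411003332]
          }
        }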
    
    This closes #3911
---
 .../carbondata/core/mutate/CarbonUpdateUtil.java   |  9 +++--
 .../carbondata/core/util/path/CarbonTablePath.java |  7 ++++
 .../carbondata/core/util/CarbonUtilTest.java       |  8 ++++-
 .../command/mutation/DeleteExecution.scala         |  4 +--
 .../testsuite/iud/UpdateCarbonTableTestCase.scala  | 42 ++++++++++++++++++----
 5 files changed, 58 insertions(+), 12 deletions(-)

diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
index f43a5dc..5dce0f6 100644
--- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
@@ -90,7 +90,9 @@ public class CarbonUpdateUtil {
     // in add segment case, it will be in second index as the blockletID is generated by adding the
     // complete external path
     // this is in case of the external segment, where the tuple id has external path with #
-    if (Tid.contains("#")) {
+    // here there is no need to check for any path in the metadata details, as '#' can appear
+    // in the tuple id when multiple partitions are present; the partition check is done above.
+    if (Tid.contains("#/")) {
       return getRequiredFieldFromTID(Tid, TupleIdEnum.EXTERNAL_SEGMENT_ID)
           + CarbonCommonConstants.FILE_SEPARATOR + getRequiredFieldFromTID(Tid,
           TupleIdEnum.EXTERNAL_BLOCK_ID);
@@ -620,6 +622,8 @@ public class CarbonUpdateUtil {
     CarbonFile segmentFilesLocation =
         FileFactory.getCarbonFile(CarbonTablePath.getSegmentFilesLocation(table.getTablePath()));
     Set<String> segmentFilesNotToDelete = new HashSet<>();
+    Set<String> updatedSegmentIDs = new HashSet<>(Arrays.asList(
+        segmentFilesToBeUpdated.stream().map(Segment::getSegmentNo).toArray(String[]::new)));
     for (Segment segment : segmentFilesToBeUpdated) {
       SegmentFileStore fileStore =
           new SegmentFileStore(table.getTablePath(), segment.getSegmentFileName());
@@ -636,7 +640,8 @@ public class CarbonUpdateUtil {
       CarbonFile[] invalidSegmentFiles = segmentFilesLocation.listFiles(new CarbonFileFilter() {
         @Override
         public boolean accept(CarbonFile file) {
-          return !segmentFilesNotToDelete.contains(file.getName());
+          return !segmentFilesNotToDelete.contains(file.getName()) && updatedSegmentIDs
+              .contains(CarbonTablePath.DataFileUtil.getSegmentNoFromSegmentFile(file.getName()));
         }
       });
       for (CarbonFile invalidSegmentFile : invalidSegmentFiles) {
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 7482a1c..c89448c 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -564,6 +564,13 @@ public class CarbonTablePath {
     }
 
     /**
+     * This method returns the segment number from the segment file name
+     */
+    public static String getSegmentNoFromSegmentFile(String segmentFileName) {
+      return segmentFileName.split(CarbonCommonConstants.UNDERSCORE)[0];
+    }
+
+    /**
      * gets segment id from given absolute data file path
      */
     public static String getSegmentIdFromPath(String dataFileAbsolutePath) {
diff --git a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
index fd98673..14388ca 100644
--- a/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/util/CarbonUtilTest.java
@@ -30,11 +30,11 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.carbondata.core.index.Segment;
 import org.apache.carbondata.core.datastore.block.TableBlockInfo;
 import org.apache.carbondata.core.datastore.chunk.impl.FixedLengthDimensionColumnPage;
 import org.apache.carbondata.core.datastore.filesystem.LocalCarbonFile;
 import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.index.Segment;
 import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
 import org.apache.carbondata.core.metadata.blocklet.DataFileFooter;
 import org.apache.carbondata.core.metadata.datatype.DataType;
@@ -947,6 +947,12 @@ public class CarbonUtilTest {
     Assert.assertEquals(CarbonUpdateUtil.getSegmentBlockNameKey("0", blockName, true), "0-0_0-0-0-1597412488102");
   }
 
+  @Test public void testSegmentNumberFromSegmentFile() {
+    String segmentFileName = "0_1597411003332";
+    Assert.assertEquals("0",
+        CarbonTablePath.DataFileUtil.getSegmentNoFromSegmentFile(segmentFileName));
+  }
+
   private String generateString(int length) {
     StringBuilder builder = new StringBuilder();
     for (int i = 0; i < length; i++) {
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index d079157..25a832f 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -234,7 +234,7 @@ object DeleteExecution {
                   TupleIdEnum.BLOCKLET_ID.getTupleIdIndex),
                 Integer.parseInt(CarbonUpdateUtil.getRequiredFieldFromTID(TID,
                   TupleIdEnum.PAGE_ID.getTupleIdIndex)))
-            } else if (TID.contains("#")) {
+            } else if (TID.contains("#/") && load.getPath != null) {
               // this is in case of the external segment, where the tuple id has external path with#
               (CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.EXTERNAL_OFFSET),
                 CarbonUpdateUtil.getRequiredFieldFromTID(TID, TupleIdEnum.EXTERNAL_BLOCKLET_ID),
@@ -275,7 +275,7 @@ object DeleteExecution {
             columnCompressor = CompressorFactory.getInstance.getCompressor.getName
           }
           var blockNameFromTupleID =
-            if (TID.contains("#")) {
+            if (TID.contains("#/") && load.getPath != null) {
               CarbonUpdateUtil.getRequiredFieldFromTID(TID,
                 TupleIdEnum.EXTERNAL_BLOCK_ID)
             } else {
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
index 7c9f966..6a6c191 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala
@@ -18,17 +18,15 @@ package org.apache.carbondata.spark.testsuite.iud
 
 import java.io.File
 
-import org.apache.spark.sql.test.SparkTestQueryExecutor
-import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SaveMode}
-import org.scalatest.{BeforeAndAfterAll, ConfigMap}
-
-import org.apache.carbondata.core.constants.{CarbonCommonConstants, CarbonLoadOptionConstants}
-import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.spark.sql.test.util.QueryTest
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SaveMode}
+import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.core.util.path.CarbonTablePath
 
 class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
@@ -76,12 +74,23 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("""create table iud.zerorows (c1 string,c2 int,c3 string,c5 string) STORED AS carbondata""")
     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.zerorows""")
     sql(s"""LOAD DATA LOCAL INPATH '$resourcesPath/IUD/dest.csv' INTO table iud.zerorows""")
+    sql("insert into iud.zerorows select 'abc',34,'def','des'")
     sql("""update zerorows d  set (d.c2) = (d.c2 + 1) where d.c1 = 'a'""").show()
     sql("""update zerorows d  set (d.c2) = (d.c2 + 1) where d.c1 = 'b'""").show()
     sql("clean files for table iud.zerorows")
     checkAnswer(
       sql("""select c1,c2,c3,c5 from iud.zerorows"""),
-      Seq(Row("a",2,"aa","aaa"),Row("b",3,"bb","bbb"),Row("c",3,"cc","ccc"),Row("d",4,"dd","ddd"),Row("e",5,"ee","eee"),Row("a",2,"aa","aaa"),Row("b",3,"bb","bbb"),Row("c",3,"cc","ccc"),Row("d",4,"dd","ddd"),Row("e",5,"ee","eee"))
+      Seq(Row("a", 2, "aa", "aaa"),
+        Row("abc", 34, "def", "des"),
+        Row("b", 3, "bb", "bbb"),
+        Row("c", 3, "cc", "ccc"),
+        Row("d", 4, "dd", "ddd"),
+        Row("e", 5, "ee", "eee"),
+        Row("a", 2, "aa", "aaa"),
+        Row("b", 3, "bb", "bbb"),
+        Row("c", 3, "cc", "ccc"),
+        Row("d", 4, "dd", "ddd"),
+        Row("e", 5, "ee", "eee"))
     )
     sql("""drop table iud.zerorows""")
   }
@@ -906,6 +915,25 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql("drop table if exists test_return_row_count_source")
   }
 
+  test("test update on a table with multiple partition directories") {
+    sql("drop table if exists partitionMultiple")
+    import sqlContext.implicits._
+    val df = sqlContext.sparkContext.parallelize(1 to 4, 4)
+      .map { x => (s"name$x", s"$x", s"region$x", s"country$x", s"city$x")
+      }.toDF("name", "age", "region", "country", "city")
+    df.write.format("carbondata")
+      .option("tableName", "partitionMultiple")
+      .option("partitionColumns", "region, country, city")
+      .mode(SaveMode.Overwrite)
+      .save()
+    checkAnswer(sql("delete from partitionMultiple where name = 'name2'"), Seq(Row(1)))
+    checkAnswer(sql("update partitionMultiple set(name) = ('Joey') where age = 3"), Seq(Row(1)))
+    checkAnswer(sql("select * from partitionMultiple"),
+      Seq(Row("name1", "1", "region1", "country1", "city1"),
+        Row("name4", "4", "region4", "country4", "city4"),
+        Row("Joey", "3", "region3", "country3", "city3")))
+  }
+
   test("test update for partition table without merge index files for segment") {
     try {
       sql("DROP TABLE IF EXISTS iud.partition_nomerge_index")