Posted to commits@carbondata.apache.org by aj...@apache.org on 2020/09/17 17:01:59 UTC

[carbondata] branch master updated: [CARBONDATA-3891] Fix loading data will update all segments updateDeltaEndTimestamp

This is an automated email from the ASF dual-hosted git repository.

ajantha pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 8aa9522  [CARBONDATA-3891] Fix loading data will update all segments updateDeltaEndTimestamp
8aa9522 is described below

commit 8aa9522c087e0e9bf2cb7c1532742094a4b3bd9a
Author: IceMimosa <ch...@gmail.com>
AuthorDate: Mon Jul 20 23:08:40 2020 +0800

    [CARBONDATA-3891] Fix loading data will update all segments updateDeltaEndTimestamp
    
    Why is this PR needed?
    Loading data into a partitioned table updates every segment's updateDeltaEndTimestamp, which causes the driver to clear the cache of all segments when a query is executed.
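    
    For context, a minimal sketch of the pre-fix flow in CarbonOutputCommitter.commitJob, taken from the lines removed in the diff below: every valid segment of the table was collected and stamped with the new update timestamp, so a single load invalidated the cached state of all segments.
    
        // Pre-fix (per the removed lines in the diff below): gather ALL valid
        // segments of the table ...
        Set<Segment> segmentSet = new HashSet<>(
            new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(),
                context.getConfiguration()).getValidAndInvalidSegments(carbonTable.isMV())
                    .getValidSegments());
        // ... and stamp every one of them with the new update timestamp,
        // forcing the driver to drop its cache entry for each segment.
        CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, updateTime, true,
            segmentDeleteList);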
    
    What changes were proposed in this PR?
    Update only the segment written by the current insert overwrite.
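    
    A sketch of the post-fix flow, matching the lines added in the diff below: only the segment produced by the current load is stamped, so the cached state of the untouched segments remains valid.
    
        // Post-fix (per the added lines in the diff below): stamp only the
        // segment written by the current insert overwrite.
        CarbonUpdateUtil.updateTableMetadataStatus(Collections.singleton(loadModel.getSegment()),
            carbonTable, updateTime, true, segmentDeleteList);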
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    Yes
    
    This closes #3848
---
 .../hadoop/api/CarbonOutputCommitter.java          | 21 +++++---------
 .../allqueries/InsertIntoCarbonTableTestCase.scala | 32 ++++++++++++++++++++++
 2 files changed, 39 insertions(+), 14 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 4fd754b..a816894 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -19,7 +19,7 @@ package org.apache.carbondata.hadoop.api;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.HashSet;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -224,23 +224,16 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
       }
     }
     String updateTime =
-        context.getConfiguration().get(CarbonTableOutputFormat.UPDATE_TIMESTAMP, null);
+        context.getConfiguration().get(CarbonTableOutputFormat.UPDATE_TIMESTAMP, uniqueId);
     String segmentsToBeDeleted =
         context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, "");
-    List<Segment> segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
-    Set<Segment> segmentSet = new HashSet<>();
-    if (updateTime != null || uniqueId != null) {
-      segmentSet = new HashSet<>(
-          new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(),
-              context.getConfiguration()).getValidAndInvalidSegments(carbonTable.isMV())
-                  .getValidSegments());
+    List<Segment> segmentDeleteList = Collections.emptyList();
+    if (!segmentsToBeDeleted.trim().isEmpty()) {
+      segmentDeleteList = Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
     }
     if (updateTime != null) {
-      CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, updateTime, true,
-          segmentDeleteList);
-    } else if (uniqueId != null) {
-      CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, uniqueId, true,
-          segmentDeleteList);
+      CarbonUpdateUtil.updateTableMetadataStatus(Collections.singleton(loadModel.getSegment()),
+          carbonTable, updateTime, true, segmentDeleteList);
     }
   }
 
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
index 9588d27..0ed4923 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.carbondata.spark.testsuite.allqueries
 
+import org.apache.spark.sql.CarbonEnv
 import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
@@ -402,6 +403,37 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll {
     sql(s"DROP TABLE IF EXISTS table1")
   }
 
+  test("test loading data into partitioned table with segment's updateDeltaEndTimestamp not change") {
+    val tableName = "test_partitioned_table"
+    sql(s"drop table if exists $tableName")
+    sql(s"""
+           |create table if not exists $tableName(
+           |  id bigint,
+           |  name string
+           |)
+           |STORED AS carbondata
+           |partitioned by (dt string)
+           |""".stripMargin)
+    val carbonTable = CarbonEnv.getCarbonTable(
+      Option(CarbonCommonConstants.DATABASE_DEFAULT_NAME), tableName)(sqlContext.sparkSession)
+    val dt1 = "dt1"
+    sql(s"insert overwrite table $tableName partition(dt='$dt1') select 1, 'a'")
+    val dt1Metas = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+    assert(dt1Metas.length == 1)
+    val dt1Seg1 = dt1Metas(0)
+
+    val dt2 = "dt2"
+    sql(s"insert overwrite table $tableName partition(dt='$dt2') select 1, 'a'")
+    val dt2Metas = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
+    assert(dt2Metas.length == 2)
+    val dt2Seg1 = dt2Metas(0)
+    val dt2Seg2 = dt2Metas(1)
+
+    assert(dt1Seg1.getUpdateDeltaEndTimestamp == dt2Seg1.getUpdateDeltaEndTimestamp)
+    assert(dt1Seg1.getUpdateDeltaEndTimestamp != dt2Seg2.getUpdateDeltaEndTimestamp)
+    sql(s"drop table if exists $tableName")
+  }
+
   override def afterAll {
     sql("drop table if exists load")
     sql("drop table if exists inser")