You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by ku...@apache.org on 2020/12/17 14:28:03 UTC

[carbondata] branch master updated: [CARBONDATA-4080] Wrong results for select count on invalid segments

This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 35c73e6  [CARBONDATA-4080] Wrong results for select count on invalid segments
35c73e6 is described below

commit 35c73e61c55f39f1bf38105e6bf32ff3673396fb
Author: akkio-97 <ak...@gmail.com>
AuthorDate: Wed Dec 9 15:15:51 2020 +0530

    [CARBONDATA-4080] Wrong results for select count on invalid segments
    
    Why is this PR needed?
    Wrong results are returned for:
    1. select count on a marked-for-delete segment
    2. select count on a compacted segment
    The issue occurs only when the user explicitly sets deleted/compacted
    segments using the carbon.input.segments property.
    Since select * on such segments returns 0 rows, select count should
    also return 0 rows to maintain consistency.
    
    What changes were proposed in this PR?
    This PR ensures that only valid segments are considered for access
    during a select count query.
    
    This closes #4050
---
 .../hadoop/api/CarbonTableInputFormat.java         |  15 ++-
 .../segmentreading/TestSegmentReading.scala        | 104 +++++++++++++++++++++
 2 files changed, 117 insertions(+), 2 deletions(-)

diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
index a2c162e..f24a2f1 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java
@@ -55,6 +55,7 @@ import org.apache.carbondata.core.readcommitter.TableStatusReadCommittedScope;
 import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
 import org.apache.carbondata.core.statusmanager.FileFormat;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
+import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager;
 import org.apache.carbondata.core.statusmanager.StageInputCollector;
@@ -220,7 +221,7 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
    * `INPUT_SEGMENT_NUMBERS` in job configuration
    */
   private List<Segment> getFilteredSegment(JobContext job, List<Segment> validSegments,
-      boolean validationRequired, ReadCommittedScope readCommittedScope) {
+      boolean validationRequired, ReadCommittedScope readCommittedScope) throws IOException {
     Segment[] segmentsToAccess = getSegmentsToAccess(job, readCommittedScope);
     if (segmentsToAccess.length == 0 || segmentsToAccess[0].getSegmentNo().equalsIgnoreCase("*")) {
       return validSegments;
@@ -241,7 +242,17 @@ public class CarbonTableInputFormat<T> extends CarbonInputFormat<T> {
     }
     if (!validationRequired && filteredSegmentToAccess.size() != segmentToAccessMap.size()) {
       for (Segment segment : segmentToAccessMap.values()) {
-        if (!filteredSegmentToAccess.containsKey(segment.getSegmentNo())) {
+        boolean isSegmentValid = true;
+        LoadMetadataDetails[] segmentList = readCommittedScope.getSegmentList();
+        for (LoadMetadataDetails validSegment : segmentList) {
+          if (validSegment.getLoadName().equals(segment.getSegmentNo()) && (
+              validSegment.getSegmentStatus().equals(SegmentStatus.MARKED_FOR_DELETE)
+                  || validSegment.getSegmentStatus().equals(SegmentStatus.COMPACTED))) {
+            isSegmentValid = false;
+            break;
+          }
+        }
+        if (isSegmentValid && !filteredSegmentToAccess.containsKey(segment.getSegmentNo())) {
           filteredSegmentToAccess.put(segment.getSegmentNo(), segment);
         }
       }
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
index 8ce8cc5..5c0336c 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/segmentreading/TestSegmentReading.scala
@@ -22,6 +22,8 @@ import org.apache.spark.sql.test.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.view.rewrite.TestUtil
 
 /**
  * Created by rahul on 19/9/17.
@@ -420,4 +422,106 @@ class TestSegmentReading extends QueryTest with BeforeAndAfterAll {
 
     sql("set spark.sql.adaptive.enabled=false")
   }
+
+  test("Read marked for delete segments") {
+    sql("drop table if exists carbonTable")
+    sql(
+      "create table carbonTable(a string, b int, c string) stored as carbondata ")
+    sql("insert into carbonTable values ('k',1,'k'), ('k',1,'b')")
+    sql("insert into carbonTable values ('a',2,'a')")
+    sql("insert into carbonTable values ('b',2,'b'),('b',2,'b')")
+    sql("insert into carbonTable values ('c',2,'c')")
+
+    sql("delete from table carbonTable where segment.id in (0,3)")
+    sql("set carbon.input.segments.default.carbonTable = 0,2,3")
+
+    checkAnswer(sql("select count(*) from carbonTable"), Seq(Row(2)))
+  }
+
+  test("Read marked for delete segments after SI creation") {
+    sql("drop table if exists carbonTable")
+    sql(
+      "create table carbonTable(a string, b int, c string) stored as carbondata ")
+    sql("insert into carbonTable values ('k',1,'k'), ('k',1,'b')")
+    sql("insert into carbonTable values ('a',2,'a')")
+    sql("insert into carbonTable values ('b',2,'b'),('b',2,'b')")
+    sql("insert into carbonTable values ('c',2,'c')")
+
+    sql("drop index if exists indextable1 on carbonTable")
+    sql("create index indextable1 on table carbonTable (c) AS 'carbondata'")
+
+    sql("delete from table carbonTable where segment.id in (0,3)")
+    sql("set carbon.input.segments.default.carbonTable = 0,2,3")
+
+    checkAnswer(sql("select count(*) from carbonTable"), Seq(Row(2)))
+  }
+
+  test("Read compacted segments") {
+    sql("drop table if exists carbonTable")
+    sql(
+      "create table carbonTable(a string, b int, c string) stored as carbondata")
+    sql("insert into carbonTable values ('k',5,'k'), ('k',5,'b')")
+    sql("insert into carbonTable values ('a',1,'a')")
+    sql("insert into carbonTable values ('b',2,'b'),('b',2,'b')")
+    sql("insert into carbonTable values ('c',3,'c')")
+    sql("alter table carbonTable compact 'major'")
+
+    sql("set carbon.input.segments.default.carbonTable = 0,1,2,3,0.1")
+    checkAnswer(sql("select count(*) from carbonTable"), Seq(Row(6)))
+  }
+
+  test("Read compacted segments after SI creation") {
+    sql("drop table if exists carbonTable")
+    sql(
+      "create table carbonTable(a string, b int, c string) stored as carbondata")
+    sql("insert into carbonTable values ('k',5,'k'), ('k',5,'b')")
+    sql("insert into carbonTable values ('a',1,'a')")
+    sql("insert into carbonTable values ('b',2,'b'),('b',2,'b')")
+    sql("insert into carbonTable values ('c',3,'c')")
+
+    sql("drop index if exists indextable1 on carbonTable")
+    sql("create index indextable1 on table carbonTable (c) AS 'carbondata'")
+
+    sql("alter table carbonTable compact 'major'")
+    sql("set carbon.input.segments.default.carbonTable = 0,1,2,3,0.1")
+    checkAnswer(sql("select count(*) from carbonTable where c = 'b'"), Seq(Row(3)))
+  }
+
+  test("Read marked for delete segments for partition tables") {
+    sql("drop table if exists carbonTable")
+    sql(
+      "create table carbonTable(c1 string, c2 int) PARTITIONED by (c3 string) stored as " +
+      "carbondata ")
+    sql("insert into carbonTable values ('k',1,'k'), ('k',2,'k')")
+    sql("insert into carbonTable values ('a',2,'a')")
+    sql("insert into carbonTable values ('b',2,'b'),('b',2,'b')")
+    sql("insert into carbonTable values ('c',2,'c')")
+
+    sql("delete from table carbonTable where segment.id in (0,3)")
+    sql("set carbon.input.segments.default.carbonTable = 0,1,2,3")
+
+    checkAnswer(sql("select count(*) from carbonTable where c3 = 'k' or c3 = 'a' or c3 = 'c'"),
+      Seq(Row(1)))
+  }
+
+  test("Read marked for delete segments with MV creation") {
+    sql("drop table if exists carbonTable")
+    sql(" CREATE TABLE carbonTable (id int, name string)  STORED AS carbondata")
+    sql("insert into carbonTable values(1,'a')")
+    sql("insert into carbonTable values(2,'b')")
+    sql("insert into carbonTable values(3,'c')")
+    sql("insert into carbonTable values(4,'d')")
+    sql("delete from table carbonTable where segment.id in (0,3)")
+
+    sql("drop materialized view if exists agg_sale")
+    sql(
+      "CREATE MATERIALIZED VIEW agg_sale AS SELECT id, name, sum(id) FROM carbonTable GROUP BY " +
+      "id, name")
+
+    sql("set carbon.input.segments.default.carbonTable = 0,1,2,3")
+    checkAnswer(sql("SELECT count(*)  FROM carbonTable where name = 'a' or name = 'd' or name = " +
+                    "'b'"),
+      Seq(Row(1)))
+  }
+
 }