You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) is not preserved in this version.
Posted to commits@carbondata.apache.org by xu...@apache.org on 2020/06/23 07:26:47 UTC

[carbondata] branch master updated: [CARBONDATA-3858] Check CDC deltafiles count in the testcase

This is an automated email from the ASF dual-hosted git repository.

xubo245 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new c4b50ca  [CARBONDATA-3858] Check CDC deltafiles count in the testcase
c4b50ca is described below

commit c4b50ca533071bcec539c3462619a55bd9bf4186
Author: haomarch <ma...@126.com>
AuthorDate: Thu Jun 18 21:08:37 2020 +0800

    [CARBONDATA-3858] Check CDC deltafiles count in the testcase
    
    Check the CDC delete-delta file count in the merge test cases.
    
    This closes #3793
---
 .../spark/testsuite/merge/MergeTestCase.scala      | 38 ++++++++++++++++++----
 1 file changed, 32 insertions(+), 6 deletions(-)

diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
index 9246226..6ea702d 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -20,6 +20,10 @@ package org.apache.carbondata.spark.testsuite.merge
 import scala.collection.JavaConverters._
 import java.sql.Date
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.spark.sql._
 import org.apache.spark.sql.CarbonSession._
 import org.apache.spark.sql.catalyst.TableIdentifier
@@ -163,6 +167,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
 
     dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
       col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
+    assert(getDeleteDeltaFileCount("order", "0") == 1)
     checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
   }
 
@@ -175,6 +180,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
 
     dwSelframe.merge(odsframe, "A.id=B.id").whenMatched("A.state <> B.state").updateExpr(updateMap).execute()
 
+    assert(getDeleteDeltaFileCount("order", "0") == 1)
     checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
   }
 
@@ -188,7 +194,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
 
     dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
       col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
-
+    assert(getDeleteDeltaFileCount("order", "0") == 1)
     checkAnswer(sql("select price from order where where state = 2"), Seq(Row(22500), Row(30000)))
     checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
   }
@@ -203,7 +209,6 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
       "quantity" -> "B.quantity",
       "price" -> "B.price",
       "state" -> "B.state").asInstanceOf[Map[Any, Any]]
-
     dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
       col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
     checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
@@ -219,9 +224,9 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
       "quantity" -> "B.quantity",
       "price" -> "B.price",
       "state" -> "B.state").asInstanceOf[Map[Any, Any]]
-
     dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
       col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
+    assert(getDeleteDeltaFileCount("order", "0") == 1)
     checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
   }
 
@@ -238,6 +243,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
 
     dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
       col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
+    assert(getDeleteDeltaFileCount("order", "0") == 1)
     checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
   }
 
@@ -249,7 +255,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
       col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]]
 
     dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched().updateExpr(updateMap).execute()
-
+    assert(getDeleteDeltaFileCount("order", "0") == 1)
     checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
   }
 
@@ -294,6 +300,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
     CarbonMergeDataSetCommand(dwSelframe,
       odsframe,
       MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession)
+    assert(getDeleteDeltaFileCount("order", "0") == 1)
     checkAnswer(sql("select count(*) from order where id like 'newid%'"), Seq(Row(2)))
     checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
   }
@@ -320,7 +327,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
     CarbonMergeDataSetCommand(dwSelframe,
       odsframe,
       MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession)
-    sql("select * from order").show()
+    assert(getDeleteDeltaFileCount("order", "0") == 1)
     checkAnswer(sql("select count(*) from order where id like 'newid%'"), Seq(Row(2)))
     checkAnswer(sql("select count(*) from order"), Seq(Row(12)))
     checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
@@ -348,6 +355,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
     CarbonMergeDataSetCommand(dwSelframe,
       odsframe,
       MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession)
+    assert(getDeleteDeltaFileCount("order", "0") == 1)
     checkAnswer(sql("select count(*) from order where id like 'newid%'"), Seq(Row(2)))
     checkAnswer(sql("select count(*) from order"), Seq(Row(12)))
     checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
@@ -364,6 +372,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
     CarbonMergeDataSetCommand(dwSelframe,
       odsframe,
       MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession)
+    assert(getDeleteDeltaFileCount("order", "0") == 1)
     checkAnswer(sql("select count(*) from order"), Seq(Row(8)))
   }
 
@@ -382,6 +391,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
     CarbonMergeDataSetCommand(dwSelframe,
       odsframe,
       MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession)
+    assert(getDeleteDeltaFileCount("order", "0") == 1)
     checkAnswer(sql("select count(*) from order"), Seq(Row(8)))
     checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
   }
@@ -409,6 +419,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
     CarbonMergeDataSetCommand(dwSelframe,
       odsframe,
       MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession)
+    assert(getDeleteDeltaFileCount("order", "0") == 1)
     checkAnswer(sql("select count(*) from order where id like 'newid%'"), Seq(Row(2)))
     checkAnswer(sql("select count(*) from order"), Seq(Row(10)))
     checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
@@ -454,7 +465,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
       delete().
       insertExpr(insertMap_d).
       execute()
-    sql("select * from order").show()
+    assert(getDeleteDeltaFileCount("order", "0") == 1)
     checkAnswer(sql("select count(*) from order where c_name = 'delete'"), Seq(Row(2)))
     checkAnswer(sql("select count(*) from order where c_name = 'insert'"), Seq(Row(2)))
     checkAnswer(sql("select count(*) from order"), Seq(Row(14)))
@@ -501,6 +512,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
     CarbonMergeDataSetCommand(dwSelframe,
       odsframe,
       MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession)
+    assert(getDeleteDeltaFileCount("order", "0") == 3)
     checkAnswer(sql("select count(*) from order"), Seq(Row(10)))
     checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
     checkAnswer(sql("select price from order where id = 'newid1'"), Seq(Row(7500)))
@@ -603,6 +615,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
       insertExpr(insertMap).
       execute()
 
+    assert(getDeleteDeltaFileCount("customers", "0") == 1)
     checkAnswer(sql("select count(*) from customers"), Seq(Row(6)))
     checkAnswer(sql("select count(*) from customers where current='true'"), Seq(Row(4)))
     checkAnswer(sql("select count(*) from customers where effectivedate is not null and enddate is not null"), Seq(Row(1)))
@@ -654,6 +667,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
       insertExpr(insertMap).
       whenMatched("B.deleted=true").
       delete().execute()
+    assert(getDeleteDeltaFileCount("target", "0") == 0)
     checkAnswer(sql("select count(*) from target"), Seq(Row(3)))
     checkAnswer(sql("select * from target order by key"), Seq(Row("c", "200"), Row("d", "3"), Row("e", "100")))
   }
@@ -702,10 +716,22 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
       insertExpr(insertMap).
       whenMatched("B.deleted=true").
       delete().execute()
+    assert(getDeleteDeltaFileCount("target", "0") == 1)
     checkAnswer(sql("select count(*) from target"), Seq(Row(3)))
     checkAnswer(sql("select * from target order by key"), Seq(Row("c", "200"), Row("d", "3"), Row("e", "100")))
   }
 
+  private def getDeleteDeltaFileCount(tableName: String, segment: String): Int = {
+    val carbonTable = CarbonEnv.getCarbonTable(None, tableName)(sqlContext.sparkSession)
+    val segmentDir = FileFactory.getCarbonFile(CarbonTablePath.getSegmentPath(
+      carbonTable.getAbsoluteTableIdentifier.getTablePath, segment))
+    val isDeleteDelta = new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean =
+        file.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
+    }  // accepts only files whose name ends with the delete-delta extension
+    segmentDir.listFiles(true, isDeleteDelta).size()
+  }
+
   override def afterAll {
     sql("drop table if exists order")
   }