You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@carbondata.apache.org by xu...@apache.org on 2020/06/23 07:26:47 UTC
[carbondata] branch master updated: [CARBONDATA-3858] Check CDC
deltafiles count in the testcase
This is an automated email from the ASF dual-hosted git repository.
xubo245 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push:
new c4b50ca [CARBONDATA-3858] Check CDC deltafiles count in the testcase
c4b50ca is described below
commit c4b50ca533071bcec539c3462619a55bd9bf4186
Author: haomarch <ma...@126.com>
AuthorDate: Thu Jun 18 21:08:37 2020 +0800
[CARBONDATA-3858] Check CDC deltafiles count in the testcase
Check CDC deltafiles count in the testcase
This closes #3793
---
.../spark/testsuite/merge/MergeTestCase.scala | 38 ++++++++++++++++++----
1 file changed, 32 insertions(+), 6 deletions(-)
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
index 9246226..6ea702d 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/merge/MergeTestCase.scala
@@ -20,6 +20,10 @@ package org.apache.carbondata.spark.testsuite.merge
import scala.collection.JavaConverters._
import java.sql.Date
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.util.path.CarbonTablePath
import org.apache.spark.sql._
import org.apache.spark.sql.CarbonSession._
import org.apache.spark.sql.catalyst.TableIdentifier
@@ -163,6 +167,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
+ assert(getDeleteDeltaFileCount("order", "0") == 1)
checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
}
@@ -175,6 +180,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
dwSelframe.merge(odsframe, "A.id=B.id").whenMatched("A.state <> B.state").updateExpr(updateMap).execute()
+ assert(getDeleteDeltaFileCount("order", "0") == 1)
checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
}
@@ -188,7 +194,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
-
+ assert(getDeleteDeltaFileCount("order", "0") == 1)
checkAnswer(sql("select price from order where where state = 2"), Seq(Row(22500), Row(30000)))
checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
}
@@ -203,7 +209,6 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
"quantity" -> "B.quantity",
"price" -> "B.price",
"state" -> "B.state").asInstanceOf[Map[Any, Any]]
-
dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
@@ -219,9 +224,9 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
"quantity" -> "B.quantity",
"price" -> "B.price",
"state" -> "B.state").asInstanceOf[Map[Any, Any]]
-
dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
+ assert(getDeleteDeltaFileCount("order", "0") == 1)
checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
}
@@ -238,6 +243,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched(
col("A.state") =!= col("B.state")).updateExpr(updateMap).execute()
+ assert(getDeleteDeltaFileCount("order", "0") == 1)
checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
}
@@ -249,7 +255,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
col("state") -> col("B.state")).asInstanceOf[Map[Any, Any]]
dwSelframe.merge(odsframe, col("A.id").equalTo(col("B.id"))).whenMatched().updateExpr(updateMap).execute()
-
+ assert(getDeleteDeltaFileCount("order", "0") == 1)
checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
}
@@ -294,6 +300,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
CarbonMergeDataSetCommand(dwSelframe,
odsframe,
MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession)
+ assert(getDeleteDeltaFileCount("order", "0") == 1)
checkAnswer(sql("select count(*) from order where id like 'newid%'"), Seq(Row(2)))
checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
}
@@ -320,7 +327,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
CarbonMergeDataSetCommand(dwSelframe,
odsframe,
MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession)
- sql("select * from order").show()
+ assert(getDeleteDeltaFileCount("order", "0") == 1)
checkAnswer(sql("select count(*) from order where id like 'newid%'"), Seq(Row(2)))
checkAnswer(sql("select count(*) from order"), Seq(Row(12)))
checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
@@ -348,6 +355,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
CarbonMergeDataSetCommand(dwSelframe,
odsframe,
MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession)
+ assert(getDeleteDeltaFileCount("order", "0") == 1)
checkAnswer(sql("select count(*) from order where id like 'newid%'"), Seq(Row(2)))
checkAnswer(sql("select count(*) from order"), Seq(Row(12)))
checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
@@ -364,6 +372,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
CarbonMergeDataSetCommand(dwSelframe,
odsframe,
MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession)
+ assert(getDeleteDeltaFileCount("order", "0") == 1)
checkAnswer(sql("select count(*) from order"), Seq(Row(8)))
}
@@ -382,6 +391,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
CarbonMergeDataSetCommand(dwSelframe,
odsframe,
MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession)
+ assert(getDeleteDeltaFileCount("order", "0") == 1)
checkAnswer(sql("select count(*) from order"), Seq(Row(8)))
checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
}
@@ -409,6 +419,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
CarbonMergeDataSetCommand(dwSelframe,
odsframe,
MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession)
+ assert(getDeleteDeltaFileCount("order", "0") == 1)
checkAnswer(sql("select count(*) from order where id like 'newid%'"), Seq(Row(2)))
checkAnswer(sql("select count(*) from order"), Seq(Row(10)))
checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
@@ -454,7 +465,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
delete().
insertExpr(insertMap_d).
execute()
- sql("select * from order").show()
+ assert(getDeleteDeltaFileCount("order", "0") == 1)
checkAnswer(sql("select count(*) from order where c_name = 'delete'"), Seq(Row(2)))
checkAnswer(sql("select count(*) from order where c_name = 'insert'"), Seq(Row(2)))
checkAnswer(sql("select count(*) from order"), Seq(Row(14)))
@@ -501,6 +512,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
CarbonMergeDataSetCommand(dwSelframe,
odsframe,
MergeDataSetMatches(col("A.id").equalTo(col("B.id")), matches.toList)).run(sqlContext.sparkSession)
+ assert(getDeleteDeltaFileCount("order", "0") == 3)
checkAnswer(sql("select count(*) from order"), Seq(Row(10)))
checkAnswer(sql("select count(*) from order where state = 2"), Seq(Row(2)))
checkAnswer(sql("select price from order where id = 'newid1'"), Seq(Row(7500)))
@@ -603,6 +615,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
insertExpr(insertMap).
execute()
+ assert(getDeleteDeltaFileCount("customers", "0") == 1)
checkAnswer(sql("select count(*) from customers"), Seq(Row(6)))
checkAnswer(sql("select count(*) from customers where current='true'"), Seq(Row(4)))
checkAnswer(sql("select count(*) from customers where effectivedate is not null and enddate is not null"), Seq(Row(1)))
@@ -654,6 +667,7 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
insertExpr(insertMap).
whenMatched("B.deleted=true").
delete().execute()
+ assert(getDeleteDeltaFileCount("target", "0") == 0)
checkAnswer(sql("select count(*) from target"), Seq(Row(3)))
checkAnswer(sql("select * from target order by key"), Seq(Row("c", "200"), Row("d", "3"), Row("e", "100")))
}
@@ -702,10 +716,22 @@ class MergeTestCase extends QueryTest with BeforeAndAfterAll {
insertExpr(insertMap).
whenMatched("B.deleted=true").
delete().execute()
+ assert(getDeleteDeltaFileCount("target", "0") == 1)
checkAnswer(sql("select count(*) from target"), Seq(Row(3)))
checkAnswer(sql("select * from target order by key"), Seq(Row("c", "200"), Row("d", "3"), Row("e", "100")))
}
+ private def getDeleteDeltaFileCount(tableName: String, segment: String): Int = {
+ val table = CarbonEnv.getCarbonTable(None, tableName)(sqlContext.sparkSession)
+ val path = CarbonTablePath
+ .getSegmentPath(table.getAbsoluteTableIdentifier.getTablePath, segment)
+ val deleteDeltaFiles = FileFactory.getCarbonFile(path).listFiles(true, new CarbonFileFilter {
+ override def accept(file: CarbonFile): Boolean = file.getName.endsWith(CarbonCommonConstants
+ .DELETE_DELTA_FILE_EXT)
+ })
+ deleteDeltaFiles.size()
+ }
+
override def afterAll {
sql("drop table if exists order")
}