You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/31 11:05:45 UTC
[hudi] 07/17: [HUDI-5492] spark call command 'show_compaction' doesn't return the completed compaction (#7593)
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 83a0cab485d0bc257b203a60f6a00fea0f1556ed
Author: wangkang <52...@qq.com>
AuthorDate: Thu Jan 5 09:43:45 2023 +0800
[HUDI-5492] spark call command 'show_compaction' doesn't return the completed compaction (#7593)
Co-authored-by: kandy01.wang <ka...@vipshop.com>
(cherry picked from commit 2f5e487445a19e2fa5b2c26143efeb4fe9aeda63)
---
.../procedures/ShowCompactionProcedure.scala | 2 +-
.../spark/sql/hudi/TestCompactionTable.scala | 6 ++--
.../hudi/procedure/TestCompactionProcedure.scala | 38 +++++++++++++++++++---
3 files changed, 38 insertions(+), 8 deletions(-)
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala
index 7a7bb2cf9d9..1076b9fc44a 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala
@@ -65,7 +65,7 @@ class ShowCompactionProcedure extends BaseProcedure with ProcedureBuilder with S
assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ,
s"Cannot show compaction on a Non Merge On Read table.")
val compactionInstants = metaClient.getActiveTimeline.getInstants.iterator().asScala
- .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
+ .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION || p.getAction == HoodieTimeline.COMMIT_ACTION)
.toSeq
.sortBy(f => f.getTimestamp)
.reverse
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
index 0ef89fc5b9f..78fa33c52ca 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
@@ -58,7 +58,7 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
Seq(3, "a3", 10.0, 1000),
Seq(4, "a4", 10.0, 1000)
)
- assertResult(1)(spark.sql(s"show compaction on $tableName").collect().length)
+ assertResult(2)(spark.sql(s"show compaction on $tableName").collect().length)
spark.sql(s"run compaction on $tableName at ${timestamps(0)}")
checkAnswer(s"select id, name, price, ts from $tableName order by id")(
Seq(1, "a1", 11.0, 1000),
@@ -66,7 +66,7 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
Seq(3, "a3", 10.0, 1000),
Seq(4, "a4", 10.0, 1000)
)
- assertResult(0)(spark.sql(s"show compaction on $tableName").collect().length)
+ assertResult(2)(spark.sql(s"show compaction on $tableName").collect().length)
}
}
@@ -119,7 +119,7 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
Seq(2, "a2", 12.0, 1000),
Seq(3, "a3", 10.0, 1000)
)
- assertResult(0)(spark.sql(s"show compaction on '${tmp.getCanonicalPath}'").collect().length)
+ assertResult(2)(spark.sql(s"show compaction on '${tmp.getCanonicalPath}'").collect().length)
checkException(s"run compaction on '${tmp.getCanonicalPath}' at 12345")(
s"Compaction instant: 12345 is not found in ${tmp.getCanonicalPath}, Available pending compaction instants are: "
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
index e9d9d550d3f..236d87970d2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
@@ -88,8 +88,7 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
val resultC = spark.sql(s"call show_compaction('$tableName')")
.collect()
.map(row => Seq(row.getString(0), row.getInt(1), row.getString(2)))
- assertResult(1)(resultC.length)
- assertResult(resultA)(resultC)
+ assertResult(2)(resultC.length)
checkAnswer(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(0)})")(
Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name())
@@ -100,7 +99,7 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
Seq(3, "a3", 10.0, 1000),
Seq(4, "a4", 10.0, 1000)
)
- assertResult(0)(spark.sql(s"call show_compaction(table => '$tableName')").collect().length)
+ assertResult(2)(spark.sql(s"call show_compaction(table => '$tableName')").collect().length)
}
}
@@ -168,11 +167,42 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
Seq(2, "a2", 12.0, 1000),
Seq(3, "a3", 10.0, 1000)
)
- assertResult(0)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length)
+ assertResult(2)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length)
checkException(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}', timestamp => 12345L)")(
s"Compaction instant: 12345 is not found in ${tmp.getCanonicalPath}, Available pending compaction instants are: "
)
}
}
+ test("Test show_compaction Procedure by Path") {
+ withTempDir { tmp =>
+ val tableName1 = generateTableName
+ spark.sql(
+ s"""
+ |create table $tableName1 (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | tblproperties (
+ | type = 'mor',
+ | primaryKey = 'id',
+ | preCombineField = 'ts',
+ | hoodie.compact.inline ='true',
+ | hoodie.compact.inline.max.delta.commits ='2'
+ | )
+ | location '${tmp.getCanonicalPath}/$tableName1'
+ """.stripMargin)
+ spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")
+ // All four inserts reuse id = 1 (the declared primaryKey) and ts = 1000; only the name differs. NOTE(review): presumably every second insert triggers an inline compaction (hoodie.compact.inline.max.delta.commits = '2') - confirm.
+ spark.sql(s"insert into $tableName1 values(1, 'a2', 10, 1000)")
+
+ spark.sql(s"insert into $tableName1 values(1, 'a3', 10, 1000)")
+
+ spark.sql(s"insert into $tableName1 values(1, 'a4', 10, 1000)")
+ // After the four inserts, show_compaction addressed by table path (not by table name) must return exactly 2 rows.
+ assertResult(2)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}/$tableName1')").collect().length)
+ }
+ }
}