You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/31 11:05:45 UTC

[hudi] 07/17: [HUDI-5492] spark call command 'show_compaction' doesn't return the completed compaction (#7593)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 83a0cab485d0bc257b203a60f6a00fea0f1556ed
Author: wangkang <52...@qq.com>
AuthorDate: Thu Jan 5 09:43:45 2023 +0800

    [HUDI-5492] spark call command 'show_compaction' doesn't return the completed compaction (#7593)
    
    Co-authored-by: kandy01.wang <ka...@vipshop.com>
    
    (cherry picked from commit 2f5e487445a19e2fa5b2c26143efeb4fe9aeda63)
---
 .../procedures/ShowCompactionProcedure.scala       |  2 +-
 .../spark/sql/hudi/TestCompactionTable.scala       |  6 ++--
 .../hudi/procedure/TestCompactionProcedure.scala   | 38 +++++++++++++++++++---
 3 files changed, 38 insertions(+), 8 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala
index 7a7bb2cf9d9..1076b9fc44a 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/ShowCompactionProcedure.scala
@@ -65,7 +65,7 @@ class ShowCompactionProcedure extends BaseProcedure with ProcedureBuilder with S
     assert(metaClient.getTableType == HoodieTableType.MERGE_ON_READ,
       s"Cannot show compaction on a Non Merge On Read table.")
     val compactionInstants = metaClient.getActiveTimeline.getInstants.iterator().asScala
-      .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION)
+      .filter(p => p.getAction == HoodieTimeline.COMPACTION_ACTION || p.getAction == HoodieTimeline.COMMIT_ACTION)
       .toSeq
       .sortBy(f => f.getTimestamp)
       .reverse
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
index 0ef89fc5b9f..78fa33c52ca 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCompactionTable.scala
@@ -58,7 +58,7 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
         Seq(3, "a3", 10.0, 1000),
         Seq(4, "a4", 10.0, 1000)
       )
-      assertResult(1)(spark.sql(s"show compaction on $tableName").collect().length)
+      assertResult(2)(spark.sql(s"show compaction on $tableName").collect().length)
       spark.sql(s"run compaction on $tableName at ${timestamps(0)}")
       checkAnswer(s"select id, name, price, ts from $tableName order by id")(
         Seq(1, "a1", 11.0, 1000),
@@ -66,7 +66,7 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
         Seq(3, "a3", 10.0, 1000),
         Seq(4, "a4", 10.0, 1000)
       )
-      assertResult(0)(spark.sql(s"show compaction on $tableName").collect().length)
+      assertResult(2)(spark.sql(s"show compaction on $tableName").collect().length)
     }
   }
 
@@ -119,7 +119,7 @@ class TestCompactionTable extends HoodieSparkSqlTestBase {
         Seq(2, "a2", 12.0, 1000),
         Seq(3, "a3", 10.0, 1000)
       )
-      assertResult(0)(spark.sql(s"show compaction on '${tmp.getCanonicalPath}'").collect().length)
+      assertResult(2)(spark.sql(s"show compaction on '${tmp.getCanonicalPath}'").collect().length)
 
       checkException(s"run compaction on '${tmp.getCanonicalPath}' at 12345")(
         s"Compaction instant: 12345 is not found in ${tmp.getCanonicalPath}, Available pending compaction instants are:  "
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
index e9d9d550d3f..236d87970d2 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestCompactionProcedure.scala
@@ -88,8 +88,7 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
       val resultC = spark.sql(s"call show_compaction('$tableName')")
         .collect()
         .map(row => Seq(row.getString(0), row.getInt(1), row.getString(2)))
-      assertResult(1)(resultC.length)
-      assertResult(resultA)(resultC)
+      assertResult(2)(resultC.length)
 
       checkAnswer(s"call run_compaction(op => 'run', table => '$tableName', timestamp => ${timestamps(0)})")(
         Seq(resultA(0).head, resultA(0)(1), HoodieInstant.State.COMPLETED.name())
@@ -100,7 +99,7 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
         Seq(3, "a3", 10.0, 1000),
         Seq(4, "a4", 10.0, 1000)
       )
-      assertResult(0)(spark.sql(s"call show_compaction(table => '$tableName')").collect().length)
+      assertResult(2)(spark.sql(s"call show_compaction(table => '$tableName')").collect().length)
     }
   }
 
@@ -168,11 +167,42 @@ class TestCompactionProcedure extends HoodieSparkProcedureTestBase {
         Seq(2, "a2", 12.0, 1000),
         Seq(3, "a3", 10.0, 1000)
       )
-      assertResult(0)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length)
+      assertResult(2)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}')").collect().length)
 
       checkException(s"call run_compaction(op => 'run', path => '${tmp.getCanonicalPath}', timestamp => 12345L)")(
         s"Compaction instant: 12345 is not found in ${tmp.getCanonicalPath}, Available pending compaction instants are:  "
       )
     }
   }
+  test("Test show_compaction Procedure by Path") {
+    withTempDir { tmp =>
+      val tableName1 = generateTableName
+      spark.sql(
+        s"""
+           |create table $tableName1 (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | tblproperties (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts',
+           |  hoodie.compact.inline ='true',
+           |  hoodie.compact.inline.max.delta.commits ='2'
+           | )
+           | location '${tmp.getCanonicalPath}/$tableName1'
+       """.stripMargin)
+      spark.sql(s"insert into $tableName1 values(1, 'a1', 10, 1000)")
+
+      spark.sql(s"insert into $tableName1 values(1, 'a2', 10, 1000)")
+
+      spark.sql(s"insert into $tableName1 values(1, 'a3', 10, 1000)")
+
+      spark.sql(s"insert into $tableName1 values(1, 'a4', 10, 1000)")
+
+      assertResult(2)(spark.sql(s"call show_compaction(path => '${tmp.getCanonicalPath}/$tableName1')").collect().length)
+    }
+  }
 }