You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/05 02:17:51 UTC

[hudi] 05/06: [HUDI-5701] Remove meta fields from cdc new record in CDCLogger (#7852)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit f5cfec43270a98d9a10512e9a651fb35d4e63e76
Author: Yann Byron <bi...@gmail.com>
AuthorDate: Sun Feb 5 08:05:13 2023 +0800

    [HUDI-5701] Remove meta fields from cdc new record in CDCLogger (#7852)
---
 .../java/org/apache/hudi/io/HoodieCDCLogger.java     |  2 +-
 .../apache/spark/sql/hudi/TestCDCForSparkSQL.scala   | 20 ++++++++++++++------
 2 files changed, 15 insertions(+), 7 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index fd2dc60b58b..096bf475667 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -245,7 +245,7 @@ public class HoodieCDCLogger implements Closeable {
   private CDCTransformer getTransformer() {
     if (cdcSupplementalLoggingMode == data_before_after) {
       return (operation, recordKey, oldRecord, newRecord) ->
-          HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime, removeCommitMetadata(oldRecord), newRecord);
+          HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime, removeCommitMetadata(oldRecord), removeCommitMetadata(newRecord));
     } else if (cdcSupplementalLoggingMode == data_before) {
       return (operation, recordKey, oldRecord, newRecord) ->
           HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey, removeCommitMetadata(oldRecord));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
index bec2230e5ab..a27da08866a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi
 
 import org.apache.hudi.DataSourceReadOptions._
 import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.{data_before, op_key_only}
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.{data_before, data_before_after, op_key_only}
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.functions._
 import org.junit.jupiter.api.Assertions.assertEquals
@@ -53,10 +53,15 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
     spark.sql(s"use $databaseName")
 
     Seq("cow", "mor").foreach { tableType =>
-      Seq(op_key_only, data_before).foreach { loggingMode =>
+      Seq(op_key_only, data_before, data_before_after).foreach { loggingMode =>
         withTempDir { tmp =>
           val tableName = generateTableName
           val basePath = s"${tmp.getCanonicalPath}/$tableName"
+          val otherTableProperties = if (tableType == "mor") {
+            "'hoodie.compact.inline'='true', 'hoodie.compact.inline.max.delta.commits'='2',"
+          } else {
+            ""
+          }
           spark.sql(
             s"""
                | create table $tableName (
@@ -70,6 +75,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
                |   'preCombineField' = 'ts',
                |   'hoodie.table.cdc.enabled' = 'true',
                |   'hoodie.table.cdc.supplemental.logging.mode' = '${loggingMode.name()}',
+               |   $otherTableProperties
                |   type = '$tableType'
                | )
                | location '$basePath'
@@ -88,7 +94,9 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
 
           spark.sql(s"insert into $tableName values (1, 'a1_v2', 11, 1100)")
           val commitTime2 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
-          val cdcDataOnly2 = cdcDataFrame(basePath, commitTime2.toLong - 1)
+          // Here we use `commitTime1` to query the change data in commit 2,
+          // because `commitTime2` may be the timestamp of the compaction operation, not the write operation.
+          val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1.toLong)
           cdcDataOnly2.show(false)
           assertCDCOpCnt(cdcDataOnly2, 0, 1, 0)
 
@@ -110,13 +118,13 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
 
           spark.sql(s"update $tableName set name = 'a2_v2', ts = 1200 where id = 2")
           val commitTime3 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
-          val cdcDataOnly3 = cdcDataFrame(basePath, commitTime3.toLong - 1)
+          val cdcDataOnly3 = cdcDataFrame(basePath, commitTime2.toLong)
           cdcDataOnly3.show(false)
           assertCDCOpCnt(cdcDataOnly3, 0, 1, 0)
 
           spark.sql(s"delete from $tableName where id = 3")
           val commitTime4 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
-          val cdcDataOnly4 = cdcDataFrame(basePath, commitTime4.toLong - 1)
+          val cdcDataOnly4 = cdcDataFrame(basePath, commitTime3.toLong)
           cdcDataOnly4.show(false)
           assertCDCOpCnt(cdcDataOnly4, 0, 0, 1)
 
@@ -135,7 +143,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
                | when not matched then insert *
         """.stripMargin)
           val commitTime5 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
-          val cdcDataOnly5 = cdcDataFrame(basePath, commitTime5.toLong - 1)
+          val cdcDataOnly5 = cdcDataFrame(basePath, commitTime4.toLong)
           cdcDataOnly5.show(false)
           assertCDCOpCnt(cdcDataOnly5, 1, 1, 0)