You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/05 02:17:51 UTC
[hudi] 05/06: [HUDI-5701] Remove meta fields from cdc new record in CDCLogger (#7852)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit f5cfec43270a98d9a10512e9a651fb35d4e63e76
Author: Yann Byron <bi...@gmail.com>
AuthorDate: Sun Feb 5 08:05:13 2023 +0800
[HUDI-5701] Remove meta fields from cdc new record in CDCLogger (#7852)
---
.../java/org/apache/hudi/io/HoodieCDCLogger.java | 2 +-
.../apache/spark/sql/hudi/TestCDCForSparkSQL.scala | 20 ++++++++++++++------
2 files changed, 15 insertions(+), 7 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
index fd2dc60b58b..096bf475667 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieCDCLogger.java
@@ -245,7 +245,7 @@ public class HoodieCDCLogger implements Closeable {
private CDCTransformer getTransformer() {
if (cdcSupplementalLoggingMode == data_before_after) {
return (operation, recordKey, oldRecord, newRecord) ->
- HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime, removeCommitMetadata(oldRecord), newRecord);
+ HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), commitTime, removeCommitMetadata(oldRecord), removeCommitMetadata(newRecord));
} else if (cdcSupplementalLoggingMode == data_before) {
return (operation, recordKey, oldRecord, newRecord) ->
HoodieCDCUtils.cdcRecord(cdcSchema, operation.getValue(), recordKey, removeCommitMetadata(oldRecord));
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
index bec2230e5ab..a27da08866a 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/TestCDCForSparkSQL.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.hudi
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.{data_before, op_key_only}
+import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.{data_before, data_before_after, op_key_only}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.junit.jupiter.api.Assertions.assertEquals
@@ -53,10 +53,15 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
spark.sql(s"use $databaseName")
Seq("cow", "mor").foreach { tableType =>
- Seq(op_key_only, data_before).foreach { loggingMode =>
+ Seq(op_key_only, data_before, data_before_after).foreach { loggingMode =>
withTempDir { tmp =>
val tableName = generateTableName
val basePath = s"${tmp.getCanonicalPath}/$tableName"
+ val otherTableProperties = if (tableType == "mor") {
+ "'hoodie.compact.inline'='true', 'hoodie.compact.inline.max.delta.commits'='2',"
+ } else {
+ ""
+ }
spark.sql(
s"""
| create table $tableName (
@@ -70,6 +75,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
| 'preCombineField' = 'ts',
| 'hoodie.table.cdc.enabled' = 'true',
| 'hoodie.table.cdc.supplemental.logging.mode' = '${loggingMode.name()}',
+ | $otherTableProperties
| type = '$tableType'
| )
| location '$basePath'
@@ -88,7 +94,9 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
spark.sql(s"insert into $tableName values (1, 'a1_v2', 11, 1100)")
val commitTime2 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
- val cdcDataOnly2 = cdcDataFrame(basePath, commitTime2.toLong - 1)
+ // Here we use `commitTime1` to query the change data in commit 2,
+ // because `commitTime2` may be the timestamp of the compaction operation, not the write operation.
+ val cdcDataOnly2 = cdcDataFrame(basePath, commitTime1.toLong)
cdcDataOnly2.show(false)
assertCDCOpCnt(cdcDataOnly2, 0, 1, 0)
@@ -110,13 +118,13 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
spark.sql(s"update $tableName set name = 'a2_v2', ts = 1200 where id = 2")
val commitTime3 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
- val cdcDataOnly3 = cdcDataFrame(basePath, commitTime3.toLong - 1)
+ val cdcDataOnly3 = cdcDataFrame(basePath, commitTime2.toLong)
cdcDataOnly3.show(false)
assertCDCOpCnt(cdcDataOnly3, 0, 1, 0)
spark.sql(s"delete from $tableName where id = 3")
val commitTime4 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
- val cdcDataOnly4 = cdcDataFrame(basePath, commitTime4.toLong - 1)
+ val cdcDataOnly4 = cdcDataFrame(basePath, commitTime3.toLong)
cdcDataOnly4.show(false)
assertCDCOpCnt(cdcDataOnly4, 0, 0, 1)
@@ -135,7 +143,7 @@ class TestCDCForSparkSQL extends HoodieSparkSqlTestBase {
| when not matched then insert *
""".stripMargin)
val commitTime5 = metaClient.reloadActiveTimeline.lastInstant().get().getTimestamp
- val cdcDataOnly5 = cdcDataFrame(basePath, commitTime5.toLong - 1)
+ val cdcDataOnly5 = cdcDataFrame(basePath, commitTime4.toLong)
cdcDataOnly5.show(false)
assertCDCOpCnt(cdcDataOnly5, 1, 1, 0)