You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/01/30 17:34:51 UTC

[hudi] branch release-0.13.0 updated (5765e1b9b96 -> 8ffc7ce3d8f)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a change to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git


    from 5765e1b9b96 [MINOR] Cleaning up recently introduced configs (#7772)
     new 399549d7bb2 [HUDI-5629] Clean CDC log files for enable/disable scenario (#7786)
     new cbff2bc0c95 [HUDI-5637] Add Kryo for hive sync bundle (#7781)
     new b81f49eb062 [HUDI-5634] Rename CDC related classes (#7410)
     new 8ffc7ce3d8f [HUDI-5632] Fix failure launching Spark jobs from hudi-cli-bundle (#7790)

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../java/org/apache/hudi/cli/utils/SparkUtil.java  | 12 ++++-
 .../hudi/table/action/clean/CleanPlanner.java      | 12 ++---
 .../hudi/common/table/cdc/HoodieCDCExtractor.java  | 10 ++--
 .../hudi/common/table/cdc/HoodieCDCFileSplit.java  | 22 ++++----
 ...CInferCase.java => HoodieCDCInferenceCase.java} |  4 +-
 .../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala   | 63 +++++++++++-----------
 packaging/hudi-hive-sync-bundle/pom.xml            | 29 ++++++++++
 rfc/rfc-51/rfc-51.md                               |  2 +-
 8 files changed, 92 insertions(+), 62 deletions(-)
 rename hudi-common/src/main/java/org/apache/hudi/common/table/cdc/{HoodieCDCInferCase.java => HoodieCDCInferenceCase.java} (98%)


[hudi] 04/04: [HUDI-5632] Fix failure launching Spark jobs from hudi-cli-bundle (#7790)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 8ffc7ce3d8fe352490c376583c518c837343a71b
Author: Y Ethan Guo <et...@gmail.com>
AuthorDate: Mon Jan 30 07:54:40 2023 -0800

    [HUDI-5632] Fix failure launching Spark jobs from hudi-cli-bundle (#7790)
    
    - Ensures that Hudi CLI commands which require launching Spark can be executed with hudi-cli-bundle
---
 .../src/main/java/org/apache/hudi/cli/utils/SparkUtil.java   | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
index 1cffdf941f9..fd09a27271a 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
@@ -57,10 +57,18 @@ public class SparkUtil {
     }
 
     File libDirectory = new File(new File(currentJar).getParent(), "lib");
-    // This lib directory may be not required, such as providing libraries through a bundle jar
     if (libDirectory.exists()) {
+      // When using the hudi-cli module directly, the jars under the lib directory
+      // generated by the compilation are required
       Arrays.stream(libDirectory.list()).forEach(library ->
-              sparkLauncher.addJar(new File(libDirectory, library).getAbsolutePath()));
+          sparkLauncher.addJar(new File(libDirectory, library).getAbsolutePath()));
+    } else {
+      // When using hudi-cli-bundle, we also need to add the hudi-spark*-bundle
+      // so that the Hudi Spark job can be launched
+      String sparkBundleJarPath = System.getenv("SPARK_BUNDLE_JAR");
+      if (!StringUtils.isNullOrEmpty(sparkBundleJarPath)) {
+        sparkLauncher.addJar(sparkBundleJarPath);
+      }
     }
     return sparkLauncher;
   }


[hudi] 03/04: [HUDI-5634] Rename CDC related classes (#7410)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit b81f49eb062534da3ebb447f13a0482bbb1cd470
Author: Yann Byron <bi...@gmail.com>
AuthorDate: Mon Jan 30 16:15:24 2023 +0800

    [HUDI-5634] Rename CDC related classes (#7410)
---
 .../hudi/common/table/cdc/HoodieCDCExtractor.java  | 10 ++--
 .../hudi/common/table/cdc/HoodieCDCFileSplit.java  | 22 ++++----
 ...CInferCase.java => HoodieCDCInferenceCase.java} |  4 +-
 .../scala/org/apache/hudi/cdc/HoodieCDCRDD.scala   | 63 +++++++++++-----------
 rfc/rfc-51/rfc-51.md                               |  2 +-
 5 files changed, 49 insertions(+), 52 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
index 506680dc3b2..6ca116015b0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCExtractor.java
@@ -56,11 +56,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
-import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.AS_IS;
-import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.BASE_FILE_DELETE;
-import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.BASE_FILE_INSERT;
-import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.LOG_FILE;
-import static org.apache.hudi.common.table.cdc.HoodieCDCInferCase.REPLACE_COMMIT;
+import static org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase.AS_IS;
+import static org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase.BASE_FILE_DELETE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase.BASE_FILE_INSERT;
+import static org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase.LOG_FILE;
+import static org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase.REPLACE_COMMIT;
 import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.data_before_after;
 import static org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode.op_key_only;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
index d508f7ac4ea..f992a9b228c 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCFileSplit.java
@@ -33,16 +33,16 @@ import java.util.stream.Collectors;
  * This contains all the information that retrieve the change data at a single file group and
  * at a single commit.
  * <p>
- * For `cdcInferCase` = {@link HoodieCDCInferCase#BASE_FILE_INSERT}, `cdcFile` is a current version of
+ * For `cdcInferCase` = {@link HoodieCDCInferenceCase#BASE_FILE_INSERT}, `cdcFile` is a current version of
  * the base file in the group, and `beforeFileSlice` is None.
- * For `cdcInferCase` = {@link HoodieCDCInferCase#BASE_FILE_DELETE}, `cdcFile` is null,
+ * For `cdcInferCase` = {@link HoodieCDCInferenceCase#BASE_FILE_DELETE}, `cdcFile` is null,
  * `beforeFileSlice` is the previous version of the base file in the group.
- * For `cdcInferCase` = {@link HoodieCDCInferCase#AS_IS}, `cdcFile` is a log file with cdc blocks.
+ * For `cdcInferCase` = {@link HoodieCDCInferenceCase#AS_IS}, `cdcFile` is a log file with cdc blocks.
  * when enable the supplemental logging, both `beforeFileSlice` and `afterFileSlice` are None,
  * otherwise these two are the previous and current version of the base file.
- * For `cdcInferCase` = {@link HoodieCDCInferCase#LOG_FILE}, `cdcFile` is a normal log file and
+ * For `cdcInferCase` = {@link HoodieCDCInferenceCase#LOG_FILE}, `cdcFile` is a normal log file and
  * `beforeFileSlice` is the previous version of the file slice.
- * For `cdcInferCase` = {@link HoodieCDCInferCase#REPLACE_COMMIT}, `cdcFile` is null,
+ * For `cdcInferCase` = {@link HoodieCDCInferenceCase#REPLACE_COMMIT}, `cdcFile` is null,
  * `beforeFileSlice` is the current version of the file slice.
  */
 public class HoodieCDCFileSplit implements Serializable, Comparable<HoodieCDCFileSplit> {
@@ -54,7 +54,7 @@ public class HoodieCDCFileSplit implements Serializable, Comparable<HoodieCDCFil
   /**
    * Flag that decides to how to retrieve the change data. More details see: `HoodieCDCLogicalFileType`.
    */
-  private final HoodieCDCInferCase cdcInferCase;
+  private final HoodieCDCInferenceCase cdcInferCase;
 
   /**
    * The file that the change data can be parsed from.
@@ -71,17 +71,17 @@ public class HoodieCDCFileSplit implements Serializable, Comparable<HoodieCDCFil
    */
   private final Option<FileSlice> afterFileSlice;
 
-  public HoodieCDCFileSplit(String instant, HoodieCDCInferCase cdcInferCase, String cdcFile) {
+  public HoodieCDCFileSplit(String instant, HoodieCDCInferenceCase cdcInferCase, String cdcFile) {
     this(instant, cdcInferCase, cdcFile, Option.empty(), Option.empty());
   }
 
-  public HoodieCDCFileSplit(String instant, HoodieCDCInferCase cdcInferCase, Collection<String> cdcFiles) {
+  public HoodieCDCFileSplit(String instant, HoodieCDCInferenceCase cdcInferCase, Collection<String> cdcFiles) {
     this(instant, cdcInferCase, cdcFiles, Option.empty(), Option.empty());
   }
 
   public HoodieCDCFileSplit(
       String instant,
-      HoodieCDCInferCase cdcInferCase,
+      HoodieCDCInferenceCase cdcInferCase,
       String cdcFile,
       Option<FileSlice> beforeFileSlice,
       Option<FileSlice> afterFileSlice) {
@@ -90,7 +90,7 @@ public class HoodieCDCFileSplit implements Serializable, Comparable<HoodieCDCFil
 
   public HoodieCDCFileSplit(
       String instant,
-      HoodieCDCInferCase cdcInferCase,
+      HoodieCDCInferenceCase cdcInferCase,
       Collection<String> cdcFiles,
       Option<FileSlice> beforeFileSlice,
       Option<FileSlice> afterFileSlice) {
@@ -106,7 +106,7 @@ public class HoodieCDCFileSplit implements Serializable, Comparable<HoodieCDCFil
     return this.instant;
   }
 
-  public HoodieCDCInferCase getCdcInferCase() {
+  public HoodieCDCInferenceCase getCdcInferCase() {
     return this.cdcInferCase;
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferenceCase.java
similarity index 98%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferenceCase.java
index dfcb08a84cd..9f6d85108c1 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferCase.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/cdc/HoodieCDCInferenceCase.java
@@ -42,7 +42,7 @@ package org.apache.hudi.common.table.cdc;
  *   file is new-coming, so we can load this, mark all the records with `i`, and treat them as
  *   the value of `after`. The value of `before` for each record is null.
  *
- * BASE_FILE_INSERT:
+ * BASE_FILE_DELETE:
  *   For this type, there must be an empty file at the current instant, but a non-empty base file
  *   at the previous instant. First we find this base file that has the same file group and belongs
  *   to the previous instant. Then load this, mark all the records with `d`, and treat them as
@@ -67,7 +67,7 @@ package org.apache.hudi.common.table.cdc;
  *   a whole file group. First we find this file group. Then load this, mark all the records with
  *   `d`, and treat them as the value of `before`. The value of `after` for each record is null.
  */
-public enum HoodieCDCInferCase {
+public enum HoodieCDCInferenceCase {
 
   AS_IS,
   BASE_FILE_INSERT,
diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
index 1e4bf0098a1..29f477a84d4 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/cdc/HoodieCDCRDD.scala
@@ -32,7 +32,7 @@ import org.apache.hudi.avro.HoodieAvroUtils
 import org.apache.hudi.common.config.{HoodieMetadataConfig, TypedProperties}
 import org.apache.hudi.common.model._
 import org.apache.hudi.common.table.HoodieTableMetaClient
-import org.apache.hudi.common.table.cdc.HoodieCDCInferCase._
+import org.apache.hudi.common.table.cdc.HoodieCDCInferenceCase._
 import org.apache.hudi.common.table.cdc.HoodieCDCOperation._
 import org.apache.hudi.common.table.cdc.HoodieCDCSupplementalLoggingMode._
 import org.apache.hudi.common.table.cdc.{HoodieCDCFileSplit, HoodieCDCUtils}
@@ -81,10 +81,11 @@ class HoodieCDCRDD(
     originTableSchema: HoodieTableSchema,
     cdcSchema: StructType,
     requiredCdcSchema: StructType,
-    changes: Array[HoodieCDCFileGroupSplit])
+    @transient changes: Array[HoodieCDCFileGroupSplit])
   extends RDD[InternalRow](spark.sparkContext, Nil) with HoodieUnsafeRDD {
 
-  @transient private val hadoopConf = spark.sparkContext.hadoopConfiguration
+  @transient
+  private val hadoopConf = spark.sparkContext.hadoopConfiguration
 
   private val confBroadcast = spark.sparkContext.broadcast(new SerializableWritable(hadoopConf))
 
@@ -118,7 +119,7 @@ class HoodieCDCRDD(
 
     private lazy val fs = metaClient.getFs.getFileSystem
 
-    private lazy val conf = new Configuration(confBroadcast.value.value)
+    private lazy val conf = confBroadcast.value.value
 
     private lazy val basePath = metaClient.getBasePathV2
 
@@ -127,11 +128,7 @@ class HoodieCDCRDD(
     private lazy val populateMetaFields = tableConfig.populateMetaFields()
 
     private lazy val keyGenerator = {
-      val props = new TypedProperties()
-      props.put(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key, tableConfig.getKeyGeneratorClassName)
-      props.put(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key, tableConfig.getRecordKeyFieldProp)
-      props.put(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key, tableConfig.getPartitionFieldProp)
-      HoodieSparkKeyGeneratorFactory.createKeyGenerator(props)
+      HoodieSparkKeyGeneratorFactory.createKeyGenerator(tableConfig.getProps())
     }
 
     private lazy val recordKeyField: String = if (populateMetaFields) {
@@ -202,7 +199,7 @@ class HoodieCDCRDD(
     private var currentInstant: String = _
 
     // The change file that is currently being processed
-    private var currentChangeFile: HoodieCDCFileSplit = _
+    private var currentCDCFileSplit: HoodieCDCFileSplit = _
 
     /**
      * Two cases will use this to iterator the records:
@@ -258,10 +255,10 @@ class HoodieCDCRDD(
       if (needLoadNextFile) {
         loadCdcFile()
       }
-      if (currentChangeFile == null) {
+      if (currentCDCFileSplit == null) {
         false
       } else {
-        currentChangeFile.getCdcInferCase match {
+        currentCDCFileSplit.getCdcInferCase match {
           case BASE_FILE_INSERT | BASE_FILE_DELETE | REPLACE_COMMIT =>
             if (recordIter.hasNext && loadNext()) {
               true
@@ -292,7 +289,7 @@ class HoodieCDCRDD(
 
     def loadNext(): Boolean = {
       var loaded = false
-      currentChangeFile.getCdcInferCase match {
+      currentCDCFileSplit.getCdcInferCase match {
         case BASE_FILE_INSERT =>
           val originRecord = recordIter.next()
           recordToLoad.update(3, convertRowToJsonString(originRecord))
@@ -416,34 +413,34 @@ class HoodieCDCRDD(
       if (cdcFileIter.hasNext) {
         val split = cdcFileIter.next()
         currentInstant = split.getInstant
-        currentChangeFile = split
-        currentChangeFile.getCdcInferCase match {
+        currentCDCFileSplit = split
+        currentCDCFileSplit.getCdcInferCase match {
           case BASE_FILE_INSERT =>
-            assert(currentChangeFile.getCdcFiles != null && currentChangeFile.getCdcFiles.size() == 1)
-            val absCDCPath = new Path(basePath, currentChangeFile.getCdcFiles.get(0))
+            assert(currentCDCFileSplit.getCdcFiles != null && currentCDCFileSplit.getCdcFiles.size() == 1)
+            val absCDCPath = new Path(basePath, currentCDCFileSplit.getCdcFiles.get(0))
             val fileStatus = fs.getFileStatus(absCDCPath)
             val pf = PartitionedFile(InternalRow.empty, absCDCPath.toUri.toString, 0, fileStatus.getLen)
             recordIter = parquetReader(pf)
           case BASE_FILE_DELETE =>
-            assert(currentChangeFile.getBeforeFileSlice.isPresent)
-            recordIter = loadFileSlice(currentChangeFile.getBeforeFileSlice.get)
+            assert(currentCDCFileSplit.getBeforeFileSlice.isPresent)
+            recordIter = loadFileSlice(currentCDCFileSplit.getBeforeFileSlice.get)
           case LOG_FILE =>
-            assert(currentChangeFile.getCdcFiles != null && currentChangeFile.getCdcFiles.size() == 1
-              && currentChangeFile.getBeforeFileSlice.isPresent)
-            loadBeforeFileSliceIfNeeded(currentChangeFile.getBeforeFileSlice.get)
-            val absLogPath = new Path(basePath, currentChangeFile.getCdcFiles.get(0))
+            assert(currentCDCFileSplit.getCdcFiles != null && currentCDCFileSplit.getCdcFiles.size() == 1
+              && currentCDCFileSplit.getBeforeFileSlice.isPresent)
+            loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
+            val absLogPath = new Path(basePath, currentCDCFileSplit.getCdcFiles.get(0))
             val morSplit = HoodieMergeOnReadFileSplit(None, List(new HoodieLogFile(fs.getFileStatus(absLogPath))))
             val logFileIterator = new LogFileIterator(morSplit, originTableSchema, originTableSchema, tableState, conf)
             logRecordIter = logFileIterator.logRecordsPairIterator
           case AS_IS =>
-            assert(currentChangeFile.getCdcFiles != null && !currentChangeFile.getCdcFiles.isEmpty)
+            assert(currentCDCFileSplit.getCdcFiles != null && !currentCDCFileSplit.getCdcFiles.isEmpty)
             // load beforeFileSlice to beforeImageRecords
-            if (currentChangeFile.getBeforeFileSlice.isPresent) {
-              loadBeforeFileSliceIfNeeded(currentChangeFile.getBeforeFileSlice.get)
+            if (currentCDCFileSplit.getBeforeFileSlice.isPresent) {
+              loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
             }
             // load afterFileSlice to afterImageRecords
-            if (currentChangeFile.getAfterFileSlice.isPresent) {
-              val iter = loadFileSlice(currentChangeFile.getAfterFileSlice.get())
+            if (currentCDCFileSplit.getAfterFileSlice.isPresent) {
+              val iter = loadFileSlice(currentCDCFileSplit.getAfterFileSlice.get())
               afterImageRecords = mutable.Map.empty
               iter.foreach { row =>
                 val key = getRecordKey(row)
@@ -451,13 +448,13 @@ class HoodieCDCRDD(
               }
             }
 
-            val cdcLogFiles = currentChangeFile.getCdcFiles.asScala.map { cdcFile =>
+            val cdcLogFiles = currentCDCFileSplit.getCdcFiles.asScala.map { cdcFile =>
               new HoodieLogFile(fs.getFileStatus(new Path(basePath, cdcFile)))
             }.toArray
             cdcLogRecordIterator = new HoodieCDCLogRecordIterator(fs, cdcLogFiles, cdcAvroSchema)
           case REPLACE_COMMIT =>
-            if (currentChangeFile.getBeforeFileSlice.isPresent) {
-              loadBeforeFileSliceIfNeeded(currentChangeFile.getBeforeFileSlice.get)
+            if (currentCDCFileSplit.getBeforeFileSlice.isPresent) {
+              loadBeforeFileSliceIfNeeded(currentCDCFileSplit.getBeforeFileSlice.get)
             }
             recordIter = beforeImageRecords.values.map { record =>
               deserialize(record)
@@ -467,7 +464,7 @@ class HoodieCDCRDD(
         resetRecordFormat()
       } else {
         currentInstant = null
-        currentChangeFile = null
+        currentCDCFileSplit = null
       }
     }
 
@@ -475,7 +472,7 @@ class HoodieCDCRDD(
      * Initialize the partial fields of the data to be returned in advance to speed up.
      */
     private def resetRecordFormat(): Unit = {
-      recordToLoad = currentChangeFile.getCdcInferCase match {
+      recordToLoad = currentCDCFileSplit.getCdcInferCase match {
         case BASE_FILE_INSERT =>
           InternalRow.fromSeq(Array(
             CDCRelation.CDC_OPERATION_INSERT, convertToUTF8String(currentInstant),
diff --git a/rfc/rfc-51/rfc-51.md b/rfc/rfc-51/rfc-51.md
index 18b76116c86..29115b46344 100644
--- a/rfc/rfc-51/rfc-51.md
+++ b/rfc/rfc-51/rfc-51.md
@@ -202,7 +202,7 @@ tblproperties (
 
 ### How to infer CDC results
 
-| `HoodieCDCInferCase` | Infer case details                                                                                                        | Infer logic                                                                                                                                                               | Note                               |
+| `HoodieCDCInferenceCase` | Infer case details                                                                                                        | Infer logic                                                                                                                                                               | Note                               |
 |----------------------|---------------------------------------------------------------------------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------------------------------------|
 | `AS_IS`              | CDC file written (suffix contains `-cdc`) alongside base files (COW) or log files (MOR)                                   | CDC info will be extracted as is                                                                                                                                          | the read-optimized way to read CDC |
 | `BASE_FILE_INSERT`   | Base files were written to a new file group                                                                               | All records (in the current commit): `op=I`, `before=null`, `after=<current value>`                                                                                       | on-the-fly inference               |


[hudi] 02/04: [HUDI-5637] Add Kryo for hive sync bundle (#7781)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit cbff2bc0c9555f5ca1062d9a86c527423be8eaa4
Author: Sagar Sumit <sa...@gmail.com>
AuthorDate: Mon Jan 30 12:04:13 2023 +0530

    [HUDI-5637] Add Kryo for hive sync bundle (#7781)
---
 packaging/hudi-hive-sync-bundle/pom.xml | 29 +++++++++++++++++++++++++++++
 1 file changed, 29 insertions(+)

diff --git a/packaging/hudi-hive-sync-bundle/pom.xml b/packaging/hudi-hive-sync-bundle/pom.xml
index 56d8aed0bb5..9b8a2cd20fc 100644
--- a/packaging/hudi-hive-sync-bundle/pom.xml
+++ b/packaging/hudi-hive-sync-bundle/pom.xml
@@ -76,6 +76,10 @@
                   <include>org.apache.parquet:parquet-avro</include>
                   <include>commons-io:commons-io</include>
                   <include>org.openjdk.jol:jol-core</include>
+                  <!-- Kryo -->
+                  <include>com.esotericsoftware:kryo-shaded</include>
+                  <include>com.esotericsoftware:minlog</include>
+                  <include>org.objenesis:objenesis</include>
                 </includes>
               </artifactSet>
               <relocations combine.children="append">
@@ -87,6 +91,23 @@
                   <pattern>org.openjdk.jol.</pattern>
                   <shadedPattern>org.apache.hudi.org.openjdk.jol.</shadedPattern>
                 </relocation>
+                <!-- Kryo -->
+                <relocation>
+                  <pattern>com.esotericsoftware.kryo.</pattern>
+                  <shadedPattern>org.apache.hudi.com.esotericsoftware.kryo.</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.esotericsoftware.reflectasm.</pattern>
+                  <shadedPattern>org.apache.hudi.com.esotericsoftware.reflectasm.</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>com.esotericsoftware.minlog.</pattern>
+                  <shadedPattern>org.apache.hudi.com.esotericsoftware.minlog.</shadedPattern>
+                </relocation>
+                <relocation>
+                  <pattern>org.objenesis.</pattern>
+                  <shadedPattern>org.apache.hudi.org.objenesis.</shadedPattern>
+                </relocation>
               </relocations>
               <createDependencyReducedPom>false</createDependencyReducedPom>
               <filters>
@@ -154,5 +175,13 @@
       <scope>compile</scope>
     </dependency>
 
+    <!-- Kryo -->
+    <dependency>
+      <groupId>com.esotericsoftware</groupId>
+      <artifactId>kryo-shaded</artifactId>
+      <version>${kryo.shaded.version}</version>
+      <scope>compile</scope>
+    </dependency>
+
   </dependencies>
 </project>


[hudi] 01/04: [HUDI-5629] Clean CDC log files for enable/disable scenario (#7786)

Posted by yi...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 399549d7bb2cfe8dc48ea4088aa63fc121321fd4
Author: Zouxxyy <zo...@alibaba-inc.com>
AuthorDate: Mon Jan 30 14:30:39 2023 +0800

    [HUDI-5629] Clean CDC log files for enable/disable scenario (#7786)
---
 .../org/apache/hudi/table/action/clean/CleanPlanner.java     | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
index c6ff62ee764..1259872dd43 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clean/CleanPlanner.java
@@ -30,7 +30,6 @@ import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFileGroup;
 import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -364,13 +363,10 @@ public class CleanPlanner<T, I, K, O> implements Serializable {
                 deletePaths.add(new CleanFileInfo(hoodieDataFile.getBootstrapBaseFile().get().getPath(), true));
               }
             });
-            if (hoodieTable.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ
-                || hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
-              // 1. If merge on read, then clean the log files for the commits as well;
-              // 2. If change log capture is enabled, clean the log files no matter the table type is mor or cow.
-              deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
-                  .collect(Collectors.toList()));
-            }
+            // clean the log files for the commits, which contain cdc log files in cdc scenario
+            // and normal log files for mor tables.
+            deletePaths.addAll(aSlice.getLogFiles().map(lf -> new CleanFileInfo(lf.getPath().toString(), false))
+                .collect(Collectors.toList()));
           }
         }
       }