You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/13 06:24:05 UTC

[hudi] 01/05: [HUDI-5758] Restoring state of `HoodieKey` to make sure it's binary compatible w/ its state in 0.12 (#7917)

This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 847e7a975bfeb94956885cc252285f95afc4a843
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Fri Feb 10 15:02:47 2023 -0800

    [HUDI-5758] Restoring state of `HoodieKey` to make sure it's binary compatible w/ its state in 0.12 (#7917)
    
    RFC-46 modified `HoodieKey` to substantially optimize its serialized footprint (while using Kryo) by making it explicitly serializable by Kryo (inheriting from `KryoSerializable`, making it final).
    
    However, this broke its binary compatibility w/ the state as it was in 0.12.2.
    
    Unfortunately, since this class is used in `DeleteRecord` w/in `HoodieDeleteBlock`, this also made it impossible to read such blocks created by prior Hudi versions (more details in HUDI-5758).
    
    This PR restores the previous state of `HoodieKey` to make sure it stays binary compatible w/ existing persisted `HoodieDeleteBlock`s created by prior Hudi versions.
---
 .../apache/spark/HoodieSparkKryoRegistrar.scala    | 25 +++++++++++++++++--
 .../org/apache/hudi/common/model/DeleteRecord.java |  9 +++++++
 .../org/apache/hudi/common/model/HoodieKey.java    | 28 ++++++++--------------
 .../common/table/log/block/HoodieDeleteBlock.java  |  2 ++
 4 files changed, 44 insertions(+), 20 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
index 3894065d809..9d7fa3b784f 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
@@ -18,11 +18,12 @@
 
 package org.apache.spark
 
-import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.{Input, Output}
+import com.esotericsoftware.kryo.{Kryo, Serializer}
 import com.esotericsoftware.kryo.serializers.JavaSerializer
 import org.apache.hudi.client.model.HoodieInternalRow
 import org.apache.hudi.common.config.SerializableConfiguration
-import org.apache.hudi.common.model.HoodieSparkRecord
+import org.apache.hudi.common.model.{HoodieKey, HoodieSparkRecord}
 import org.apache.hudi.common.util.HoodieCommonKryoRegistrar
 import org.apache.hudi.config.HoodieWriteConfig
 import org.apache.spark.serializer.KryoRegistrator
@@ -44,12 +45,15 @@ import org.apache.spark.serializer.KryoRegistrator
  * </ol>
  */
 class HoodieSparkKryoRegistrar extends HoodieCommonKryoRegistrar with KryoRegistrator {
+
   override def registerClasses(kryo: Kryo): Unit = {
     ///////////////////////////////////////////////////////////////////////////
     // NOTE: DO NOT REORDER REGISTRATIONS
     ///////////////////////////////////////////////////////////////////////////
     super[HoodieCommonKryoRegistrar].registerClasses(kryo)
 
+    kryo.register(classOf[HoodieKey], new HoodieKeySerializer)
+
     kryo.register(classOf[HoodieWriteConfig])
 
     kryo.register(classOf[HoodieSparkRecord])
@@ -59,6 +63,23 @@ class HoodieSparkKryoRegistrar extends HoodieCommonKryoRegistrar with KryoRegist
     //       we're relying on [[SerializableConfiguration]] wrapper to work it around
     kryo.register(classOf[SerializableConfiguration], new JavaSerializer())
   }
+
+  /**
+   * NOTE: This {@link Serializer} is meant to deserialize instances of {@link HoodieKey} serialized
+   *       by the implicitly generated Kryo serializer (based on {@link com.esotericsoftware.kryo.serializers.FieldSerializer})
+   */
+  class HoodieKeySerializer extends Serializer[HoodieKey] {
+    override def write(kryo: Kryo, output: Output, key: HoodieKey): Unit = {
+      output.writeString(key.getRecordKey)
+      output.writeString(key.getPartitionPath)
+    }
+
+    override def read(kryo: Kryo, input: Input, klass: Class[HoodieKey]): HoodieKey = {
+      val recordKey = input.readString()
+      val partitionPath = input.readString()
+      new HoodieKey(recordKey, partitionPath)
+    }
+  }
 }
 
 object HoodieSparkKryoRegistrar {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DeleteRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DeleteRecord.java
index 003b591c20c..296e95e8bfa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/DeleteRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DeleteRecord.java
@@ -28,6 +28,15 @@ import java.util.Objects;
  * we need to keep the ordering val to combine with the data records when merging, or the data loss
  * may occur if there are intermediate deletions for the inputs
  * (a new INSERT comes after a DELETE in one input batch).
+ *
+ * NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
+ *
+ *       This class is serialized (using Kryo) as part of {@code HoodieDeleteBlock}. To make
+ *       sure it stays backwards-compatible, we can't MAKE ANY CHANGES TO THIS CLASS (add,
+ *       delete, reorder or change types of the fields in this class, make the class final, etc.),
+ *       as this would break its compatibility with already persisted blocks.
+ *
+ *       Check out HUDI-5760 for more details
  */
 public class DeleteRecord implements Serializable {
   private static final long serialVersionUID = 1L;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
index 5208cece1cb..436758f96f4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
@@ -18,11 +18,6 @@
 
 package org.apache.hudi.common.model;
 
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.KryoSerializable;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
 import java.io.Serializable;
 import java.util.Objects;
 
@@ -31,8 +26,17 @@ import java.util.Objects;
  * <p>
  * - recordKey : a recordKey that acts as primary key for a record.
  * - partitionPath : the partition path of a record.
+ *
+ * NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
+ *
+ *       This class is serialized (using Kryo) as part of {@code HoodieDeleteBlock}. To make
+ *       sure it stays backwards-compatible, we can't MAKE ANY CHANGES TO THIS CLASS (add,
+ *       delete, reorder or change types of the fields in this class, make the class final, etc.),
+ *       as this would break its compatibility with already persisted blocks.
+ *
+ *       Check out HUDI-5760 for more details
  */
-public final class HoodieKey implements Serializable, KryoSerializable {
+public class HoodieKey implements Serializable {
 
   private String recordKey;
   private String partitionPath;
@@ -86,16 +90,4 @@ public final class HoodieKey implements Serializable, KryoSerializable {
     sb.append('}');
     return sb.toString();
   }
-
-  @Override
-  public void write(Kryo kryo, Output output) {
-    output.writeString(recordKey);
-    output.writeString(partitionPath);
-  }
-
-  @Override
-  public void read(Kryo kryo, Input input) {
-    this.recordKey = input.readString();
-    this.partitionPath = input.readString();
-  }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
index a5168072d01..d920495f971 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
@@ -68,6 +68,7 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
 
     ByteArrayOutputStream baos = new ByteArrayOutputStream();
     DataOutputStream output = new DataOutputStream(baos);
+    // TODO(HUDI-5760) avoid using Kryo for serialization here
     byte[] bytesToWrite = SerializationUtils.serialize(getRecordsToDelete());
     output.writeInt(version);
     output.writeInt(bytesToWrite.length);
@@ -97,6 +98,7 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
     }
   }
 
+  // TODO(HUDI-5760) avoid using Kryo for serialization here
   private static DeleteRecord[] deserialize(int version, byte[] data) {
     if (version == 1) {
       // legacy version