You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by yi...@apache.org on 2023/02/13 06:24:05 UTC
[hudi] 01/05: [HUDI-5758] Restoring state of `HoodieKey` to make sure it's binary compatible w/ its state in 0.12 (#7917)
This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch release-0.13.0
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 847e7a975bfeb94956885cc252285f95afc4a843
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Fri Feb 10 15:02:47 2023 -0800
[HUDI-5758] Restoring state of `HoodieKey` to make sure it's binary compatible w/ its state in 0.12 (#7917)
RFC-46 modified `HoodieKey` to substantially optimize its serialized footprint (while using Kryo) by making it explicitly serializable by Kryo (inheriting from `KryoSerializable`, making it final).
However, this broke its binary compatibility w/ the state as it was in 0.12.2.
Unfortunately, since this class is used in `DeleteRecord` w/in `HoodieDeleteBlock`, this also made it impossible to read such blocks created by prior Hudi versions (more details in HUDI-5758).
This PR restores the previous state of `HoodieKey` to make sure it stays binary compatible w/ existing persisted `HoodieDeleteBlock`s created by prior Hudi versions.
---
.../apache/spark/HoodieSparkKryoRegistrar.scala | 25 +++++++++++++++++--
.../org/apache/hudi/common/model/DeleteRecord.java | 9 +++++++
.../org/apache/hudi/common/model/HoodieKey.java | 28 ++++++++--------------
.../common/table/log/block/HoodieDeleteBlock.java | 2 ++
4 files changed, 44 insertions(+), 20 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
index 3894065d809..9d7fa3b784f 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
@@ -18,11 +18,12 @@
package org.apache.spark
-import com.esotericsoftware.kryo.Kryo
+import com.esotericsoftware.kryo.io.{Input, Output}
+import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.serializers.JavaSerializer
import org.apache.hudi.client.model.HoodieInternalRow
import org.apache.hudi.common.config.SerializableConfiguration
-import org.apache.hudi.common.model.HoodieSparkRecord
+import org.apache.hudi.common.model.{HoodieKey, HoodieSparkRecord}
import org.apache.hudi.common.util.HoodieCommonKryoRegistrar
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.spark.serializer.KryoRegistrator
@@ -44,12 +45,15 @@ import org.apache.spark.serializer.KryoRegistrator
* </ol>
*/
class HoodieSparkKryoRegistrar extends HoodieCommonKryoRegistrar with KryoRegistrator {
+
override def registerClasses(kryo: Kryo): Unit = {
///////////////////////////////////////////////////////////////////////////
// NOTE: DO NOT REORDER REGISTRATIONS
///////////////////////////////////////////////////////////////////////////
super[HoodieCommonKryoRegistrar].registerClasses(kryo)
+ kryo.register(classOf[HoodieKey], new HoodieKeySerializer)
+
kryo.register(classOf[HoodieWriteConfig])
kryo.register(classOf[HoodieSparkRecord])
@@ -59,6 +63,23 @@ class HoodieSparkKryoRegistrar extends HoodieCommonKryoRegistrar with KryoRegist
// we're relying on [[SerializableConfiguration]] wrapper to work it around
kryo.register(classOf[SerializableConfiguration], new JavaSerializer())
}
+
+ /**
+ * NOTE: This {@link Serializer} can deserialize instances of {@link HoodieKey} serialized
+ * by the implicitly generated Kryo serializer (based on {@link com.esotericsoftware.kryo.serializers.FieldSerializer})
+ */
+ class HoodieKeySerializer extends Serializer[HoodieKey] {
+ override def write(kryo: Kryo, output: Output, key: HoodieKey): Unit = {
+ output.writeString(key.getRecordKey)
+ output.writeString(key.getPartitionPath)
+ }
+
+ override def read(kryo: Kryo, input: Input, klass: Class[HoodieKey]): HoodieKey = {
+ val recordKey = input.readString()
+ val partitionPath = input.readString()
+ new HoodieKey(recordKey, partitionPath)
+ }
+ }
}
object HoodieSparkKryoRegistrar {
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/DeleteRecord.java b/hudi-common/src/main/java/org/apache/hudi/common/model/DeleteRecord.java
index 003b591c20c..296e95e8bfa 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/DeleteRecord.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/DeleteRecord.java
@@ -28,6 +28,15 @@ import java.util.Objects;
* we need to keep the ordering val to combine with the data records when merging, or the data loss
* may occur if there are intermediate deletions for the inputs
* (a new INSERT comes after a DELETE in one input batch).
+ *
+ * NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
+ *
+ * This class is serialized (using Kryo) as part of {@code HoodieDeleteBlock}. To make
+ * sure it stays backwards-compatible, we can't MAKE ANY CHANGES TO THIS CLASS (add,
+ * delete, reorder or change types of the fields in this class, make class final, etc)
+ * as this would break its compatibility with already persisted blocks.
+ *
+ * Check out HUDI-5760 for more details
*/
public class DeleteRecord implements Serializable {
private static final long serialVersionUID = 1L;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
index 5208cece1cb..436758f96f4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieKey.java
@@ -18,11 +18,6 @@
package org.apache.hudi.common.model;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.KryoSerializable;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
import java.io.Serializable;
import java.util.Objects;
@@ -31,8 +26,17 @@ import java.util.Objects;
* <p>
* - recordKey : a recordKey that acts as primary key for a record.
* - partitionPath : the partition path of a record.
+ *
+ * NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
+ *
+ * This class is serialized (using Kryo) as part of {@code HoodieDeleteBlock}. To make
+ * sure it stays backwards-compatible, we can't MAKE ANY CHANGES TO THIS CLASS (add,
+ * delete, reorder or change types of the fields in this class, make class final, etc)
+ * as this would break its compatibility with already persisted blocks.
+ *
+ * Check out HUDI-5760 for more details
*/
-public final class HoodieKey implements Serializable, KryoSerializable {
+public class HoodieKey implements Serializable {
private String recordKey;
private String partitionPath;
@@ -86,16 +90,4 @@ public final class HoodieKey implements Serializable, KryoSerializable {
sb.append('}');
return sb.toString();
}
-
- @Override
- public void write(Kryo kryo, Output output) {
- output.writeString(recordKey);
- output.writeString(partitionPath);
- }
-
- @Override
- public void read(Kryo kryo, Input input) {
- this.recordKey = input.readString();
- this.partitionPath = input.readString();
- }
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
index a5168072d01..d920495f971 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDeleteBlock.java
@@ -68,6 +68,7 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream output = new DataOutputStream(baos);
+ // TODO(HUDI-5760) avoid using Kryo for serialization here
byte[] bytesToWrite = SerializationUtils.serialize(getRecordsToDelete());
output.writeInt(version);
output.writeInt(bytesToWrite.length);
@@ -97,6 +98,7 @@ public class HoodieDeleteBlock extends HoodieLogBlock {
}
}
+ // TODO(HUDI-5760) avoid using Kryo for serialization here
private static DeleteRecord[] deserialize(int version, byte[] data) {
if (version == 1) {
// legacy version