Posted to commits@hudi.apache.org by ak...@apache.org on 2022/10/28 01:04:02 UTC

[hudi] branch release-feature-rfc46 updated: [MINOR] Properly registering target classes w/ Kryo (#7026)

This is an automated email from the ASF dual-hosted git repository.

akudinkin pushed a commit to branch release-feature-rfc46
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/release-feature-rfc46 by this push:
     new 4282d890d8 [MINOR] Properly registering target classes w/ Kryo (#7026)
4282d890d8 is described below

commit 4282d890d877030d8e883ee576628962df2b629e
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Thu Oct 27 18:03:54 2022 -0700

    [MINOR] Properly registering target classes w/ Kryo (#7026)
    
    * Added `HoodieKryoRegistrar` registering the necessary Hudi classes w/ Kryo to make their serialization more efficient (by serializing just the class id in lieu of the fully-qualified class name; see the size-comparison sketch after these notes)
    
    * Redirected Kryo registration to `HoodieKryoRegistrar`
    
    * Registered additional classes likely to be serialized by Kryo
    
    * Updated tests
    
    * Fixed serialization of Avro's `Utf8` to serialize just the bytes
    
    * Added tests
    
    * Added custom `AvroUtf8Serializer`;
    Tidying up
    
    * Extracted `HoodieCommonKryoRegistrar` to leverage in `SerializationUtils`
    
    * Renamed `HoodieKryoRegistrar` to `HoodieSparkKryoRegistrar`;
    Rebased `HoodieSparkKryoRegistrar` onto `HoodieCommonKryoRegistrar`
    
    * `lint`
    
    * Fixing compilation for Spark 2.x
    
    * Disabling flaky test
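
    To make the class-id rationale concrete, the following is a minimal,
    self-contained sketch (illustrative only, not part of this commit)
    comparing the serialized size of the same object with and without
    prior Kryo registration:

        import com.esotericsoftware.kryo.Kryo;
        import com.esotericsoftware.kryo.io.Output;

        import java.io.ByteArrayOutputStream;
        import java.util.ArrayList;
        import java.util.Arrays;

        public class KryoRegistrationDemo {
          // Serializes the object (class info included) and returns the byte count
          static int serializedSize(Kryo kryo, Object obj) {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            Output output = new Output(baos);
            kryo.writeClassAndObject(output, obj);
            output.close();
            return baos.size();
          }

          public static void main(String[] args) {
            Object payload = new ArrayList<>(Arrays.asList("a", "b"));

            Kryo unregistered = new Kryo();
            unregistered.setRegistrationRequired(false);

            Kryo registered = new Kryo();
            registered.setRegistrationRequired(false);
            registered.register(ArrayList.class); // assigned a compact integer id

            // The unregistered run embeds "java.util.ArrayList" in the output;
            // the registered run embeds only a small varint class id.
            System.out.println("unregistered: " + serializedSize(unregistered, payload));
            System.out.println("registered:   " + serializedSize(registered, payload));
          }
        }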
---
 .../java/org/apache/hudi/cli/utils/SparkUtil.java  |  4 +-
 .../cli/functional/CLIFunctionalTestHarness.java   |  4 +-
 .../apache/hudi/client/SparkRDDWriteClient.java    | 12 ---
 .../apache/spark/HoodieSparkKryoRegistrar.scala    | 69 +++++++++++++++++
 .../java/org/apache/hudi/table/TestCleaner.java    |  4 +
 .../hudi/testutils/FunctionalTestHarness.java      |  3 +-
 .../SparkClientFunctionalTestHarness.java          |  3 +-
 .../common/util/HoodieCommonKryoRegistrar.java     | 89 ++++++++++++++++++++++
 .../hudi/common/util/SerializationUtils.java       | 39 +++++++---
 .../hudi/common/util/TestSerializationUtils.java   | 29 ++++++-
 .../quickstart/TestHoodieSparkQuickstart.java      | 14 +---
 .../model/TestHoodieRecordSerialization.scala      | 12 +--
 .../org/apache/hudi/utilities/UtilHelpers.java     |  7 +-
 .../apache/hudi/utilities/TestHoodieIndexer.java   |  3 +-
 .../hudi/utilities/TestHoodieRepairTool.java       |  4 +-
 15 files changed, 245 insertions(+), 51 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
index bcccb66b37..e333a6167a 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/utils/SparkUtil.java
@@ -21,11 +21,11 @@ package org.apache.hudi.cli.utils;
 import org.apache.hudi.cli.HoodieCliSparkConfig;
 import org.apache.hudi.cli.commands.SparkEnvCommand;
 import org.apache.hudi.cli.commands.SparkMain;
-import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.StringUtils;
 
+import org.apache.spark.HoodieSparkKryoRegistrar$;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.launcher.SparkLauncher;
@@ -116,7 +116,7 @@ public class SparkUtil {
   }
 
   public static JavaSparkContext initJavaSparkContext(SparkConf sparkConf) {
-    SparkRDDWriteClient.registerClasses(sparkConf);
+    HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
     JavaSparkContext jsc = new JavaSparkContext(sparkConf);
     jsc.hadoopConfiguration().setBoolean(HoodieCliSparkConfig.CLI_PARQUET_ENABLE_SUMMARY_METADATA, false);
     FSUtils.prepareHadoopConf(jsc.hadoopConfiguration());
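
The `HoodieSparkKryoRegistrar$.MODULE$` idiom above is standard Scala/Java
interop: for a Scala `object`, the compiler emits a class named `<Name>$` that
holds the singleton in a static `MODULE$` field. Roughly, in Java terms (with
hypothetical names):

    // What scalac generates for `object Greeter { def greet(n: String) = ... }`,
    // approximately:
    public final class Greeter$ {
      public static final Greeter$ MODULE$ = new Greeter$();
      private Greeter$() {}
      public String greet(String name) { return "hi, " + name; }
    }
    // ... which is why Java callers write: Greeter$.MODULE$.greet("hudi")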
diff --git a/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java b/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java
index 04f77df549..6d6335ab0f 100644
--- a/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java
+++ b/hudi-cli/src/test/java/org/apache/hudi/cli/functional/CLIFunctionalTestHarness.java
@@ -20,7 +20,6 @@
 package org.apache.hudi.cli.functional;
 
 import org.apache.hudi.client.SparkRDDReadClient;
-import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.testutils.HoodieClientTestUtils;
@@ -28,6 +27,7 @@ import org.apache.hudi.testutils.providers.SparkProvider;
 import org.apache.hudi.timeline.service.TimelineService;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.spark.HoodieSparkKryoRegistrar$;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
@@ -100,7 +100,7 @@ public class CLIFunctionalTestHarness implements SparkProvider {
     initialized = spark != null;
     if (!initialized) {
       SparkConf sparkConf = conf();
-      SparkRDDWriteClient.registerClasses(sparkConf);
+      HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
       SparkRDDReadClient.addHoodieSupport(sparkConf);
       spark = SparkSession.builder().config(sparkConf).getOrCreate();
       sqlContext = spark.sqlContext();
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 650c617e41..49281b931b 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -62,7 +62,6 @@ import com.codahale.metrics.Timer;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
@@ -97,17 +96,6 @@ public class SparkRDDWriteClient<T> extends
     super(context, writeConfig, timelineService, SparkUpgradeDowngradeHelper.getInstance());
   }
 
-  /**
-   * Register hudi classes for Kryo serialization.
-   *
-   * @param conf instance of SparkConf
-   * @return SparkConf
-   */
-  public static SparkConf registerClasses(SparkConf conf) {
-    conf.registerKryoClasses(new Class[]{HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class});
-    return conf;
-  }
-
   @Override
   protected HoodieIndex createIndex(HoodieWriteConfig writeConfig) {
     return SparkHoodieIndexFactory.createIndex(config);
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
new file mode 100644
index 0000000000..372bff2b6d
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/HoodieSparkKryoRegistrar.scala
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import com.esotericsoftware.kryo.Kryo
+import org.apache.hudi.client.model.HoodieInternalRow
+import org.apache.hudi.commmon.model.HoodieSparkRecord
+import org.apache.hudi.common.util.HoodieCommonKryoRegistrar
+import org.apache.hudi.config.HoodieWriteConfig
+import org.apache.spark.internal.config.ConfigBuilder
+import org.apache.spark.serializer.KryoRegistrator
+
+/**
+ * NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
+ *
+ * This class is responsible for registering Hudi-specific components that are often
+ * serialized by Kryo (for example, during Spark's shuffle operations) so that Kryo
+ * doesn't need to serialize their fully-qualified class names (for every object), which
+ * quickly adds up to a considerable amount of overhead.
+ *
+ * Please note the following:
+ * <ol>
+ *   <li>The ordering of the registrations MUST NOT change, as it directly determines
+ *   the associated class ids (on the Kryo side)</li>
+ *   <li>This class might be loaded up using reflection and as such should not be relocated
+ *   or renamed (w/o correspondingly updating such usages)</li>
+ * </ol>
+ */
+class HoodieSparkKryoRegistrar extends HoodieCommonKryoRegistrar with KryoRegistrator {
+  override def registerClasses(kryo: Kryo): Unit = {
+    ///////////////////////////////////////////////////////////////////////////
+    // NOTE: DO NOT REORDER REGISTRATIONS
+    ///////////////////////////////////////////////////////////////////////////
+    super[HoodieCommonKryoRegistrar].registerClasses(kryo)
+
+    kryo.register(classOf[HoodieWriteConfig])
+
+    kryo.register(classOf[HoodieSparkRecord])
+    kryo.register(classOf[HoodieInternalRow])
+  }
+}
+
+object HoodieSparkKryoRegistrar {
+
+  // NOTE: We're copying the definition of the config introduced in Spark 3.0
+  //       (to stay compatible w/ Spark 2.4)
+  private val KRYO_USER_REGISTRATORS = "spark.kryo.registrator"
+
+  def register(conf: SparkConf): SparkConf = {
+    conf.set(KRYO_USER_REGISTRATORS, Seq(classOf[HoodieSparkKryoRegistrar].getName).mkString(","))
+  }
+
+}
\ No newline at end of file
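
For context on how the registrator gets picked up: Spark instantiates the class
named by `spark.kryo.registrator` and invokes its `registerClasses` hook each
time it builds a Kryo instance. A minimal wiring sketch (illustrative; assumes
a local-mode Spark application):

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class RegistrarWiringSketch {
      public static void main(String[] args) {
        SparkConf conf = new SparkConf()
            .setAppName("hudi-kryo-demo")
            .setMaster("local[2]")
            // Use Kryo instead of Java serialization
            .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
            // Spark instantiates this class and calls registerClasses(kryo)
            .set("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar");

        try (JavaSparkContext jsc = new JavaSparkContext(conf)) {
          // Shuffles in jobs submitted through jsc now serialize the registered
          // Hudi classes by id rather than by fully-qualified class name.
        }
      }
    }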
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
index 0f40c28508..e634bc0710 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestCleaner.java
@@ -87,6 +87,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -458,7 +459,10 @@ public class TestCleaner extends HoodieClientTestBase {
 
   /**
    * Test Clean-By-Commits using insert/upsert API.
+   *
+   * TODO: re-enable test after rebasing on master
    */
+  @Disabled
   @ParameterizedTest
   @ValueSource(booleans = {true, false})
   public void testInsertAndCleanByCommits(boolean isAsync) throws Exception {
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
index 9d28577059..cdf762db0a 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/FunctionalTestHarness.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.spark.HoodieSparkKryoRegistrar$;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
@@ -138,7 +139,7 @@ public class FunctionalTestHarness implements SparkProvider, DFSProvider, Hoodie
     initialized = spark != null && hdfsTestService != null;
     if (!initialized) {
       SparkConf sparkConf = conf();
-      SparkRDDWriteClient.registerClasses(sparkConf);
+      HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
       SparkRDDReadClient.addHoodieSupport(sparkConf);
       spark = SparkSession.builder().config(sparkConf).getOrCreate();
       sqlContext = spark.sqlContext();
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
index c452f413bc..448c914367 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/SparkClientFunctionalTestHarness.java
@@ -59,6 +59,7 @@ import org.apache.hudi.testutils.providers.HoodieMetaClientProvider;
 import org.apache.hudi.testutils.providers.HoodieWriteClientProvider;
 import org.apache.hudi.testutils.providers.SparkProvider;
 import org.apache.hudi.timeline.service.TimelineService;
+import org.apache.spark.HoodieSparkKryoRegistrar$;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -184,7 +185,7 @@ public class SparkClientFunctionalTestHarness implements SparkProvider, HoodieMe
     initialized = spark != null;
     if (!initialized) {
       SparkConf sparkConf = conf();
-      SparkRDDWriteClient.registerClasses(sparkConf);
+      HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
       SparkRDDReadClient.addHoodieSupport(sparkConf);
       spark = SparkSession.builder().config(sparkConf).getOrCreate();
       sqlContext = spark.sqlContext();
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieCommonKryoRegistrar.java b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieCommonKryoRegistrar.java
new file mode 100644
index 0000000000..82d4eb7757
--- /dev/null
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/HoodieCommonKryoRegistrar.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util;
+
+import com.esotericsoftware.kryo.Kryo;
+import org.apache.avro.util.Utf8;
+import org.apache.hudi.common.HoodieJsonPayload;
+import org.apache.hudi.common.model.AWSDmsAvroPayload;
+import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.EventTimeAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.HoodieAvroRecord;
+import org.apache.hudi.common.model.HoodieEmptyRecord;
+import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.RewriteAvroPayload;
+import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
+import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
+import org.apache.hudi.metadata.HoodieMetadataPayload;
+
+/**
+ * NOTE: PLEASE READ CAREFULLY BEFORE CHANGING
+ *
+ * This class is responsible for registering Hudi-specific components that are often
+ * serialized by Kryo (for example, during Spark's shuffle operations) so that Kryo
+ * doesn't need to serialize their fully-qualified class names (for every object), which
+ * quickly adds up to a considerable amount of overhead.
+ *
+ * Please note the following:
+ * <ol>
+ *   <li>The ordering of the registrations MUST NOT change, as it directly determines
+ *   the associated class ids (on the Kryo side)</li>
+ *   <li>This class might be loaded up using reflection and as such should not be relocated
+ *   or renamed (w/o correspondingly updating such usages)</li>
+ * </ol>
+ */
+public class HoodieCommonKryoRegistrar {
+
+  public void registerClasses(Kryo kryo) {
+    ///////////////////////////////////////////////////////////////////////////
+    // NOTE: DO NOT REORDER REGISTRATIONS
+    ///////////////////////////////////////////////////////////////////////////
+
+    kryo.register(HoodieAvroRecord.class);
+    kryo.register(HoodieAvroIndexedRecord.class);
+    kryo.register(HoodieEmptyRecord.class);
+
+    kryo.register(OverwriteWithLatestAvroPayload.class);
+    kryo.register(DefaultHoodieRecordPayload.class);
+    kryo.register(OverwriteNonDefaultsWithLatestAvroPayload.class);
+    kryo.register(RewriteAvroPayload.class);
+    kryo.register(EventTimeAvroPayload.class);
+    kryo.register(PartialUpdateAvroPayload.class);
+    kryo.register(MySqlDebeziumAvroPayload.class);
+    kryo.register(PostgresDebeziumAvroPayload.class);
+    // TODO need to relocate to hudi-common
+    //kryo.register(BootstrapRecordPayload.class);
+    kryo.register(AWSDmsAvroPayload.class);
+    kryo.register(HoodieAvroPayload.class);
+    kryo.register(HoodieJsonPayload.class);
+    kryo.register(HoodieMetadataPayload.class);
+
+    kryo.register(HoodieRecordLocation.class);
+    kryo.register(HoodieRecordGlobalLocation.class);
+
+    kryo.register(Utf8.class, new SerializationUtils.AvroUtf8Serializer());
+  }
+
+}
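
The "DO NOT REORDER REGISTRATIONS" warning above exists because Kryo assigns
class ids sequentially at registration time, so the numeric id embedded in the
serialized bytes is a function of registration order. A small illustration
(hypothetical class choices):

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Registration;

    public class RegistrationOrderDemo {
      public static void main(String[] args) {
        Kryo kryo = new Kryo();
        Registration first = kryo.register(java.util.ArrayList.class);
        Registration second = kryo.register(java.util.HashMap.class);
        // Ids are handed out sequentially: swapping the two register(...) calls
        // would swap these ids, breaking previously serialized data.
        System.out.println(first.getId());  // some id N
        System.out.println(second.getId()); // N + 1
      }
    }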
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
index 9041db5144..9254b20a06 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
@@ -19,8 +19,10 @@
 package org.apache.hudi.common.util;
 
 import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.util.Utf8;
 import org.objenesis.strategy.StdInstantiatorStrategy;
 
 import java.io.ByteArrayOutputStream;
@@ -36,9 +38,6 @@ public class SerializationUtils {
   private static final ThreadLocal<KryoSerializerInstance> SERIALIZER_REF =
       ThreadLocal.withInitial(KryoSerializerInstance::new);
 
-  // Serialize
-  // -----------------------------------------------------------------------
-
   /**
    * <p>
    * Serializes an {@code Object} to a byte array for storage/serialization.
@@ -52,9 +51,6 @@ public class SerializationUtils {
     return SERIALIZER_REF.get().serialize(obj);
   }
 
-  // Deserialize
-  // -----------------------------------------------------------------------
-
   /**
    * <p>
    * Deserializes a single {@code Object} from an array of bytes.
@@ -112,17 +108,42 @@ public class SerializationUtils {
   private static class KryoInstantiator implements Serializable {
 
     public Kryo newKryo() {
-
       Kryo kryo = new Kryo();
-      // ensure that kryo doesn't fail if classes are not registered with kryo.
+
+      // This instance of Kryo should not require prior registration of classes
       kryo.setRegistrationRequired(false);
-      // This would be used for object initialization if nothing else works out.
       kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
       // Handle cases where we may have an odd classloader setup like with libjars
       // for hadoop
       kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+
+      // Register Hudi's classes
+      new HoodieCommonKryoRegistrar().registerClasses(kryo);
+
       return kryo;
     }
 
   }
+
+  /**
+   * NOTE: This {@link Serializer} can deserialize an instance of {@link Utf8} serialized
+   *       by the implicitly generated Kryo serializer (based on {@link com.esotericsoftware.kryo.serializers.FieldSerializer})
+   */
+  public static class AvroUtf8Serializer extends Serializer<Utf8> {
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void write(Kryo kryo, Output output, Utf8 utf8String) {
+      Serializer<byte[]> bytesSerializer = kryo.getDefaultSerializer(byte[].class);
+      bytesSerializer.write(kryo, output, utf8String.getBytes());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Utf8 read(Kryo kryo, Input input, Class<Utf8> type) {
+      Serializer<byte[]> bytesSerializer = kryo.getDefaultSerializer(byte[].class);
+      byte[] bytes = bytesSerializer.read(kryo, input, byte[].class);
+      return new Utf8(bytes);
+    }
+  }
 }
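
A round-trip through the new serializer might look like the following sketch
(illustrative; assumes hudi-common and Avro are on the classpath):

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import org.apache.avro.util.Utf8;
    import org.apache.hudi.common.util.SerializationUtils;

    public class AvroUtf8RoundTrip {
      public static void main(String[] args) {
        Kryo kryo = new Kryo();
        kryo.setRegistrationRequired(false);
        // Swap in the byte-level serializer for Avro's Utf8
        kryo.register(Utf8.class, new SerializationUtils.AvroUtf8Serializer());

        Output output = new Output(64, -1); // small initial buffer, unbounded max
        kryo.writeObject(output, new Utf8("hello"));
        output.close();

        Input input = new Input(output.toBytes());
        Utf8 roundTripped = kryo.readObject(input, Utf8.class);
        System.out.println(roundTripped); // hello
      }
    }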
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
index 9d6c1b81b0..fe02309137 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
@@ -19,15 +19,21 @@
 package org.apache.hudi.common.util;
 
 import org.apache.avro.util.Utf8;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Objects;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -52,12 +58,33 @@ public class TestSerializationUtils {
     verifyObject(new LinkedList<>(Arrays.asList(2, 3, 5)));
   }
 
+  @Test
+  public void testAvroUtf8SerDe() throws IOException {
+    byte[] firstBytes = SerializationUtils.serialize(new Utf8("test"));
+    // 4-byte string + 3 bytes of length (Kryo uses variable-length encoding)
+    assertEquals(firstBytes.length, 7);
+  }
+
+  @Test
+  public void testClassFullyQualifiedNameSerialization() throws IOException {
+    DeleteRecord deleteRecord = DeleteRecord.create(new HoodieKey("key", "partition"));
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(new DeleteRecord[]{deleteRecord}, Collections.emptyMap());
+
+    byte[] firstBytes = SerializationUtils.serialize(deleteBlock);
+    byte[] secondBytes = SerializationUtils.serialize(deleteBlock);
+
+    assertNotSame(firstBytes, secondBytes);
+    // NOTE: Here we assert that Kryo doesn't optimize out the fully-qualified class-name
+    //       and always writes it out
+    assertEquals(ByteBuffer.wrap(firstBytes), ByteBuffer.wrap(secondBytes));
+  }
+
   private <T> void verifyObject(T expectedValue) throws IOException {
     byte[] serializedObject = SerializationUtils.serialize(expectedValue);
     assertNotNull(serializedObject);
     assertTrue(serializedObject.length > 0);
 
-    final T deserializedValue = SerializationUtils.<T>deserialize(serializedObject);
+    final T deserializedValue = SerializationUtils.deserialize(serializedObject);
     if (expectedValue == null) {
       assertNull(deserializedValue);
     } else {
diff --git a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
index c23db7f8e7..96c73dd240 100644
--- a/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
+++ b/hudi-examples/hudi-examples-spark/src/test/java/org/apache/hudi/examples/quickstart/TestHoodieSparkQuickstart.java
@@ -19,12 +19,10 @@
 package org.apache.hudi.examples.quickstart;
 
 import org.apache.hudi.client.SparkRDDReadClient;
-import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
-import org.apache.hudi.common.model.HoodieAvroPayload;
-import org.apache.hudi.examples.common.HoodieExampleDataGenerator;
 import org.apache.hudi.testutils.providers.SparkProvider;
 
+import org.apache.spark.HoodieSparkKryoRegistrar$;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
@@ -37,15 +35,7 @@ import org.junit.jupiter.api.io.TempDir;
 import java.io.File;
 import java.nio.file.Paths;
 
-import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.delete;
-import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.deleteByPartition;
-import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.incrementalQuery;
-import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.insertData;
-import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.insertOverwriteData;
-import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.pointInTimeQuery;
-import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.queryData;
 import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.runQuickstart;
-import static org.apache.hudi.examples.quickstart.HoodieSparkQuickstart.updateData;
 
 public class TestHoodieSparkQuickstart implements SparkProvider {
   protected static HoodieSparkEngineContext context;
@@ -94,7 +84,7 @@ public class TestHoodieSparkQuickstart implements SparkProvider {
     initialized = spark != null;
     if (!initialized) {
       SparkConf sparkConf = conf();
-      SparkRDDWriteClient.registerClasses(sparkConf);
+      HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
       SparkRDDReadClient.addHoodieSupport(sparkConf);
       spark = SparkSession.builder().config(sparkConf).getOrCreate();
       sqlContext = spark.sqlContext();
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
index 8329fda093..eb1339ad2f 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/common/model/TestHoodieRecordSerialization.scala
@@ -81,8 +81,8 @@ class TestHoodieRecordSerialization extends SparkClientFunctionalTestHarness {
     val hoodieInternalRow = new HoodieInternalRow(new Array[UTF8String](5), unsafeRow, false)
 
     Seq(
-      (unsafeRow, rowSchema, 135),
-      (hoodieInternalRow, addMetaFields(rowSchema), 175)
+      (unsafeRow, rowSchema, 87),
+      (hoodieInternalRow, addMetaFields(rowSchema), 127)
     ) foreach { case (row, schema, expectedSize) => routine(row, schema, expectedSize) }
   }
 
@@ -110,8 +110,8 @@ class TestHoodieRecordSerialization extends SparkClientFunctionalTestHarness {
     val avroIndexedRecord = new HoodieAvroIndexedRecord(key, avroRecord)
 
     Seq(
-      (legacyRecord, 573),
-      (avroIndexedRecord, 442)
+      (legacyRecord, 527),
+      (avroIndexedRecord, 389)
     ) foreach { case (record, expectedSize) => routine(record, expectedSize) }
   }
 
@@ -132,8 +132,8 @@ class TestHoodieRecordSerialization extends SparkClientFunctionalTestHarness {
     val key = new HoodieKey("rec-key", "part-path")
 
     Seq(
-      (new HoodieEmptyRecord[GenericRecord](key, HoodieOperation.INSERT, 1, HoodieRecordType.AVRO), 74),
-      (new HoodieEmptyRecord[GenericRecord](key, HoodieOperation.INSERT, 2, HoodieRecordType.SPARK), 74)
+      (new HoodieEmptyRecord[GenericRecord](key, HoodieOperation.INSERT, 1, HoodieRecordType.AVRO), 27),
+      (new HoodieEmptyRecord[GenericRecord](key, HoodieOperation.INSERT, 2, HoodieRecordType.SPARK), 27)
     ) foreach { case (record, expectedSize) => routine(record, expectedSize) }
   }
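
The smaller expected sizes above are the direct payoff of registration: an
unregistered class costs Kryo the fully-qualified class-name string, while a
registered one costs a small varint id. A rough sanity check (back-of-the-envelope,
not part of the commit):

    public class SizeDeltaCheck {
      public static void main(String[] args) {
        // 46 characters: close to the 74 -> 27 byte drop seen for HoodieEmptyRecord
        // above, once class-name framing bytes are accounted for.
        String fqcn = "org.apache.hudi.common.model.HoodieEmptyRecord";
        System.out.println(fqcn.length());
      }
    }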
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
index 523546c9ef..48645febe4 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/UtilHelpers.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.spark.HoodieSparkKryoRegistrar$;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -282,7 +283,8 @@ public class UtilHelpers {
     sparkConf.set("spark.driver.allowMultipleContexts", "true");
 
     additionalConfigs.forEach(sparkConf::set);
-    return SparkRDDWriteClient.registerClasses(sparkConf);
+    HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
+    return sparkConf;
   }
 
   private static SparkConf buildSparkConf(String appName, Map<String, String> additionalConfigs) {
@@ -296,7 +298,8 @@ public class UtilHelpers {
     sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
 
     additionalConfigs.forEach(sparkConf::set);
-    return SparkRDDWriteClient.registerClasses(sparkConf);
+    HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
+    return sparkConf;
   }
 
   public static JavaSparkContext buildSparkContext(String appName, Map<String, String> configs) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index 87afd56d83..374fe868cb 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -42,6 +42,7 @@ import org.apache.hudi.metadata.MetadataPartitionType;
 import org.apache.hudi.testutils.providers.SparkProvider;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.spark.HoodieSparkKryoRegistrar$;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -88,7 +89,7 @@ public class TestHoodieIndexer extends HoodieCommonTestHarness implements SparkP
     boolean initialized = spark != null;
     if (!initialized) {
       SparkConf sparkConf = conf();
-      SparkRDDWriteClient.registerClasses(sparkConf);
+      HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
       SparkRDDReadClient.addHoodieSupport(sparkConf);
       spark = SparkSession.builder().config(sparkConf).getOrCreate();
       sqlContext = spark.sqlContext();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java
index 00cf3ae883..bac24f3484 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieRepairTool.java
@@ -21,7 +21,6 @@ package org.apache.hudi.utilities;
 
 import org.apache.hudi.HoodieTestCommitGenerator;
 import org.apache.hudi.client.SparkRDDReadClient;
-import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
@@ -36,6 +35,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.spark.HoodieSparkKryoRegistrar$;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.SQLContext;
@@ -93,7 +93,7 @@ public class TestHoodieRepairTool extends HoodieCommonTestHarness implements Spa
     boolean initialized = spark != null;
     if (!initialized) {
       SparkConf sparkConf = conf();
-      SparkRDDWriteClient.registerClasses(sparkConf);
+      HoodieSparkKryoRegistrar$.MODULE$.register(sparkConf);
       SparkRDDReadClient.addHoodieSupport(sparkConf);
       spark = SparkSession.builder().config(sparkConf).getOrCreate();
       sqlContext = spark.sqlContext();