You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/31 11:05:49 UTC

[hudi] 11/17: [HUDI-4959] Fixing Avro's `Utf8` serialization in Kryo (#7024)

This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 9e124834aae57cc8773f9254d34eb439df91694d
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Tue Oct 25 22:10:15 2022 -0700

    [HUDI-4959] Fixing Avro's `Utf8` serialization in Kryo (#7024)
    
    (cherry picked from commit b6c35394c603baa6cb718aa7d7d41b959de115a8)
---
 .../hudi/common/util/SerializationUtils.java       | 39 +++++++++++++++++-----
 .../hudi/common/util/TestSerializationUtils.java   | 29 +++++++++++++++-
 2 files changed, 58 insertions(+), 10 deletions(-)

diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
index 9041db51444..872848a5d49 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
@@ -19,8 +19,10 @@
 package org.apache.hudi.common.util;
 
 import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
 import com.esotericsoftware.kryo.io.Input;
 import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.util.Utf8;
 import org.objenesis.strategy.StdInstantiatorStrategy;
 
 import java.io.ByteArrayOutputStream;
@@ -36,9 +38,6 @@ public class SerializationUtils {
   private static final ThreadLocal<KryoSerializerInstance> SERIALIZER_REF =
       ThreadLocal.withInitial(KryoSerializerInstance::new);
 
-  // Serialize
-  // -----------------------------------------------------------------------
-
   /**
    * <p>
    * Serializes an {@code Object} to a byte array for storage/serialization.
@@ -52,9 +51,6 @@ public class SerializationUtils {
     return SERIALIZER_REF.get().serialize(obj);
   }
 
-  // Deserialize
-  // -----------------------------------------------------------------------
-
   /**
    * <p>
    * Deserializes a single {@code Object} from an array of bytes.
@@ -112,17 +108,42 @@ public class SerializationUtils {
   private static class KryoInstantiator implements Serializable {
 
     public Kryo newKryo() {
-
       Kryo kryo = new Kryo();
-      // ensure that kryo doesn't fail if classes are not registered with kryo.
+
+      // This instance of Kryo should not require prior registration of classes
       kryo.setRegistrationRequired(false);
-      // This would be used for object initialization if nothing else works out.
       kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
       // Handle cases where we may have an odd classloader setup like with libjars
       // for hadoop
       kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+
+      // Register custom serializers (instead of relying on the ones Kryo generates implicitly)
+      kryo.register(Utf8.class, new AvroUtf8Serializer());
+
       return kryo;
     }
 
   }
+
+  /**
+   * Kryo {@link Serializer} persisting Avro's {@link Utf8} as the byte array of its UTF-8 encoded content.
+   * NOTE: This {@link Serializer} could deserialize instances of {@link Utf8} serialized
+   *       by implicitly generated Kryo serializer (based on {@link com.esotericsoftware.kryo.serializers.FieldSerializer})
+   */
+  private static class AvroUtf8Serializer extends Serializer<Utf8> {
+    @SuppressWarnings("unchecked")
+    @Override
+    public void write(Kryo kryo, Output output, Utf8 utf8String) {
+      Serializer<byte[]> bytesSerializer = kryo.getDefaultSerializer(byte[].class);
+      // Utf8#getBytes exposes the backing buffer, valid only up to getByteLength(): trim it before writing
+      bytesSerializer.write(kryo, output, java.util.Arrays.copyOf(utf8String.getBytes(), utf8String.getByteLength()));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public Utf8 read(Kryo kryo, Input input, Class<Utf8> type) {
+      Serializer<byte[]> bytesSerializer = kryo.getDefaultSerializer(byte[].class);
+      return new Utf8(bytesSerializer.read(kryo, input, byte[].class));
+    }
+  }
 }
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
index 9d6c1b81b04..f2714aaf9a2 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
@@ -19,15 +19,21 @@
 package org.apache.hudi.common.util;
 
 import org.apache.avro.util.Utf8;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.LinkedList;
 import java.util.Objects;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -52,12 +58,33 @@ public class TestSerializationUtils {
     verifyObject(new LinkedList<>(Arrays.asList(2, 3, 5)));
   }
 
+  @Test
+  public void testAvroUtf8SerDe() throws IOException {
+    byte[] firstBytes = SerializationUtils.serialize(new Utf8("test"));
+    // 4 bytes of string payload + 3 bytes of Kryo overhead (presumably class id, reference marker and var-length size)
+    assertEquals(7, firstBytes.length);
+  }
+
+  @Test
+  public void testClassFullyQualifiedNameSerialization() throws IOException {
+    DeleteRecord deleteRecord = DeleteRecord.create(new HoodieKey("key", "partition"));
+    HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(new DeleteRecord[]{deleteRecord}, Collections.emptyMap());
+
+    byte[] firstBytes = SerializationUtils.serialize(deleteBlock);
+    byte[] secondBytes = SerializationUtils.serialize(deleteBlock);
+
+    assertNotSame(firstBytes, secondBytes);
+    // NOTE: Both passes must produce equal bytes: here we assert that Kryo doesn't optimize out the
+    //       fully-qualified class-name on repeated serialization and always writes it out
+    assertEquals(ByteBuffer.wrap(firstBytes), ByteBuffer.wrap(secondBytes));
+  }
+
   private <T> void verifyObject(T expectedValue) throws IOException {
     byte[] serializedObject = SerializationUtils.serialize(expectedValue);
     assertNotNull(serializedObject);
     assertTrue(serializedObject.length > 0);
 
-    final T deserializedValue = SerializationUtils.<T>deserialize(serializedObject);
+    final T deserializedValue = SerializationUtils.deserialize(serializedObject);
     if (expectedValue == null) {
       assertNull(deserializedValue);
     } else {