You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by fo...@apache.org on 2023/01/31 11:05:49 UTC
[hudi] 11/17: [HUDI-4959] Fixing Avro's `Utf8` serialization in Kryo (#7024)
This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch release-0.12.1
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 9e124834aae57cc8773f9254d34eb439df91694d
Author: Alexey Kudinkin <al...@gmail.com>
AuthorDate: Tue Oct 25 22:10:15 2022 -0700
[HUDI-4959] Fixing Avro's `Utf8` serialization in Kryo (#7024)
(cherry picked from commit b6c35394c603baa6cb718aa7d7d41b959de115a8)
---
.../hudi/common/util/SerializationUtils.java | 39 +++++++++++++++++-----
.../hudi/common/util/TestSerializationUtils.java | 29 +++++++++++++++-
2 files changed, 58 insertions(+), 10 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
index 9041db51444..872848a5d49 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/SerializationUtils.java
@@ -19,8 +19,10 @@
package org.apache.hudi.common.util;
import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+import org.apache.avro.util.Utf8;
import org.objenesis.strategy.StdInstantiatorStrategy;
import java.io.ByteArrayOutputStream;
@@ -36,9 +38,6 @@ public class SerializationUtils {
private static final ThreadLocal<KryoSerializerInstance> SERIALIZER_REF =
ThreadLocal.withInitial(KryoSerializerInstance::new);
- // Serialize
- // -----------------------------------------------------------------------
-
/**
* <p>
* Serializes an {@code Object} to a byte array for storage/serialization.
@@ -52,9 +51,6 @@ public class SerializationUtils {
return SERIALIZER_REF.get().serialize(obj);
}
- // Deserialize
- // -----------------------------------------------------------------------
-
/**
* <p>
* Deserializes a single {@code Object} from an array of bytes.
@@ -112,17 +108,42 @@ public class SerializationUtils {
private static class KryoInstantiator implements Serializable {
public Kryo newKryo() {
-
Kryo kryo = new Kryo();
- // ensure that kryo doesn't fail if classes are not registered with kryo.
+
+ // This instance of Kryo should not require prior registration of classes
kryo.setRegistrationRequired(false);
- // This would be used for object initialization if nothing else works out.
kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()));
// Handle cases where we may have an odd classloader setup like with libjars
// for hadoop
kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
+
+ // Register serializers
+ kryo.register(Utf8.class, new AvroUtf8Serializer());
+
return kryo;
}
}
+
+ /**
+ * NOTE: This {@link Serializer} can also deserialize instances of {@link Utf8} serialized
+ * by the implicitly generated Kryo serializer (based on {@link com.esotericsoftware.kryo.serializers.FieldSerializer}).
+ */
+ private static class AvroUtf8Serializer extends Serializer<Utf8> {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void write(Kryo kryo, Output output, Utf8 utf8String) {
+ Serializer<byte[]> bytesSerializer = kryo.getDefaultSerializer(byte[].class);
+ bytesSerializer.write(kryo, output, utf8String.getBytes());
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public Utf8 read(Kryo kryo, Input input, Class<Utf8> type) {
+ Serializer<byte[]> bytesSerializer = kryo.getDefaultSerializer(byte[].class);
+ byte[] bytes = bytesSerializer.read(kryo, input, byte[].class);
+ return new Utf8(bytes);
+ }
+ }
}
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
index 9d6c1b81b04..f2714aaf9a2 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestSerializationUtils.java
@@ -19,15 +19,21 @@
package org.apache.hudi.common.util;
import org.apache.avro.util.Utf8;
+import org.apache.hudi.common.model.DeleteRecord;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.table.log.block.HoodieDeleteBlock;
import org.junit.jupiter.api.Test;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.Objects;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -52,12 +58,33 @@ public class TestSerializationUtils {
verifyObject(new LinkedList<>(Arrays.asList(2, 3, 5)));
}
+ @Test
+ public void testAvroUtf8SerDe() throws IOException {
+ byte[] firstBytes = SerializationUtils.serialize(new Utf8("test"));
+ // 4 bytes of string payload + 3 bytes of Kryo framing overhead (Kryo uses variable-length encoding)
+ assertEquals(7, firstBytes.length);
+ }
+
+ @Test
+ public void testClassFullyQualifiedNameSerialization() throws IOException {
+ DeleteRecord deleteRecord = DeleteRecord.create(new HoodieKey("key", "partition"));
+ HoodieDeleteBlock deleteBlock = new HoodieDeleteBlock(new DeleteRecord[]{deleteRecord}, Collections.emptyMap());
+
+ byte[] firstBytes = SerializationUtils.serialize(deleteBlock);
+ byte[] secondBytes = SerializationUtils.serialize(deleteBlock);
+
+ assertNotSame(firstBytes, secondBytes);
+ // NOTE: Here we assert that Kryo doesn't optimize out the fully-qualified class-name
+ // and always writes it out
+ assertEquals(ByteBuffer.wrap(firstBytes), ByteBuffer.wrap(secondBytes));
+ }
+
private <T> void verifyObject(T expectedValue) throws IOException {
byte[] serializedObject = SerializationUtils.serialize(expectedValue);
assertNotNull(serializedObject);
assertTrue(serializedObject.length > 0);
- final T deserializedValue = SerializationUtils.<T>deserialize(serializedObject);
+ final T deserializedValue = SerializationUtils.deserialize(serializedObject);
if (expectedValue == null) {
assertNull(deserializedValue);
} else {