You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hudi.apache.org by "voon (Jira)" <ji...@apache.org> on 2023/03/28 02:05:00 UTC
[jira] [Updated] (HUDI-5992) Kryo SerDe error when performing compaction with GenericData$Fixed types on Hudi-Flink
[ https://issues.apache.org/jira/browse/HUDI-5992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
voon updated HUDI-5992:
-----------------------
Description:
Hudi-Flink is using Avro 1.10.0, while the rest (or rather, most) of the other Hudi-* modules are using Avro 1.8.2.
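Avro introduced a few changes after version 1.8.2, one of which is the use of anonymous classes in JsonProperties.java (org.apache.avro.JsonProperties#props). Because the field *props* is implemented as an anonymous inner class, Kryo cannot deserialize it correctly. As the "Caused by" section of the stack trace below shows, Kryo's MapSerializer fails with a NullPointerException inside JsonProperties$2.putIfAbsent.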
The error shown in the stack trace below is triggered when records of a table whose *precombineField* has type DECIMAL(p, s) are written to the BitCaskDiskMap and then read back; the failure occurs in BitCaskDiskMap.get. This happens when performing a compaction, or when a merge is required and the SpillableDiskMap has to spill onto the *DiskMap.
More details can be found in the Apache Jira issue AVRO-3438.
Spark encountered the same issue: SPARK-34477.
A workaround is to copy Spark's implementation of *GenericAvroSerializer* into Hudi and simplify it a little.
To reproduce the error, create the following test at this path:
{code:java}
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestBitCaskDiskMapFromFlink.java
{code}
{code:java}
package org.apache.hudi.sink.utils;
import java.io.IOException;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.util.collection.BitCaskDiskMap;
import org.junit.jupiter.api.Test;
/**
 * Reproduces HUDI-5992: a {@link HoodieAvroRecord} whose ordering value is an Avro
 * {@code GenericData.Fixed} carrying a decimal logical type is stored in a
 * {@link BitCaskDiskMap} and then read back.
 *
 * <p>The class is meant to live in the hudi-flink module so that it runs against that
 * module's Avro version (1.10.0, per the comment in the test body). There, reading the
 * record back fails with a KryoException caused by a NullPointerException inside Avro's
 * JsonProperties.
 */
public class TestBitCaskDiskMapFromFlink extends HoodieCommonTestHarness {
@Test
/**
 * Puts one record into the disk map and reads it back. The test contains no assertions:
 * it passes only if {@code records.get("a")} returns without throwing.
 */
public void testPutDecimal() throws IOException {
// the avro version used by hudi-flink module is 1.10.0
// placing the test here will use avro 1.10.0, allowing the error caused by anonymous classes to be thrown
BitCaskDiskMap<String, HoodieRecord> records = new BitCaskDiskMap<>(basePath, true);
// DECIMAL(20, 0) backed by a 9-byte Avro fixed. The GenericData.Fixed built below holds a
// reference to this Schema, including its props map (see "Serialization trace" in the stack trace).
Schema precombineFieldSchema = LogicalTypes.decimal(20, 0)
.addToSchema(Schema.createFixed("fixed", null, "record.precombineField", 9));
// exactly 9 bytes, matching the fixed size declared above
byte[] decimalFieldBytes = new byte[] {0, 0, 0, 1, -122, -16, -116, -90, -32};
GenericFixed genericFixed = new GenericData.Fixed(precombineFieldSchema, decimalFieldBytes);
// the GenericData.Fixed becomes the payload's ordering value
// (the orderingVal field in the serialization trace)
HoodieRecord avroRecord = new HoodieAvroRecord<>(new HoodieKey("recordKey", "partitionPath"),
new EventTimeAvroPayload(null, (Comparable) genericFixed));
records.put("a", avroRecord);
// reading back deserializes the record with Kryo; under Avro 1.10.0 this call throws the
// KryoException shown in the stack trace
records.get("a");
}
} {code}
Stacktrace for the error:
{code:java}
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
props (org.apache.avro.Schema$FixedSchema)
schema (org.apache.avro.generic.GenericData$Fixed)
orderingVal (org.apache.hudi.common.model.EventTimeAvroPayload)
data (org.apache.hudi.common.model.HoodieAvroRecord) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:106)
at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:80)
at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:210)
at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:203)
at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:199)
at org.apache.hudi.sink.utils.TestBitCaskDiskMapFromFlink.testPutDecimal(TestBitCaskDiskMapFromFlink.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.NullPointerException
at org.apache.avro.JsonProperties$2.putIfAbsent(JsonProperties.java:159)
at org.apache.avro.JsonProperties$2.put(JsonProperties.java:166)
at org.apache.avro.JsonProperties$2.put(JsonProperties.java:151)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:162)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
... 84 more {code}
was:
Hudi-Flink is using Avro 1.10.0, while the rest (or rather, most) of the other Hudi-* modules are using Avro 1.8.2.
Avro introduced a few changes after version 1.8.2. One of which is the introduction of anonymous classes in JsonProperties.java (org.apache.avro.JsonProperties#props). The field *props* is implemented via anonymous inner classes, causing issues when performing desers.
More details can be found in this Apache's Jira issue here: AVRO-3438
Spark has encountered this issue: SPARK-34477
A workaround for this is to copy Spark's implementation of GenericAvroSerializer over (and simplify) it a little.
To reproduce this error, create this test under the directory:
{code:java}
hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestBitCaskDiskMapFromFlink.java
{code}
{code:java}
package org.apache.hudi.sink.utils;
import java.io.IOException;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericFixed;
import org.apache.hudi.common.model.EventTimeAvroPayload;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
import org.apache.hudi.common.util.collection.BitCaskDiskMap;
import org.junit.jupiter.api.Test;
/**
 * Reproduces HUDI-5992: a {@link HoodieAvroRecord} whose ordering value is an Avro
 * {@code GenericData.Fixed} carrying a decimal logical type is stored in a
 * {@link BitCaskDiskMap} and then read back.
 *
 * <p>The class is meant to live in the hudi-flink module so that it runs against that
 * module's Avro version (1.10.0, per the comment in the test body). There, reading the
 * record back fails with a KryoException caused by a NullPointerException inside Avro's
 * JsonProperties.
 */
public class TestBitCaskDiskMapFromFlink extends HoodieCommonTestHarness {
@Test
/**
 * Puts one record into the disk map and reads it back. The test contains no assertions:
 * it passes only if {@code records.get("a")} returns without throwing.
 */
public void testPutDecimal() throws IOException {
// the avro version used by hudi-flink module is 1.10.0
// placing the test here will use avro 1.10.0, allowing the error caused by anonymous classes to be thrown
BitCaskDiskMap<String, HoodieRecord> records = new BitCaskDiskMap<>(basePath, true);
// DECIMAL(20, 0) backed by a 9-byte Avro fixed. The GenericData.Fixed built below holds a
// reference to this Schema, including its props map (see "Serialization trace" in the stack trace).
Schema precombineFieldSchema = LogicalTypes.decimal(20, 0)
.addToSchema(Schema.createFixed("fixed", null, "record.precombineField", 9));
// exactly 9 bytes, matching the fixed size declared above
byte[] decimalFieldBytes = new byte[] {0, 0, 0, 1, -122, -16, -116, -90, -32};
GenericFixed genericFixed = new GenericData.Fixed(precombineFieldSchema, decimalFieldBytes);
// the GenericData.Fixed becomes the payload's ordering value
// (the orderingVal field in the serialization trace)
HoodieRecord avroRecord = new HoodieAvroRecord<>(new HoodieKey("recordKey", "partitionPath"),
new EventTimeAvroPayload(null, (Comparable) genericFixed));
records.put("a", avroRecord);
// reading back deserializes the record with Kryo; under Avro 1.10.0 this call throws the
// KryoException shown in the stack trace
records.get("a");
}
} {code}
Stacktrace for the error:
{code:java}
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
props (org.apache.avro.Schema$FixedSchema)
schema (org.apache.avro.generic.GenericData$Fixed)
orderingVal (org.apache.hudi.common.model.EventTimeAvroPayload)
data (org.apache.hudi.common.model.HoodieAvroRecord) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:106)
at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:80)
at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:210)
at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:203)
at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:199)
at org.apache.hudi.sink.utils.TestBitCaskDiskMapFromFlink.testPutDecimal(TestBitCaskDiskMapFromFlink.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at java.util.ArrayList.forEach(ArrayList.java:1259)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
Caused by: java.lang.NullPointerException
at org.apache.avro.JsonProperties$2.putIfAbsent(JsonProperties.java:159)
at org.apache.avro.JsonProperties$2.put(JsonProperties.java:166)
at org.apache.avro.JsonProperties$2.put(JsonProperties.java:151)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:162)
at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
... 84 more {code}
> Kryo SerDe error when performing compaction with GenericData$Fixed types on Hudi-Flink
> --------------------------------------------------------------------------------------
>
> Key: HUDI-5992
> URL: https://issues.apache.org/jira/browse/HUDI-5992
> Project: Apache Hudi
> Issue Type: Bug
> Reporter: voon
> Assignee: voon
> Priority: Major
>
> Hudi-Flink is using Avro 1.10.0, while the rest (or rather, most) of the other Hudi-* modules are using Avro 1.8.2.
>
> Avro introduced a few changes after version 1.8.2, one of which is the use of anonymous classes in JsonProperties.java (org.apache.avro.JsonProperties#props). Because the field *props* is implemented as an anonymous inner class, Kryo cannot deserialize it correctly. As the "Caused by" section of the stack trace shows, Kryo's MapSerializer fails with a NullPointerException inside JsonProperties$2.putIfAbsent.
>
> The error shown in the stack trace below is triggered when records of a table whose *precombineField* has type DECIMAL(p, s) are written to the BitCaskDiskMap and then read back; the failure occurs in BitCaskDiskMap.get. This happens when performing a compaction, or when a merge is required and the SpillableDiskMap has to spill onto the *DiskMap.
>
> More details can be found in the Apache Jira issue AVRO-3438.
> Spark encountered the same issue: SPARK-34477.
>
> A workaround is to copy Spark's implementation of *GenericAvroSerializer* into Hudi and simplify it a little.
>
> To reproduce the error, create the following test at this path:
>
> {code:java}
> hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/utils/TestBitCaskDiskMapFromFlink.java
> {code}
>
> {code:java}
> package org.apache.hudi.sink.utils;
> import java.io.IOException;
> import org.apache.avro.LogicalTypes;
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericData;
> import org.apache.avro.generic.GenericFixed;
> import org.apache.hudi.common.model.EventTimeAvroPayload;
> import org.apache.hudi.common.model.HoodieAvroRecord;
> import org.apache.hudi.common.model.HoodieKey;
> import org.apache.hudi.common.model.HoodieRecord;
> import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
> import org.apache.hudi.common.util.collection.BitCaskDiskMap;
> import org.junit.jupiter.api.Test;
> /**
>  * Reproduces HUDI-5992: a {@link HoodieAvroRecord} whose ordering value is an Avro
>  * {@code GenericData.Fixed} carrying a decimal logical type is stored in a
>  * {@link BitCaskDiskMap} and then read back.
>  *
>  * <p>The class is meant to live in the hudi-flink module so that it runs against that
>  * module's Avro version (1.10.0, per the comment in the test body). There, reading the
>  * record back fails with a KryoException caused by a NullPointerException inside Avro's
>  * JsonProperties.
>  */
> public class TestBitCaskDiskMapFromFlink extends HoodieCommonTestHarness {
> @Test
> /**
>  * Puts one record into the disk map and reads it back. The test contains no assertions:
>  * it passes only if {@code records.get("a")} returns without throwing.
>  */
> public void testPutDecimal() throws IOException {
> // the avro version used by hudi-flink module is 1.10.0
> // placing the test here will use avro 1.10.0, allowing the error caused by anonymous classes to be thrown
> BitCaskDiskMap<String, HoodieRecord> records = new BitCaskDiskMap<>(basePath, true);
> // DECIMAL(20, 0) backed by a 9-byte Avro fixed. The GenericData.Fixed built below holds a
> // reference to this Schema, including its props map (see "Serialization trace" in the stack trace).
> Schema precombineFieldSchema = LogicalTypes.decimal(20, 0)
> .addToSchema(Schema.createFixed("fixed", null, "record.precombineField", 9));
> // exactly 9 bytes, matching the fixed size declared above
> byte[] decimalFieldBytes = new byte[] {0, 0, 0, 1, -122, -16, -116, -90, -32};
> GenericFixed genericFixed = new GenericData.Fixed(precombineFieldSchema, decimalFieldBytes);
> // the GenericData.Fixed becomes the payload's ordering value
> // (the orderingVal field in the serialization trace)
> HoodieRecord avroRecord = new HoodieAvroRecord<>(new HoodieKey("recordKey", "partitionPath"),
> new EventTimeAvroPayload(null, (Comparable) genericFixed));
> records.put("a", avroRecord);
> // reading back deserializes the record with Kryo; under Avro 1.10.0 this call throws the
> // KryoException shown in the stack trace
> records.get("a");
> }
> } {code}
>
> Stacktrace for the error:
> {code:java}
> com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
> Serialization trace:
> props (org.apache.avro.Schema$FixedSchema)
> schema (org.apache.avro.generic.GenericData$Fixed)
> orderingVal (org.apache.hudi.common.model.EventTimeAvroPayload)
> data (org.apache.hudi.common.model.HoodieAvroRecord) at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:543)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:813)
> at org.apache.hudi.common.util.SerializationUtils$KryoSerializerInstance.deserialize(SerializationUtils.java:106)
> at org.apache.hudi.common.util.SerializationUtils.deserialize(SerializationUtils.java:80)
> at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:210)
> at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:203)
> at org.apache.hudi.common.util.collection.BitCaskDiskMap.get(BitCaskDiskMap.java:199)
> at org.apache.hudi.sink.utils.TestBitCaskDiskMapFromFlink.testPutDecimal(TestBitCaskDiskMapFromFlink.java:51)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
> at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
> at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
> at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
> at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
> at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
> at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
> at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
> at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
> at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
> at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
> at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
> at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
> at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
> at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
> at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
> at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
> at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
> at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
> at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
> at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
> at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
> at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
> at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
> at java.util.ArrayList.forEach(ArrayList.java:1259)
> at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
> at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
> at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
> at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
> at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
> at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
> at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
> at java.util.ArrayList.forEach(ArrayList.java:1259)
> at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
> at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
> at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
> at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
> at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
> at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
> at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
> at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
> at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
> at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
> at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
> at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
> at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
> at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
> at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
> at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
> at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
> at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:75)
> at com.intellij.junit5.JUnit5IdeaTestRunner.startRunnerWithArgs(JUnit5IdeaTestRunner.java:57)
> at com.intellij.rt.junit.IdeaTestRunner$Repeater$1.execute(IdeaTestRunner.java:38)
> at com.intellij.rt.execution.junit.TestsRepeater.repeat(TestsRepeater.java:11)
> at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:35)
> at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:235)
> at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:54)
> Caused by: java.lang.NullPointerException
> at org.apache.avro.JsonProperties$2.putIfAbsent(JsonProperties.java:159)
> at org.apache.avro.JsonProperties$2.put(JsonProperties.java:166)
> at org.apache.avro.JsonProperties$2.put(JsonProperties.java:151)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:162)
> at com.esotericsoftware.kryo.serializers.MapSerializer.read(MapSerializer.java:39)
> at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:731)
> at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)
> ... 84 more {code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)