Posted to issues@flink.apache.org by "Sivaprasanna Sethuraman (Jira)" <ji...@apache.org> on 2020/05/10 11:10:00 UTC
[jira] [Commented] (FLINK-17486) ClassCastException when
checkpointing AVRO SpecificRecord with decimal fields
[ https://issues.apache.org/jira/browse/FLINK-17486?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17103754#comment-17103754 ]
Sivaprasanna Sethuraman commented on FLINK-17486:
-------------------------------------------------
This seems to happen because the logical type conversion information is lost when we call `serializer.copy(s1)`. I did a quick test that changes AvroSerializer#copy() to something like:
{code:java}
@Override
public T copy(T from) {
    if (CONCURRENT_ACCESS_CHECK) {
        enterExclusiveThread();
    }
    try {
        checkAvroInitialized();
        // Carry over the logical type conversions registered on the record's own SpecificData.
        // Checking for SpecificRecordBase (not just SpecificRecord) keeps the cast below safe.
        if (from instanceof SpecificRecordBase) {
            Collection<Conversion<?>> conversions = ((SpecificRecordBase) from).getSpecificData().getConversions();
            conversions.forEach(avroData::addLogicalTypeConversion);
        }
        return avroData.deepCopy(runtimeSchema, from);
    }
    finally {
        if (CONCURRENT_ACCESS_CHECK) {
            exitExclusiveThread();
        }
    }
}
{code}
This fixes the bug and the user code runs as expected. However, `.getConversions()` is only available on SpecificRecordBase from Avro 1.9.x, so I updated the Avro version from 1.8.2 to 1.9.2, which causes a lot of other problems in the test cases. I have updated quite a lot of them, but AvroSerializerMigrationTest still fails with InvalidClassException.
I have attached the patch. The link to the changes in my forked repo: [https://github.com/zenfenan/flink/tree/FLINK-17486]
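As a minimal sketch of the mechanism described above (not part of the attached patch), the following standalone example assumes Avro 1.9.x on the classpath and uses plain GenericData rather than Flink's AvroSerializer. The class name ConversionLossDemo and its method names are hypothetical. It deep-copies a decimal value once with a DecimalConversion registered and once without; the second case is expected to fail in the same way as the report, because the raw copy path treats a bytes-backed value as a ByteBuffer.

```java
import java.math.BigDecimal;

import org.apache.avro.Conversions;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

// Hypothetical demo class; only the Avro calls are real library API.
public class ConversionLossDemo {

    // bytes schema annotated with decimal(9,2), like the "price" field in the report
    static Schema decimalSchema() {
        return LogicalTypes.decimal(9, 2).addToSchema(Schema.create(Schema.Type.BYTES));
    }

    // Deep copy with the decimal conversion registered on the data model.
    static BigDecimal copyWithConversion() {
        GenericData data = new GenericData();
        data.addLogicalTypeConversion(new Conversions.DecimalConversion());
        return data.deepCopy(decimalSchema(), new BigDecimal("42.32"));
    }

    // Deep copy on a data model that has no conversions registered.
    // Returns true if the copy fails with a ClassCastException.
    static boolean copyWithoutConversionFails() {
        GenericData data = new GenericData();
        try {
            data.deepCopy(decimalSchema(), new BigDecimal("42.32"));
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("with conversion: " + copyWithConversion());
        System.out.println("without conversion fails: " + copyWithoutConversionFails());
    }
}
```

If this behaves as assumed, it suggests the fix only needs the conversions to be registered on the data model that AvroSerializer uses for the copy, whichever way they are obtained.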
> ClassCastException when checkpointing AVRO SpecificRecord with decimal fields
> -----------------------------------------------------------------------------
>
> Key: FLINK-17486
> URL: https://issues.apache.org/jira/browse/FLINK-17486
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.10.0
> Environment: Flink 1.10.0
> AVRO 1.9.2
> Java 1.8.0 (but also Java 14)
> Scala binary 2.11
> Reporter: Lorenzo Nicora
> Priority: Critical
> Labels: AVRO, confluent-kafka, kafka
>
> When consuming from a Kafka source AVRO SpecificRecord containing a {{decimal}} (logical type) field, copying the record fails with:
> {{java.lang.ClassCastException: class java.math.BigDecimal cannot be cast to class java.nio.ByteBuffer}}
> I understand the problem arises when Flink tries to make a deep-copy of the record for checkpointing.
> This code reproduces the problem ([https://github.com/nicusX/flink-avro-bug/blob/master/src/test/java/example/TestDeepCopy.java]):
>
> {code:java}
> AvroSerializer<Sample> serializer = new AvroSerializer<>(Sample.class);
> Sample s1 = Sample.newBuilder()
>     .setPrice(BigDecimal.valueOf(42.32))
>     .setId("A12345")
>     .build();
> Sample s2 = serializer.copy(s1);
> {code}
>
>
> The AVRO SpecificRecord is generated from this IDL (using the avro-maven-plugin):
> {code:java}
> @namespace("example.avro")
> protocol SampleProtocol {
>   record Sample {
>     string id;
>     decimal(9,2) price;
>     timestamp_ms eventTime;
>   }
> }{code}
> In particular, I hit the problem after attaching an AssignerWithPeriodicWatermarks to a Kafka source consuming AVRO SpecificRecord and using Confluent Schema Registry. The assigner extracts the event time from the record, and bookmarking is enabled (not sure whether this is related).
> A simplified version of the application is here: [https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]
>
> The problem looks similar to AVRO-1895, but that issue has been fixed since AVRO 1.8.2.
> In fact, the following code, which does the deep copy relying only on AVRO, does work:
> {code:java}
> Sample s1 = Sample.newBuilder()
>     .setPrice(BigDecimal.valueOf(42.32))
>     .setId("A12345")
>     .build();
> Sample s2 = Sample.newBuilder(s1).build();{code}
>
> Code of the two tests and simplified application: [https://github.com/nicusX/flink-avro-bug|https://github.com/nicusX/flink-avro-bug/blob/master/src/main/java/example/StreamJob.java]
>