Posted to issues@nifi.apache.org by "Matt Burgess (Jira)" <ji...@apache.org> on 2020/03/13 14:56:00 UTC

[jira] [Assigned] (NIFI-7249) [Regression] AvroReader: Could not parse incoming data

     [ https://issues.apache.org/jira/browse/NIFI-7249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matt Burgess reassigned NIFI-7249:
----------------------------------

    Assignee: Matt Burgess

> [Regression] AvroReader: Could not parse incoming data
> ------------------------------------------------------
>
>                 Key: NIFI-7249
>                 URL: https://issues.apache.org/jira/browse/NIFI-7249
>             Project: Apache NiFi
>          Issue Type: Bug
>          Components: Extensions
>    Affects Versions: 1.10.0, 1.11.0, 1.11.1, 1.11.2, 1.11.3
>         Environment: Debian, Java 11 and Java 8
>            Reporter: Philipp Leufke
>            Assignee: Matt Burgess
>            Priority: Major
>         Attachments: AvroReader_bug_MWE.xml
>
>          Time Spent: 1h
>  Remaining Estimate: 0h
>
> *Assessment*
> {code:java}
> AvroTypeUtil.convertUnionFieldValue
> {code}
> has the following:
> {code:java}
>         Optional<Schema> mostSuitableType = DataTypeUtils.findMostSuitableType(
>                 originalValue,
>                 fieldSchema.getTypes().stream().filter(schema -> schema.getType() != Type.NULL).collect(Collectors.toList()),
>                 subSchema -> AvroTypeUtil.determineDataType(subSchema)
>         );
> {code}
> {{DataTypeUtils.findMostSuitableType}}, in turn, has the following:
> {code:java}
> DataType inferredDataType = inferDataType(value, null);
> {code}
> {{DataTypeUtils.inferDataType}}, in turn, has the following:
> {code:java}
>         if (value instanceof Map) {
>             final Map<String, ?> map = (Map<String, ?>) value;
> {code}
> {{originalValue}} (passed on as {{value}}) is a map extracted from an Avro record, and its keys are {{Utf8}} instances rather than {{String}}s.
> More generally, the problem is that an *Avro-specific object* now reaches code that previously processed *only NiFi-specific value objects*.
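A minimal, self-contained sketch of how this failure arises. It assumes a hypothetical {{FakeUtf8}} class standing in for {{org.apache.avro.util.Utf8}} (a {{CharSequence}} that is not a {{String}}) so that it runs without Avro on the classpath, and a hypothetical {{firstKeyAsString}} method that mimics the cast shown above. Because generic types are erased at runtime, the unchecked cast to {{Map<String, ?>}} itself succeeds; the {{ClassCastException}} is thrown only when a key is actually read as a {{String}}.

```java
import java.util.HashMap;
import java.util.Map;

public class Utf8KeyCastDemo {

    // Hypothetical stand-in for org.apache.avro.util.Utf8: a CharSequence that is not a String.
    static final class FakeUtf8 implements CharSequence {
        private final String s;

        FakeUtf8(String s) { this.s = s; }

        @Override public int length() { return s.length(); }
        @Override public char charAt(int i) { return s.charAt(i); }
        @Override public CharSequence subSequence(int start, int end) { return s.subSequence(start, end); }
        @Override public String toString() { return s; }
    }

    // Hypothetical helper mimicking the pattern above: unchecked cast, then String-typed key access.
    @SuppressWarnings("unchecked")
    static String firstKeyAsString(Object value) {
        if (value instanceof Map) {
            final Map<String, ?> map = (Map<String, ?>) value; // succeeds: generics are erased at runtime
            for (String key : map.keySet()) {                  // compiler-inserted cast to String fails here
                return key;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        Map<Object, Object> avroLikeMap = new HashMap<>();
        avroLikeMap.put(new FakeUtf8("tags"), "value");
        try {
            firstKeyAsString(avroLikeMap);
            System.out.println("no exception");
        } catch (ClassCastException e) {
            Object key = avroLikeMap.keySet().iterator().next();
            System.out.println("ClassCastException for key of type " + key.getClass().getSimpleName());
        }
    }
}
```

With plain {{String}} keys the same helper returns the key normally, which matches the observation that the problem only appears once Avro's own key type leaks into this code path.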
> There are multiple approaches to fix this:
> # Treat this special case as a technical issue: accept that Avro objects can leak into this layer and prepare the layer to handle them as needed, i.e. transform the Avro map into one whose keys are {{String}} objects.
> # Treat this as an error-handling issue: inference is a best-effort attempt, and if it fails we fall back to the original logic. Inference was added here to choose the best-matching type from a UNION/CHOICE. If inference yields no result, the original logic goes over all types within the UNION/CHOICE and selects the _first compatible_ one. When a Map is in a UNION/CHOICE, the other types pose no compatibility issues, so the original logic works well.
> # Enhance the inference logic so that the Avro object is converted to a general object before inference occurs. This would prevent Avro (or other third-party-specific) objects from leaking into the framework's format-agnostic layer.
> (Approaches 1 and 2 are not mutually exclusive.)
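A rough sketch of how approaches 1 and 2 could be combined, under stated assumptions: {{normalize}}, {{chooseUnionBranch}}, and the {{infer}}/{{isCompatible}} callbacks are hypothetical names for illustration, not NiFi's {{DataTypeUtils}} API. Map keys are converted with {{String.valueOf}}, which calls {{toString()}} (for Avro's {{Utf8}} this yields the decoded text), and nested maps and lists are normalized recursively. If inference throws or yields nothing, the sketch falls back to the first compatible branch. The demo uses a {{StringBuilder}} key as a stand-in for a {{Utf8}} key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiPredicate;
import java.util.function.Function;

public class UnionInferenceSketch {

    // Approach 1 (hypothetical helper): copy maps so every key is a String, recursing into maps and lists.
    static Object normalize(Object value) {
        if (value instanceof Map) {
            Map<String, Object> copy = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                copy.put(String.valueOf(e.getKey()), normalize(e.getValue()));
            }
            return copy;
        }
        if (value instanceof List) {
            List<Object> copy = new ArrayList<>();
            for (Object item : (List<?>) value) {
                copy.add(normalize(item));
            }
            return copy;
        }
        return value; // scalars are returned unchanged
    }

    // Approach 2 (hypothetical helper): best-effort inference with a first-compatible fallback.
    static <T> Optional<T> chooseUnionBranch(Object value, List<T> branches,
                                             Function<Object, Optional<T>> infer,
                                             BiPredicate<Object, T> isCompatible) {
        try {
            Optional<T> inferred = infer.apply(value);
            if (inferred.isPresent()) {
                return inferred;
            }
        } catch (RuntimeException e) {
            // Inference failed; fall through to the original "first compatible type" logic.
        }
        for (T branch : branches) {
            if (isCompatible.test(value, branch)) {
                return Optional.of(branch);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<Object, Object> raw = new LinkedHashMap<>();
        raw.put(new StringBuilder("tags"), List.of("a", "b")); // non-String CharSequence key
        Object normalized = normalize(raw);
        Optional<String> branch = chooseUnionBranch(normalized, List.of("string", "map"),
                v -> { throw new ClassCastException("inference failed"); },
                (v, b) -> b.equals("map") == (v instanceof Map));
        System.out.println(normalized + " -> " + branch.orElse("none"));
    }
}
```

Normalizing before inference is also close in spirit to approach 3, since no Avro-specific key type reaches the inference step; the fallback keeps the original first-compatible-type behavior for any case that inference still cannot handle.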
> ----
> *Issue report*
>  Severe regression in Version 1.11.3, compared to 1.9.2:
> Record-based processors can no longer deserialize Avro messages. Examples:
>  * ConsumeKafkaRecord: with embedded Avro schema or using Confluent Schema Registry
>  * ConvertRecord: with embedded Avro schema or using Confluent Schema Registry, too
>  * probably others as well...
> Error messages:
> {noformat}
> ConvertRecord[id=c3ed29c6-0170-1000-a960-809827e7654d]
> Failed to process StandardFlowFileRecord[uuid=b3869d82-6c50-484e-8d0c-b64b5d5a3ac3,claim=StandardContentClaim 
> [resourceClaim=StandardResourceClaim[id=1584002690648-1091, container=default, section=67], offset=276387, length=3487]
> ,offset=0,name=b3869d82-6c50-484e-8d0c-b64b5d5a3ac3,size=3487]; will route to failure:
> Could not parse incoming data{noformat}
> {noformat}
> ConsumeKafkaRecord_2_0[id=d9ebdbda-51b7-38ce-b43e-3197322bd2e1]
> Failed to parse message from Kafka using the configured Record Reader.
> Will route message as its own FlowFile to the 'parse.failure' relationship: org.apache.nifi.serialization.MalformedRecordException:
> Error while getting next record. Root cause: java.lang.ClassCastException
> {noformat}
>  
> However, messages with an embedded schema can be converted to JSON flawlessly using ConvertAvroToJSON.
>  
> The behavior has been confirmed with various flows and configurations and with different Java versions. Downgrading to NiFi 1.9.2 resolves the issue; upgrading to 1.11.3 again brings it back.
>  
> A minimal example template is attached (AvroReader_bug_MWE.xml).
>  
> Stack traces:
>  
> {noformat}
> 2020-03-12 09:37:16,628 DEBUG [Timer-Driven Process Thread-4] org.apache.nifi.avro.AvroTypeUtil fail to convert field tags
> java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast to class java.lang.String (org.apache.avro.util.Utf8 is in unnamed module of loader org.apache.nifi.nar.NarClassLoader @515ebef3; java.lang.String is in module java.base of loader 'bootstrap')
>         at org.apache.nifi.serialization.record.util.DataTypeUtils.inferRecordDataType(DataTypeUtils.java:544)
>         at org.apache.nifi.serialization.record.util.DataTypeUtils.inferDataType(DataTypeUtils.java:478)
>         at org.apache.nifi.serialization.record.util.DataTypeUtils.findMostSuitableType(DataTypeUtils.java:267)
>         at org.apache.nifi.avro.AvroTypeUtil.convertUnionFieldValue(AvroTypeUtil.java:882)
>         at org.apache.nifi.avro.AvroTypeUtil.normalizeValue(AvroTypeUtil.java:1004)
>         at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:857)
>         at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:830)
>         at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:45)
>         at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
>         at org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:131)
>         at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:3006)
>         at org.apache.nifi.processors.standard.AbstractRecordProcessor.onTrigger(AbstractRecordProcessor.java:122)
>         at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>         at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
>         at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
>         at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
>         at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
>         at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>         at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>         at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:834)
> 2020-03-12 09:37:16,632 ERROR [Timer-Driven Process Thread-4] o.a.n.processors.standard.ConvertRecord ConvertRecord[id=c3ed29c6-0170-1000-a960-809827e7654d] Failed to process StandardFlowFileRecord[uuid=33856f9d-1991-4c95-90c2-3ffd032fc840,claim=StandardContentClaim [resourceClaim=StandardResourceClaim[id=1584005835899-1, container=default, section=1], offset=851, length=3487],offset=0,name=33856f9d-1991-4c95-90c2-3ffd032fc840,size=3487]; will route to failure: org.apache.nifi.processor.exception.ProcessException: Could not parse incoming data
> org.apache.nifi.processor.exception.ProcessException: Could not parse incoming data
>         at org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:171)
>         at org.apache.nifi.controller.repository.StandardProcessSession.write(StandardProcessSession.java:3006)
>         at org.apache.nifi.processors.standard.AbstractRecordProcessor.onTrigger(AbstractRecordProcessor.java:122)
>         at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
>         at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1176)
>         at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
>         at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
>         at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
>         at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>         at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
>         at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
>         at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>         at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>         at java.base/java.lang.Thread.run(Thread.java:834)
> Caused by: org.apache.nifi.serialization.MalformedRecordException: Error while getting next record. Root cause: java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast to class java.lang.String (org.apache.avro.util.Utf8 is in unnamed module of loader org.apache.nifi.nar.NarClassLoader @515ebef3; java.lang.String is in module java.base of loader 'bootstrap')
>         at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:52)
>         at org.apache.nifi.serialization.RecordReader.nextRecord(RecordReader.java:50)
>         at org.apache.nifi.processors.standard.AbstractRecordProcessor$1.process(AbstractRecordProcessor.java:131)
>         ... 13 common frames omitted
> Caused by: java.lang.ClassCastException: class org.apache.avro.util.Utf8 cannot be cast to class java.lang.String (org.apache.avro.util.Utf8 is in unnamed module of loader org.apache.nifi.nar.NarClassLoader @515ebef3; java.lang.String is in module java.base of loader 'bootstrap')
>         at org.apache.nifi.serialization.record.util.DataTypeUtils.inferRecordDataType(DataTypeUtils.java:544)
>         at org.apache.nifi.serialization.record.util.DataTypeUtils.inferDataType(DataTypeUtils.java:478)
>         at org.apache.nifi.serialization.record.util.DataTypeUtils.findMostSuitableType(DataTypeUtils.java:267)
>         at org.apache.nifi.avro.AvroTypeUtil.convertUnionFieldValue(AvroTypeUtil.java:882)
>         at org.apache.nifi.avro.AvroTypeUtil.normalizeValue(AvroTypeUtil.java:1004)
>         at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:857)
>         at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:830)
>         at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:45)
>         ... 15 common frames omitted
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)