Posted to dev@avro.apache.org by "Josh Highley (Jira)" <ji...@apache.org> on 2021/09/12 01:40:00 UTC

[jira] [Updated] (AVRO-3202) Avro Reader exception when data field not present, and schema has alias

     [ https://issues.apache.org/jira/browse/AVRO-3202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Josh Highley updated AVRO-3202:
-------------------------------
    Description: 
I believe a change in Avro 1.10 has caused a bug in NiFi's Avro reader when a field in the schema is not in the data and the schema field has an alias. When the reader looks up a field's value in the FlowFile data, it first tries the schema's field name and, if that returns null, tries each alias. Prior to Avro 1.10.0, org.apache.avro.generic.GenericData$Record.get(String key) simply returned null if the key was not a field name in the record's schema; starting with 1.10.0 it throws an AvroRuntimeException instead. The exception is thrown on the alias lookup, after the lookup by field name has returned null (the data does not contain a field with either name).

In my example, the schema field is:
{noformat}
{"name":"phone_number","type":["null","string"],"default":null,"aliases":["PhoneNumber"]}{noformat}
Stack trace (note that the field name in the error message is the alias 'PhoneNumber', not the field name 'phone_number'):

 
{noformat}
org.apache.nifi.serialization.MalformedRecordException: Error while getting next record. Root cause: org.apache.avro.AvroRuntimeException: Not a valid schema field: PhoneNumber
         at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:52)
         at sun.reflect.GeneratedMethodAccessor108.invoke(Unknown Source)
         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
         at java.lang.reflect.Method.invoke(Method.java:498)
         at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:254)
         at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler.access$100(StandardControllerServiceInvocationHandler.java:38)
         at org.apache.nifi.controller.service.StandardControllerServiceInvocationHandler$ProxiedReturnObjectInvocationHandler.invoke(StandardControllerServiceInvocationHandler.java:240)
         at com.sun.proxy.$Proxy231.nextRecord(Unknown Source)
         at org.apache.nifi.serialization.RecordReader$nextRecord.call(Unknown Source)
         at Script4c2db96c.run(Script4c2db96c.groovy:19)
         at org.apache.nifi.processors.groovyx.ExecuteGroovyScript.onTrigger(ExecuteGroovyScript.java:472)
         at org.apache.nifi.processor.AbstractProcessor.onTrigger(AbstractProcessor.java:27)
         at org.apache.nifi.controller.StandardProcessorNode.onTrigger(StandardProcessorNode.java:1174)
         at org.apache.nifi.controller.tasks.ConnectableTask.invoke(ConnectableTask.java:213)
         at org.apache.nifi.controller.scheduling.TimerDrivenSchedulingAgent$1.run(TimerDrivenSchedulingAgent.java:117)
         at org.apache.nifi.engine.FlowEngine$2.run(FlowEngine.java:110)
         at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
         at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
         at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
         at java.lang.Thread.run(Thread.java:748)
 Caused by: org.apache.avro.AvroRuntimeException: Not a valid schema field: PhoneNumber
         at org.apache.avro.generic.GenericData$Record.get(GenericData.java:256)
         at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:846)
         at org.apache.nifi.avro.AvroTypeUtil.convertAvroRecordToMap(AvroTypeUtil.java:835)
         at org.apache.nifi.avro.AvroRecordReader.nextRecord(AvroRecordReader.java:45)
         ... 22 common frames omitted
 
{noformat}

Pertinent code (NiFi's AvroTypeUtil.convertAvroRecordToMap):

[https://github.com/apache/nifi/blob/master/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java]

{noformat}
public static Map<String, Object> convertAvroRecordToMap(final GenericRecord avroRecord, final RecordSchema recordSchema, final Charset charset) {
    final Map<String, Object> values = new HashMap<>(recordSchema.getFieldCount());
    for (final RecordField recordField : recordSchema.getFields()) {
        Object value = avroRecord.get(recordField.getFieldName());
        if (value == null) {
            for (final String alias : recordField.getAliases()) {
                value = avroRecord.get(alias);  // <<< exception occurs here
                if (value != null) {
                    break;
                }
            }
        }
        // ... remainder of loop and method omitted
{noformat}
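A minimal, stdlib-only sketch of a guarded version of this lookup, assuming the Avro 1.10 behavior described in this report (get(String) throws for any key that is not a field name in the record's schema). StrictRecord and getByNameOrAlias are hypothetical stand-ins, not NiFi or Avro classes, and IllegalArgumentException stands in for AvroRuntimeException; with real Avro, the hasField check would presumably correspond to avroRecord.getSchema().getField(name) != null. The guard calls get() only for names the record's schema defines, so a missing field yields null instead of an exception, while data written under the alias name is still found.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GuardedAliasLookup {

    /** Hypothetical record with an Avro 1.10-style strict get(String). */
    static final class StrictRecord {
        private final Map<String, Object> values;

        StrictRecord(Map<String, Object> values) {
            this.values = new HashMap<>(values);  // HashMap permits null values
        }

        /** True if the name is a field of this record's schema (names only, no aliases). */
        boolean hasField(String name) {
            return values.containsKey(name);
        }

        /** Throws for unknown names; IllegalArgumentException stands in for AvroRuntimeException. */
        Object get(String name) {
            if (!values.containsKey(name)) {
                throw new IllegalArgumentException("Not a valid schema field: " + name);
            }
            return values.get(name);
        }
    }

    /** Tries the field name, then each alias, calling get() only for names the schema defines. */
    static Object getByNameOrAlias(StrictRecord record, String fieldName, List<String> aliases) {
        if (record.hasField(fieldName)) {
            Object value = record.get(fieldName);
            if (value != null) {
                return value;
            }
        }
        for (String alias : aliases) {
            if (record.hasField(alias)) {
                Object value = record.get(alias);
                if (value != null) {
                    return value;
                }
            }
        }
        return null;  // not present under the field name or any alias
    }

    public static void main(String[] args) {
        List<String> aliases = Arrays.asList("PhoneNumber");

        // Case from this report: schema has phone_number with a null value and no PhoneNumber field.
        Map<String, Object> missing = new HashMap<>();
        missing.put("phone_number", null);
        System.out.println(getByNameOrAlias(new StrictRecord(missing), "phone_number", aliases));  // null

        // Data written under the alias name is still found.
        Map<String, Object> legacy = new HashMap<>();
        legacy.put("PhoneNumber", "555-0100");
        System.out.println(getByNameOrAlias(new StrictRecord(legacy), "phone_number", aliases));   // 555-0100
    }
}
```

The design choice is to check schema membership before each get() rather than catching the exception, so that genuinely unexpected errors are not swallowed.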
 

 

*GenericData$Record.get(String key):*

*Avro 1.10.0*

[https://github.com/apache/avro/blob/release-1.10.0/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java]

{noformat}
public Object get(String key) {
  Field field = schema.getField(key);
  if (field == null) {
    throw new AvroRuntimeException("Not a valid schema field: " + key);
  }
  return values[field.pos()];
}{noformat}
 

 

*Avro 1.9.2*

[https://github.com/apache/avro/blob/release-1.9.2/lang/java/avro/src/main/java/org/apache/avro/generic/GenericData.java]

 
{noformat}
public Object get(String key) {
  Field field = schema.getField(key);
  if (field == null)
    return null;
  return values[field.pos()];
}{noformat}
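The difference between the two versions quoted above can be reproduced without Avro on the classpath. This is a stdlib-only model, not Avro code: ModelRecord and its method names are hypothetical, RuntimeException stands in for AvroRuntimeException, and, as the error message in the stack trace suggests, lookup resolves field names only, so the alias "PhoneNumber" is not found. It replays the two calls NiFi makes for the phone_number field.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GetBehaviorModel {

    /** Hypothetical stand-in for GenericData$Record: resolves field names only, never aliases. */
    static final class ModelRecord {
        private final Map<String, Integer> positions = new HashMap<>();
        private final Object[] values;

        ModelRecord(List<String> fieldNames) {
            for (int i = 0; i < fieldNames.size(); i++) {
                positions.put(fieldNames.get(i), i);
            }
            values = new Object[fieldNames.size()];  // every field starts out null
        }

        /** Avro 1.9.2 behavior: an unknown key returns null. */
        Object getAvro192(String key) {
            Integer pos = positions.get(key);
            if (pos == null)
                return null;
            return values[pos];
        }

        /** Avro 1.10.0 behavior: an unknown key throws. */
        Object getAvro110(String key) {
            Integer pos = positions.get(key);
            if (pos == null) {
                throw new RuntimeException("Not a valid schema field: " + key);
            }
            return values[pos];
        }
    }

    public static void main(String[] args) {
        ModelRecord record = new ModelRecord(Arrays.asList("phone_number"));

        // Lookup by field name: the field exists but holds null under both versions.
        System.out.println(record.getAvro192("phone_number"));  // null
        System.out.println(record.getAvro110("phone_number"));  // null

        // Lookup by alias: "PhoneNumber" is not a field name in the record's schema.
        System.out.println(record.getAvro192("PhoneNumber"));   // null
        try {
            record.getAvro110("PhoneNumber");
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());  // Not a valid schema field: PhoneNumber
        }
    }
}
```

Only the second, alias-based call differs between the versions, which matches the alias name appearing in the error message.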
 

 

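My ExecuteGroovyScript processor script (excerpt). The reader returned by createRecordReader throws the exception when the script calls its nextRecord() method (Script4c2db96c.groovy:19 in the stack trace above, not shown in this excerpt):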

 
{noformat}
import org.apache.nifi.serialization.record.Record
import org.apache.commons.io.IOUtils

def flowFile = session.get()
if (!flowFile) {
    log.warn("No flowFile")
    return  // nothing to process; avoid using a null flowFile below
}

InputStream inputStream
def reader

try {
    inputStream = session.read(flowFile)
    reader = RecordReader.reader.createRecordReader(flowFile, inputStream, log)
    // ... remainder of script omitted
{noformat}
 

    Environment: NiFi 1.12.1, Avro 1.10.2 (was: NiFi 1.12.2, Avro 1.10.2)

> Avro Reader exception when data field not present, and schema has alias
> -----------------------------------------------------------------------
>
>                 Key: AVRO-3202
>                 URL: https://issues.apache.org/jira/browse/AVRO-3202
>             Project: Apache Avro
>          Issue Type: Bug
>         Environment: NiFi 1.12.1, Avro 1.10.2
>            Reporter: Josh Highley
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)