Posted to issues@nifi.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/06/11 22:58:00 UTC

[jira] [Commented] (NIFI-5213) Allow AvroReader with explicit schema to read files with embedded schema

    [ https://issues.apache.org/jira/browse/NIFI-5213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16508920#comment-16508920 ] 

ASF GitHub Bot commented on NIFI-5213:
--------------------------------------

Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2718#discussion_r194572498
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java ---
    @@ -17,33 +17,61 @@
     
     package org.apache.nifi.avro;
     
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
     import java.io.EOFException;
     import java.io.IOException;
     import java.io.InputStream;
    +import java.io.SequenceInputStream;
     
     import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
     import org.apache.avro.generic.GenericDatumReader;
     import org.apache.avro.generic.GenericRecord;
     import org.apache.avro.io.BinaryDecoder;
     import org.apache.avro.io.DatumReader;
     import org.apache.avro.io.DecoderFactory;
    -import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.commons.io.input.TeeInputStream;
     import org.apache.nifi.serialization.MalformedRecordException;
     import org.apache.nifi.serialization.record.RecordSchema;
     
     public class AvroReaderWithExplicitSchema extends AvroRecordReader {
         private final InputStream in;
         private final RecordSchema recordSchema;
         private final DatumReader<GenericRecord> datumReader;
    -    private final BinaryDecoder decoder;
    +    private BinaryDecoder decoder;
         private GenericRecord genericRecord;
    +    private DataFileStream<GenericRecord> dataFileStream;
     
    -    public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException, SchemaNotFoundException {
    +    public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException {
             this.in = in;
             this.recordSchema = recordSchema;
     
    -        datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
    -        decoder = DecoderFactory.get().binaryDecoder(in, null);
    +        datumReader = new GenericDatumReader<>(avroSchema);
    +        ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +        TeeInputStream teeInputStream = new TeeInputStream(in, baos);
    +        // Try to parse as a DataFileStream, if it works, glue the streams back together and delegate calls to the DataFileStream
    +        try {
    +            dataFileStream = new DataFileStream<>(teeInputStream, new GenericDatumReader<>());
    +        } catch (IOException ioe) {
    +            // Carry on, hopefully a raw Avro file
    +            // Need to be able to re-read the bytes read so far, and the InputStream passed in doesn't support reset. Use the TeeInputStream in
    +            // conjunction with SequenceInputStream to glue the two streams back together for future reading
    +            ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
    +            SequenceInputStream sis = new SequenceInputStream(bais, in);
    +            decoder = DecoderFactory.get().binaryDecoder(sis, null);
    +        }
    +        if (dataFileStream != null) {
    +            // Verify the schemas are the same
    +            Schema embeddedSchema = dataFileStream.getSchema();
    +            if (!embeddedSchema.equals(avroSchema)) {
    +                throw new IOException("Explicit schema does not match embedded schema");
    --- End diff --
    
    @mattyb149 How does this handle schema evolution? The Kafka producer could be writing with `Corporate Schema v1` while NiFi is configured with `Corporate Schema v2`, and v2 may gracefully allow an upgrade from v1 under Avro's schema evolution rules. A strict `equals` check would reject that case. Or am I missing something, and schema evolution is not really a consideration with the Record API?
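
    For anyone following the diff above: the core trick is re-reading bytes that were already consumed from a stream that does not support reset. Below is a minimal, stdlib-only sketch of that idea. It is not the PR's code: the PR uses commons-io `TeeInputStream` and attempts a full `DataFileStream` parse, whereas this sketch only checks the four magic bytes that open an Avro object container file (`O`, `b`, `j`, 1). The names `AvroContainerSniffer`, `Sniffed`, and `sniff` are hypothetical and exist only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Arrays;

// Hypothetical helper, not part of NiFi or Avro.
public class AvroContainerSniffer {
    // Avro object container files begin with these four bytes.
    private static final byte[] MAGIC = {'O', 'b', 'j', 1};

    /** Outcome of sniffing: the magic-byte verdict plus a stream that replays every byte. */
    public static final class Sniffed {
        public final boolean looksLikeContainer;
        public final InputStream replay;

        Sniffed(final boolean looksLikeContainer, final InputStream replay) {
            this.looksLikeContainer = looksLikeContainer;
            this.replay = replay;
        }
    }

    public static Sniffed sniff(final InputStream in) throws IOException {
        final byte[] head = new byte[MAGIC.length];
        int read = 0;
        // read() may return fewer bytes than requested, so loop until full or EOF.
        while (read < head.length) {
            final int n = in.read(head, read, head.length - read);
            if (n < 0) {
                break;
            }
            read += n;
        }
        final boolean matches = read == MAGIC.length && Arrays.equals(head, MAGIC);
        // Glue the consumed bytes back in front of the remainder of the original stream.
        final InputStream replay =
                new SequenceInputStream(new ByteArrayInputStream(head, 0, read), in);
        return new Sniffed(matches, replay);
    }

    public static void main(final String[] args) throws IOException {
        final byte[] container = {'O', 'b', 'j', 1, 42};
        final byte[] raw = {2, 4, 6};
        for (final byte[] data : new byte[][] {container, raw}) {
            final Sniffed s = sniff(new ByteArrayInputStream(data));
            int count = 0;
            while (s.replay.read() >= 0) {
                count++;
            }
            System.out.println(s.looksLikeContainer + " " + count + " of " + data.length);
        }
    }
}
```

    In both cases the replay stream yields every original byte, so a caller could hand it to either a container-file reader or a raw binary decoder. A magic-byte check is cheaper than the try/catch in the diff, but it only hints at the format; it does not validate the header or say anything about whether the embedded schema is compatible with the explicit one.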


> Allow AvroReader with explicit schema to read files with embedded schema
> ------------------------------------------------------------------------
>
>                 Key: NIFI-5213
>                 URL: https://issues.apache.org/jira/browse/NIFI-5213
>             Project: Apache NiFi
>          Issue Type: Improvement
>          Components: Extensions
>            Reporter: Matt Burgess
>            Assignee: Matt Burgess
>            Priority: Minor
>
> AvroReader offers several schema access strategies, such as Use Embedded Schema, Use Schema Name, and Use Schema Text. If the incoming Avro files have embedded schemas, then Use Embedded Schema is the best practice for the AvroReader. However, it is not intuitive that errors can occur when the same schema that is embedded in the file is instead specified by name (using a schema registry) or explicitly via Schema Text. This has been noticed in QueryRecord, for example, and the error itself is neither intuitive nor descriptive (it is often an ArrayIndexOutOfBoundsException).
> To provide a better user experience, AvroReader should be able to successfully process Avro files with embedded schemas even when the Schema Access Strategy is not "Use Embedded Schema". Of course, the explicit schema would have to match the embedded schema, or an error would be reported (and rightfully so).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)