You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by MikeThomsen <gi...@git.apache.org> on 2018/06/11 22:58:12 UTC

[GitHub] nifi pull request #2718: NIFI-5213: Allow AvroReader to process files w embe...

Github user MikeThomsen commented on a diff in the pull request:

    https://github.com/apache/nifi/pull/2718#discussion_r194572498
  
    --- Diff: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java ---
    @@ -17,33 +17,61 @@
     
     package org.apache.nifi.avro;
     
    +import java.io.ByteArrayInputStream;
    +import java.io.ByteArrayOutputStream;
     import java.io.EOFException;
     import java.io.IOException;
     import java.io.InputStream;
    +import java.io.SequenceInputStream;
     
     import org.apache.avro.Schema;
    +import org.apache.avro.file.DataFileStream;
     import org.apache.avro.generic.GenericDatumReader;
     import org.apache.avro.generic.GenericRecord;
     import org.apache.avro.io.BinaryDecoder;
     import org.apache.avro.io.DatumReader;
     import org.apache.avro.io.DecoderFactory;
    -import org.apache.nifi.schema.access.SchemaNotFoundException;
    +import org.apache.commons.io.input.TeeInputStream;
     import org.apache.nifi.serialization.MalformedRecordException;
     import org.apache.nifi.serialization.record.RecordSchema;
     
     public class AvroReaderWithExplicitSchema extends AvroRecordReader {
         private final InputStream in;
         private final RecordSchema recordSchema;
         private final DatumReader<GenericRecord> datumReader;
    -    private final BinaryDecoder decoder;
    +    private BinaryDecoder decoder;
         private GenericRecord genericRecord;
    +    private DataFileStream<GenericRecord> dataFileStream;
     
    -    public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException, SchemaNotFoundException {
    +    public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException {
             this.in = in;
             this.recordSchema = recordSchema;
     
    -        datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
    -        decoder = DecoderFactory.get().binaryDecoder(in, null);
    +        datumReader = new GenericDatumReader<>(avroSchema);
    +        ByteArrayOutputStream baos = new ByteArrayOutputStream();
    +        TeeInputStream teeInputStream = new TeeInputStream(in, baos);
    +        // Try to parse as a DataFileStream, if it works, glue the streams back together and delegate calls to the DataFileStream
    +        try {
    +            dataFileStream = new DataFileStream<>(teeInputStream, new GenericDatumReader<>());
    +        } catch (IOException ioe) {
    +            // Carry on, hopefully a raw Avro file
    +            // Need to be able to re-read the bytes read so far, and the InputStream passed in doesn't support reset. Use the TeeInputStream in
    +            // conjunction with SequenceInputStream to glue the two streams back together for future reading
    +            ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
    +            SequenceInputStream sis = new SequenceInputStream(bais, in);
    +            decoder = DecoderFactory.get().binaryDecoder(sis, null);
    +        }
    +        if (dataFileStream != null) {
    +            // Verify the schemas are the same
    +            Schema embeddedSchema = dataFileStream.getSchema();
    +            if (!embeddedSchema.equals(avroSchema)) {
    +                throw new IOException("Explicit schema does not match embedded schema");
    --- End diff --
    
    @mattyb149 How does it handle schema evolution in this case? It's possible that the Kafka producer has `Corporate Schema v1` and NiFi is configured with `Corporate Schema v2` and v2 gracefully allows an upgrade from v1 via Avro schema evolution rules. Or am I missing something about that being not really a thing WRT the Record API?


---