Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2019/03/01 22:07:38 UTC

[GitHub] AaronLeon commented on a change in pull request #2718: NIFI-5213: Allow AvroReader to process files w embedded schema even when the access strategy is explicit schema

URL: https://github.com/apache/nifi/pull/2718#discussion_r261772316
 
 

 ##########
 File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReaderWithExplicitSchema.java
 ##########
 @@ -17,33 +17,61 @@
 
 package org.apache.nifi.avro;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
+import java.io.SequenceInputStream;
 
 import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.BinaryDecoder;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.DecoderFactory;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.commons.io.input.TeeInputStream;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.record.RecordSchema;
 
 public class AvroReaderWithExplicitSchema extends AvroRecordReader {
     private final InputStream in;
     private final RecordSchema recordSchema;
     private final DatumReader<GenericRecord> datumReader;
-    private final BinaryDecoder decoder;
+    private BinaryDecoder decoder;
     private GenericRecord genericRecord;
+    private DataFileStream<GenericRecord> dataFileStream;
 
-    public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException, SchemaNotFoundException {
+    public AvroReaderWithExplicitSchema(final InputStream in, final RecordSchema recordSchema, final Schema avroSchema) throws IOException {
         this.in = in;
         this.recordSchema = recordSchema;
 
-        datumReader = new GenericDatumReader<GenericRecord>(avroSchema);
-        decoder = DecoderFactory.get().binaryDecoder(in, null);
+        datumReader = new GenericDatumReader<>(avroSchema);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        TeeInputStream teeInputStream = new TeeInputStream(in, baos);
+        // Try to parse as a DataFileStream, if it works, glue the streams back together and delegate calls to the DataFileStream
 
 Review comment:
   It might be worth reversing the try/catch block. I understand the intuition to check first whether the schema is embedded, since that is the order in which the bytes are encoded, but there is an embedded Avro schema strategy for a reason.
   
   From my understanding, a user who chooses AvroReaderWithExplicitSchema over AvroReaderWithEmbeddedSchema expects to use an explicit schema to decode Avro data that has no embedded schema; data that does contain an embedded schema is the exceptional case. The current implementation forces that user's flow to throw and handle an exception each time on the common path, which may be expensive. Maybe it's negligible with sufficient batching of records, so this is just a nitpick. The TeeInputStream construct is pretty neat though :)
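
   A rough sketch of one way to keep the common path free of exceptions, using only java.io. This is neither what the PR does nor a literal reversal of the try/catch: it peeks at the first four bytes and compares them with the Avro object container file magic ('O', 'b', 'j', 1), then glues the consumed bytes back in front of the rest of the stream with SequenceInputStream. The class and member names (AvroHeaderPeek, Peeked, peek, looksEmbedded) are hypothetical and exist in neither NiFi nor Avro. The check is only a heuristic: schemaless binary data could in principle begin with the same four bytes, so a fallback would still be needed for that case.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Arrays;

// Hypothetical helper for illustration; not part of NiFi or Avro.
final class AvroHeaderPeek {
    // Avro object container files begin with these four bytes.
    static final byte[] MAGIC = {'O', 'b', 'j', 1};

    // Verdict of the peek plus a stream that replays the bytes already consumed.
    static final class Peeked {
        final boolean looksEmbedded;
        final InputStream stream;

        Peeked(final boolean looksEmbedded, final InputStream stream) {
            this.looksEmbedded = looksEmbedded;
            this.stream = stream;
        }
    }

    static Peeked peek(final InputStream in) throws IOException {
        final byte[] head = new byte[MAGIC.length];
        int n = 0;
        // read() may return fewer bytes than requested, so loop until full or EOF.
        while (n < head.length) {
            final int r = in.read(head, n, head.length - n);
            if (r < 0) {
                break;
            }
            n += r;
        }
        final boolean looksEmbedded = n == MAGIC.length && Arrays.equals(head, MAGIC);
        // Put the consumed bytes back in front of the untouched remainder.
        final InputStream replay =
                new SequenceInputStream(new ByteArrayInputStream(head, 0, n), in);
        return new Peeked(looksEmbedded, replay);
    }

    private AvroHeaderPeek() {
    }
}
```

   In the constructor, the returned stream could then go to DataFileStream when looksEmbedded is true and to DecoderFactory.get().binaryDecoder(stream, null) otherwise, so only the rare false positive would ever reach an exception handler.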
