Posted to commits@nifi.apache.org by ij...@apache.org on 2019/07/12 08:24:10 UTC

[nifi] 01/02: NIFI-6419: Fixed AvroWriter single record with external schema results in data loss

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 24e50953a3e2dde4cda0261e4f7dba083d5ed602
Author: Peter Turcsanyi <tu...@cloudera.com>
AuthorDate: Thu Jul 4 18:48:36 2019 +0200

    NIFI-6419: Fixed AvroWriter single record with external schema results in data loss
    
    This closes #3573.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../nifi-record-serialization-services/pom.xml     |  1 +
 .../org/apache/nifi/avro/TestWriteAvroResult.java  | 69 ++++++++++++++++++++++
 .../nifi/avro/TestWriteAvroResultWithSchema.java   | 18 +++++-
 .../avro/TestWriteAvroResultWithoutSchema.java     | 15 ++++-
 .../src/test/resources/avro/simple.avsc            |  8 +++
 5 files changed, 109 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 06fdb6d..e32ec28 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -136,6 +136,7 @@
                         <exclude>src/test/resources/avro/logical-types.avsc</exclude>
                         <exclude>src/test/resources/avro/logical-types-nullable.avsc</exclude>
                         <exclude>src/test/resources/avro/multiple-types.avsc</exclude>
+                        <exclude>src/test/resources/avro/simple.avsc</exclude>
                         <exclude>src/test/resources/csv/extra-white-space.csv</exclude>
                         <exclude>src/test/resources/csv/multi-bank-account.csv</exclude>
                         <exclude>src/test/resources/csv/single-bank-account.csv</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
index 4751f74..d3e5f6c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
@@ -26,6 +26,7 @@ import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.ListRecordSet;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
@@ -66,10 +67,78 @@ public abstract class TestWriteAvroResult {
 
     protected abstract GenericRecord readRecord(InputStream in, Schema schema) throws IOException;
 
+    protected abstract List<GenericRecord> readRecords(InputStream in, Schema schema, int recordCount) throws IOException;
+
     protected void verify(final WriteResult writeResult) {
     }
 
     @Test
+    public void testWriteRecord() throws IOException {
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/simple.avsc"));
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("msg", RecordFieldType.STRING.getDataType()));
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+        final Map<String, Object> values = new HashMap<>();
+        values.put("msg", "nifi");
+        final Record record = new MapRecord(recordSchema, values);
+
+        try (final RecordSetWriter writer = createWriter(schema, baos)) {
+            writer.write(record);
+        }
+
+        final byte[] data = baos.toByteArray();
+
+        try (final InputStream in = new ByteArrayInputStream(data)) {
+            final GenericRecord avroRecord = readRecord(in, schema);
+
+            assertNotNull(avroRecord);
+            assertNotNull(avroRecord.get("msg"));
+            assertEquals("nifi", avroRecord.get("msg").toString());
+        }
+    }
+
+    @Test
+    public void testWriteRecordSet() throws IOException {
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/simple.avsc"));
+
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("msg", RecordFieldType.STRING.getDataType()));
+        final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+        final int recordCount = 3;
+        List<Record> records = new ArrayList<>();
+        for (int i = 0; i < recordCount; i++){
+            final Map<String, Object> values = new HashMap<>();
+            values.put("msg", "nifi" + i);
+            final Record record = new MapRecord(recordSchema, values);
+            records.add(record);
+        }
+
+        try (final RecordSetWriter writer = createWriter(schema, baos)) {
+            writer.write(new ListRecordSet(recordSchema, records));
+        }
+
+        final byte[] data = baos.toByteArray();
+
+        try (final InputStream in = new ByteArrayInputStream(data)) {
+            final List<GenericRecord> avroRecords = readRecords(in, schema, recordCount);
+            for (int i = 0; i < recordCount; i++) {
+                final GenericRecord avroRecord = avroRecords.get(i);
+
+                assertNotNull(avroRecord);
+                assertNotNull(avroRecord.get("msg"));
+                assertEquals("nifi" + i, avroRecord.get("msg").toString());
+            }
+        }
+    }
+
+    @Test
     public void testLogicalTypes() throws IOException, ParseException {
         final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/logical-types.avsc"));
         testLogicalTypes(schema);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java
index b3eecde..21787b4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java
@@ -20,6 +20,8 @@ package org.apache.nifi.avro;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
@@ -39,11 +41,25 @@ public class TestWriteAvroResultWithSchema extends TestWriteAvroResult {
 
     @Override
     protected GenericRecord readRecord(final InputStream in, final Schema schema) throws IOException {
-        final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>());
+        final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(in, new GenericDatumReader<>());
         final Schema avroSchema = dataFileStream.getSchema();
         GenericData.setStringType(avroSchema, StringType.String);
         final GenericRecord avroRecord = dataFileStream.next();
 
         return avroRecord;
     }
+
+    @Override
+    protected List<GenericRecord> readRecords(final InputStream in, final Schema schema, final int recordCount) throws IOException {
+        final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(in, new GenericDatumReader<>());
+        final Schema avroSchema = dataFileStream.getSchema();
+        GenericData.setStringType(avroSchema, StringType.String);
+
+        List<GenericRecord> records = new ArrayList<>();
+        for (int i = 0; i < recordCount; i++) {
+            records.add(dataFileStream.next());
+        }
+
+        return records;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
index c592df0..9d86df5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
@@ -65,11 +65,24 @@ public class TestWriteAvroResultWithoutSchema extends TestWriteAvroResult {
     @Override
     protected GenericRecord readRecord(final InputStream in, final Schema schema) throws IOException {
         final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
-        final GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
+        final GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
         return reader.read(null, decoder);
     }
 
     @Override
+    protected List<GenericRecord> readRecords(final InputStream in, final Schema schema, final int recordCount) throws IOException {
+        final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
+        final GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+
+        List<GenericRecord> records = new ArrayList<>();
+        for (int i = 0; i < recordCount; i++) {
+            records.add(reader.read(null, decoder));
+        }
+
+        return records;
+    }
+
+    @Override
     protected void verify(final WriteResult writeResult) {
         final Map<String, String> attributes = writeResult.getAttributes();
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/simple.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/simple.avsc
new file mode 100644
index 0000000..06f7aa0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/simple.avsc
@@ -0,0 +1,8 @@
+{
+  "namespace": "nifi",
+  "name": "simple",
+  "type": "record",
+  "fields": [
+    {"name": "msg", "type": "string"}
+  ]
+}
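
For context on the scenario these tests cover: the "external schema" case (TestWriteAvroResultWithoutSchema) corresponds to raw Avro datum bytes with no schema embedded in the output, so the reader must be handed the same schema out of band, and any buffering in the encoder must be flushed before the stream is closed or the written bytes are silently dropped. The following is a minimal round-trip sketch using the plain Avro API only, with the record shape taken from simple.avsc above; it is an illustration of the underlying Avro behavior, not the NiFi AvroWriter implementation.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericDatumWriter;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.BinaryEncoder;
    import org.apache.avro.io.DecoderFactory;
    import org.apache.avro.io.EncoderFactory;

    public class SimpleAvroRoundTrip {
        public static void main(String[] args) throws IOException {
            // Schema matching src/test/resources/avro/simple.avsc
            final Schema schema = new Schema.Parser().parse(
                "{\"namespace\":\"nifi\",\"name\":\"simple\",\"type\":\"record\","
                + "\"fields\":[{\"name\":\"msg\",\"type\":\"string\"}]}");

            final GenericRecord record = new GenericData.Record(schema);
            record.put("msg", "nifi");

            // Write a single datum with no embedded schema (the "external schema" case).
            final ByteArrayOutputStream out = new ByteArrayOutputStream();
            final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            new GenericDatumWriter<GenericRecord>(schema).write(record, encoder);
            // The binary encoder buffers internally; skipping this flush loses the datum.
            encoder.flush();

            // Read it back; the identical schema must be supplied out of band.
            final BinaryDecoder decoder = DecoderFactory.get()
                .binaryDecoder(new ByteArrayInputStream(out.toByteArray()), null);
            final GenericRecord copy = new GenericDatumReader<GenericRecord>(schema)
                .read(null, decoder);
            System.out.println(copy.get("msg")); // prints: nifi
        }
    }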