You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/07/12 08:24:10 UTC
[nifi] 01/02: NIFI-6419: Fixed AvroWriter single record with external schema results in data loss
This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 24e50953a3e2dde4cda0261e4f7dba083d5ed602
Author: Peter Turcsanyi <tu...@cloudera.com>
AuthorDate: Thu Jul 4 18:48:36 2019 +0200
NIFI-6419: Fixed AvroWriter single record with external schema results in data loss
This closes #3573.
Signed-off-by: Koji Kawamura <ij...@apache.org>
---
.../nifi-record-serialization-services/pom.xml | 1 +
.../org/apache/nifi/avro/TestWriteAvroResult.java | 69 ++++++++++++++++++++++
.../nifi/avro/TestWriteAvroResultWithSchema.java | 18 +++++-
.../avro/TestWriteAvroResultWithoutSchema.java | 15 ++++-
.../src/test/resources/avro/simple.avsc | 8 +++
5 files changed, 109 insertions(+), 2 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 06fdb6d..e32ec28 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -136,6 +136,7 @@
<exclude>src/test/resources/avro/logical-types.avsc</exclude>
<exclude>src/test/resources/avro/logical-types-nullable.avsc</exclude>
<exclude>src/test/resources/avro/multiple-types.avsc</exclude>
+ <exclude>src/test/resources/avro/simple.avsc</exclude>
<exclude>src/test/resources/csv/extra-white-space.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account.csv</exclude>
<exclude>src/test/resources/csv/single-bank-account.csv</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
index 4751f74..d3e5f6c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
@@ -26,6 +26,7 @@ import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.SimpleRecordSchema;
import org.apache.nifi.serialization.WriteResult;
import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.ListRecordSet;
import org.apache.nifi.serialization.record.MapRecord;
import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
@@ -66,10 +67,78 @@ public abstract class TestWriteAvroResult {
protected abstract GenericRecord readRecord(InputStream in, Schema schema) throws IOException;
+ protected abstract List<GenericRecord> readRecords(InputStream in, Schema schema, int recordCount) throws IOException;
+
protected void verify(final WriteResult writeResult) {
}
@Test
+ public void testWriteRecord() throws IOException {
+ final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/simple.avsc"));
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("msg", RecordFieldType.STRING.getDataType()));
+ final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+ final Map<String, Object> values = new HashMap<>();
+ values.put("msg", "nifi");
+ final Record record = new MapRecord(recordSchema, values);
+
+ try (final RecordSetWriter writer = createWriter(schema, baos)) {
+ writer.write(record);
+ }
+
+ final byte[] data = baos.toByteArray();
+
+ try (final InputStream in = new ByteArrayInputStream(data)) {
+ final GenericRecord avroRecord = readRecord(in, schema);
+
+ assertNotNull(avroRecord);
+ assertNotNull(avroRecord.get("msg"));
+ assertEquals("nifi", avroRecord.get("msg").toString());
+ }
+ }
+
+ @Test
+ public void testWriteRecordSet() throws IOException {
+ final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/simple.avsc"));
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("msg", RecordFieldType.STRING.getDataType()));
+ final RecordSchema recordSchema = new SimpleRecordSchema(fields);
+
+ final int recordCount = 3;
+ List<Record> records = new ArrayList<>();
+ for (int i = 0; i < recordCount; i++){
+ final Map<String, Object> values = new HashMap<>();
+ values.put("msg", "nifi" + i);
+ final Record record = new MapRecord(recordSchema, values);
+ records.add(record);
+ }
+
+ try (final RecordSetWriter writer = createWriter(schema, baos)) {
+ writer.write(new ListRecordSet(recordSchema, records));
+ }
+
+ final byte[] data = baos.toByteArray();
+
+ try (final InputStream in = new ByteArrayInputStream(data)) {
+ final List<GenericRecord> avroRecords = readRecords(in, schema, recordCount);
+ for (int i = 0; i < recordCount; i++) {
+ final GenericRecord avroRecord = avroRecords.get(i);
+
+ assertNotNull(avroRecord);
+ assertNotNull(avroRecord.get("msg"));
+ assertEquals("nifi" + i, avroRecord.get("msg").toString());
+ }
+ }
+ }
+
+ @Test
public void testLogicalTypes() throws IOException, ParseException {
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/logical-types.avsc"));
testLogicalTypes(schema);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java
index b3eecde..21787b4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithSchema.java
@@ -20,6 +20,8 @@ package org.apache.nifi.avro;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
@@ -39,11 +41,25 @@ public class TestWriteAvroResultWithSchema extends TestWriteAvroResult {
@Override
protected GenericRecord readRecord(final InputStream in, final Schema schema) throws IOException {
- final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>());
+ final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(in, new GenericDatumReader<>());
final Schema avroSchema = dataFileStream.getSchema();
GenericData.setStringType(avroSchema, StringType.String);
final GenericRecord avroRecord = dataFileStream.next();
return avroRecord;
}
+
+ @Override
+ protected List<GenericRecord> readRecords(final InputStream in, final Schema schema, final int recordCount) throws IOException {
+ final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(in, new GenericDatumReader<>());
+ final Schema avroSchema = dataFileStream.getSchema();
+ GenericData.setStringType(avroSchema, StringType.String);
+
+ List<GenericRecord> records = new ArrayList<>();
+ for (int i = 0; i < recordCount; i++) {
+ records.add(dataFileStream.next());
+ }
+
+ return records;
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
index c592df0..9d86df5 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResultWithoutSchema.java
@@ -65,11 +65,24 @@ public class TestWriteAvroResultWithoutSchema extends TestWriteAvroResult {
@Override
protected GenericRecord readRecord(final InputStream in, final Schema schema) throws IOException {
final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
- final GenericDatumReader<GenericRecord> reader = new GenericDatumReader<GenericRecord>(schema);
+ final GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
return reader.read(null, decoder);
}
@Override
+ protected List<GenericRecord> readRecords(final InputStream in, final Schema schema, final int recordCount) throws IOException {
+ final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
+ final GenericDatumReader<GenericRecord> reader = new GenericDatumReader<>(schema);
+
+ List<GenericRecord> records = new ArrayList<>();
+ for (int i = 0; i < recordCount; i++) {
+ records.add(reader.read(null, decoder));
+ }
+
+ return records;
+ }
+
+ @Override
protected void verify(final WriteResult writeResult) {
final Map<String, String> attributes = writeResult.getAttributes();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/simple.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/simple.avsc
new file mode 100644
index 0000000..06f7aa0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/simple.avsc
@@ -0,0 +1,8 @@
+{
+ "namespace": "nifi",
+ "name": "simple",
+ "type": "record",
+ "fields": [
+ {"name": "msg", "type": "string"}
+ ]
+}