You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/09/28 20:26:31 UTC
[nifi] branch main updated: NIFI-7843 Recursive avro schemas fail to write with RecordWriter
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f73a019 NIFI-7843 Recursive avro schemas fail to write with RecordWriter
f73a019 is described below
commit f73a019f36f0ef6ebc4dfc634a856fd92a36f3df
Author: Denes Arvay <de...@cloudera.com>
AuthorDate: Wed Sep 23 11:58:52 2020 +0200
NIFI-7843 Recursive avro schemas fail to write with RecordWriter
NIFI-7843 Recursive avro schemas fail to write with RecordWriter
Add new test case to TestSimpleRecordSchema to test the scenario
when schema name and schema namespace match.
This closes #4550.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
.../nifi/serialization/SimpleRecordSchema.java | 5 +++++
.../nifi/serialization/TestSimpleRecordSchema.java | 20 ++++++++++++++++++++
.../java/org/apache/nifi/avro/AvroTypeUtil.java | 2 ++
.../nifi-record-serialization-services/pom.xml | 1 +
.../org/apache/nifi/avro/TestWriteAvroResult.java | 21 +++++++++++++++++++++
.../src/test/resources/avro/recursive.avro | Bin 0 -> 19 bytes
.../src/test/resources/avro/recursive.avsc | 10 ++++++++++
7 files changed, 59 insertions(+)
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
index 453d88a..9a48b02 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
@@ -167,6 +167,11 @@ public class SimpleRecordSchema implements RecordSchema {
}
final RecordSchema other = (RecordSchema) obj;
+ if (getSchemaNamespace().isPresent() && getSchemaNamespace().equals(other.getSchemaNamespace())
+ && getSchemaName().isPresent() && getSchemaName().equals(other.getSchemaName())) {
+ return true;
+ }
+
return fields.equals(other.getFields());
}
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java
index 204cb67..c2b2d43 100644
--- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java
@@ -22,8 +22,10 @@ import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.SchemaIdentifier;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -90,6 +92,24 @@ public class TestSimpleRecordSchema {
assertTrue(secondSchema.equals(schema));
}
+ @Test
+ public void testFieldsArentCheckedInEqualsIfNameAndNamespaceMatch() {
+ final RecordField testField = new RecordField("test", RecordFieldType.STRING.getDataType());
+
+ final SimpleRecordSchema schema1 = new SimpleRecordSchema(SchemaIdentifier.EMPTY);
+ schema1.setSchemaName("name");
+ schema1.setSchemaNamespace("namespace");
+ schema1.setFields(Collections.singletonList(testField));
+
+ SimpleRecordSchema schema2 = Mockito.spy(new SimpleRecordSchema(SchemaIdentifier.EMPTY));
+ schema2.setSchemaName("name");
+ schema2.setSchemaNamespace("namespace");
+ schema2.setFields(Collections.singletonList(testField));
+
+ assertTrue(schema1.equals(schema2));
+ Mockito.verify(schema2, Mockito.never()).getFields();
+ }
+
private Set<String> set(final String... values) {
final Set<String> set = new HashSet<>();
for (final String value : values) {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index f54d329..3919efa 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -378,6 +378,8 @@ public class AvroTypeUtil {
return knownRecordTypes.get(schemaFullName);
} else {
SimpleRecordSchema recordSchema = new SimpleRecordSchema(SchemaIdentifier.EMPTY);
+ recordSchema.setSchemaName(avroSchema.getName());
+ recordSchema.setSchemaNamespace(avroSchema.getNamespace());
DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema);
knownRecordTypes.put(schemaFullName, recordSchemaType);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 1e99dba..3d01ea8 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -140,6 +140,7 @@
<exclude>src/test/resources/avro/logical-types-nullable.avsc</exclude>
<exclude>src/test/resources/avro/multiple-types.avsc</exclude>
<exclude>src/test/resources/avro/simple.avsc</exclude>
+ <exclude>src/test/resources/avro/recursive.avsc</exclude>
<exclude>src/test/resources/csv/extra-white-space.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account.csv</exclude>
<exclude>src/test/resources/csv/single-bank-account.csv</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
index a25e80e..afe5779 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
@@ -39,6 +39,7 @@ import org.junit.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
@@ -74,6 +75,26 @@ public abstract class TestWriteAvroResult {
}
@Test
+ public void testWriteRecursiveRecord() throws IOException {
+ final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/recursive.avsc"));
+ final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+ final FileInputStream in = new FileInputStream("src/test/resources/avro/recursive.avro");
+
+ try (final AvroRecordReader reader = new AvroReaderWithExplicitSchema(in, recordSchema, schema);
+ final RecordSetWriter writer = createWriter(schema, new ByteArrayOutputStream())) {
+
+ final GenericRecord avroRecord = reader.nextAvroRecord();
+ final Map<String, Object> recordMap = AvroTypeUtil.convertAvroRecordToMap(avroRecord, recordSchema);
+ final Record record = new MapRecord(recordSchema, recordMap);
+ try {
+ writer.write(record);
+ } catch (StackOverflowError soe) {
+ Assert.fail("Recursive schema resulted in infinite loop during write");
+ }
+ }
+ }
+
+ @Test
public void testWriteRecord() throws IOException {
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/simple.avsc"));
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/recursive.avro b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/recursive.avro
new file mode 100644
index 0000000..320aeca
Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/recursive.avro differ
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/recursive.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/recursive.avsc
new file mode 100644
index 0000000..8c51be6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/recursive.avsc
@@ -0,0 +1,10 @@
+{
+ "namespace": "nifi",
+ "type": "record",
+ "name": "Recursive",
+ "fields": [
+ {"name": "id", "type": "int"},
+ {"name": "name", "type": ["string", "null"]},
+ {"name": "parent", "type": ["Recursive", "null"]}
+ ]
+}
\ No newline at end of file