You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2020/09/28 20:26:31 UTC

[nifi] branch main updated: NIFI-7843 Recursive avro schemas fail to write with RecordWriter

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f73a019  NIFI-7843 Recursive avro schemas fail to write with RecordWriter
f73a019 is described below

commit f73a019f36f0ef6ebc4dfc634a856fd92a36f3df
Author: Denes Arvay <de...@cloudera.com>
AuthorDate: Wed Sep 23 11:58:52 2020 +0200

    NIFI-7843 Recursive avro schemas fail to write with RecordWriter
    
    NIFI-7843 Recursive avro schemas fail to write with RecordWriter
    Add new test case to TestSimpleRecordSchema to test the scenario
    when schema name and schema namespace match.
    
    This closes #4550.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 .../nifi/serialization/SimpleRecordSchema.java     |   5 +++++
 .../nifi/serialization/TestSimpleRecordSchema.java |  20 ++++++++++++++++++++
 .../java/org/apache/nifi/avro/AvroTypeUtil.java    |   2 ++
 .../nifi-record-serialization-services/pom.xml     |   1 +
 .../org/apache/nifi/avro/TestWriteAvroResult.java  |  21 +++++++++++++++++++++
 .../src/test/resources/avro/recursive.avro         | Bin 0 -> 19 bytes
 .../src/test/resources/avro/recursive.avsc         |  10 ++++++++++
 7 files changed, 59 insertions(+)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
index 453d88a..9a48b02 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/SimpleRecordSchema.java
@@ -167,6 +167,11 @@ public class SimpleRecordSchema implements RecordSchema {
         }
 
         final RecordSchema other = (RecordSchema) obj;
+        if (getSchemaNamespace().isPresent() && getSchemaNamespace().equals(other.getSchemaNamespace())
+                && getSchemaName().isPresent() && getSchemaName().equals(other.getSchemaName())) {
+            return true;
+        }
+
         return fields.equals(other.getFields());
     }
 
diff --git a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java
index 204cb67..c2b2d43 100644
--- a/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java
+++ b/nifi-commons/nifi-record/src/test/java/org/apache/nifi/serialization/TestSimpleRecordSchema.java
@@ -22,8 +22,10 @@ import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.SchemaIdentifier;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -90,6 +92,24 @@ public class TestSimpleRecordSchema {
         assertTrue(secondSchema.equals(schema));
     }
 
+    @Test
+    public void testFieldsArentCheckedInEqualsIfNameAndNamespaceMatch() {
+        final RecordField testField = new RecordField("test", RecordFieldType.STRING.getDataType());
+
+        final SimpleRecordSchema schema1 = new SimpleRecordSchema(SchemaIdentifier.EMPTY);
+        schema1.setSchemaName("name");
+        schema1.setSchemaNamespace("namespace");
+        schema1.setFields(Collections.singletonList(testField));
+
+        SimpleRecordSchema schema2 = Mockito.spy(new SimpleRecordSchema(SchemaIdentifier.EMPTY));
+        schema2.setSchemaName("name");
+        schema2.setSchemaNamespace("namespace");
+        schema2.setFields(Collections.singletonList(testField));
+
+        assertTrue(schema1.equals(schema2));
+        Mockito.verify(schema2, Mockito.never()).getFields();
+    }
+
     private Set<String> set(final String... values) {
         final Set<String> set = new HashSet<>();
         for (final String value : values) {
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
index f54d329..3919efa 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-avro-record-utils/src/main/java/org/apache/nifi/avro/AvroTypeUtil.java
@@ -378,6 +378,8 @@ public class AvroTypeUtil {
                     return knownRecordTypes.get(schemaFullName);
                 } else {
                     SimpleRecordSchema recordSchema = new SimpleRecordSchema(SchemaIdentifier.EMPTY);
+                    recordSchema.setSchemaName(avroSchema.getName());
+                    recordSchema.setSchemaNamespace(avroSchema.getNamespace());
                     DataType recordSchemaType = RecordFieldType.RECORD.getRecordDataType(recordSchema);
                     knownRecordTypes.put(schemaFullName, recordSchemaType);
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 1e99dba..3d01ea8 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -140,6 +140,7 @@
                         <exclude>src/test/resources/avro/logical-types-nullable.avsc</exclude>
                         <exclude>src/test/resources/avro/multiple-types.avsc</exclude>
                         <exclude>src/test/resources/avro/simple.avsc</exclude>
+                        <exclude>src/test/resources/avro/recursive.avsc</exclude>
                         <exclude>src/test/resources/csv/extra-white-space.csv</exclude>
                         <exclude>src/test/resources/csv/multi-bank-account.csv</exclude>
                         <exclude>src/test/resources/csv/single-bank-account.csv</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
index a25e80e..afe5779 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/avro/TestWriteAvroResult.java
@@ -39,6 +39,7 @@ import org.junit.Test;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -74,6 +75,26 @@ public abstract class TestWriteAvroResult {
     }
 
     @Test
+    public void testWriteRecursiveRecord() throws IOException {
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/recursive.avsc"));
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+        final FileInputStream in = new FileInputStream("src/test/resources/avro/recursive.avro");
+
+        try (final AvroRecordReader reader = new AvroReaderWithExplicitSchema(in, recordSchema, schema);
+                final RecordSetWriter writer = createWriter(schema, new ByteArrayOutputStream())) {
+
+            final GenericRecord avroRecord = reader.nextAvroRecord();
+            final Map<String, Object> recordMap = AvroTypeUtil.convertAvroRecordToMap(avroRecord, recordSchema);
+            final Record record = new MapRecord(recordSchema, recordMap);
+            try {
+                writer.write(record);
+            } catch (StackOverflowError soe) {
+                Assert.fail("Recursive schema resulted in infinite loop during write");
+            }
+        }
+    }
+
+    @Test
     public void testWriteRecord() throws IOException {
         final Schema schema = new Schema.Parser().parse(new File("src/test/resources/avro/simple.avsc"));
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/recursive.avro b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/recursive.avro
new file mode 100644
index 0000000..320aeca
Binary files /dev/null and b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/recursive.avro differ
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/recursive.avsc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/recursive.avsc
new file mode 100644
index 0000000..8c51be6
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/avro/recursive.avsc
@@ -0,0 +1,10 @@
+{
+  "namespace": "nifi",
+  "type": "record",
+  "name": "Recursive",
+  "fields": [
+    {"name": "id", "type": "int"},
+    {"name": "name",  "type": ["string", "null"]},
+    {"name": "parent",  "type": ["Recursive", "null"]}
+  ]
+}
\ No newline at end of file