You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by "exceptionfactory (via GitHub)" <gi...@apache.org> on 2023/09/18 20:58:40 UTC

[GitHub] [nifi] exceptionfactory commented on a diff in pull request #7665: NIFI-11197 Initial check in for Yaml record reader

exceptionfactory commented on code in PR #7665:
URL: https://github.com/apache/nifi/pull/7665#discussion_r1329269945


##########
nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonRecordSource.java:
##########
@@ -31,29 +29,23 @@
 
 public class JsonRecordSource implements RecordSource<JsonNode> {
     private static final Logger logger = LoggerFactory.getLogger(JsonRecordSource.class);
-    private static final JsonFactory jsonFactory;
     private final JsonParser jsonParser;
     private final StartingFieldStrategy strategy;
-    private final String startingFieldName;
-
-    static {
-        jsonFactory = new JsonFactory();
-        jsonFactory.setCodec(new ObjectMapper());
-    }
 
     public JsonRecordSource(final InputStream in) throws IOException {
-        jsonParser = jsonFactory.createParser(in);
-        strategy = null;
-        startingFieldName = null;
+        // Delegate with no starting-field strategy and no starting field name
+        this(in, null, null);
     }
 
     public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName) throws IOException {
-        jsonParser = jsonFactory.createParser(in);
+        // Default to a JsonParserFactory; the four-argument overload accepts a different TokenParserFactory
+        this(in, strategy, startingFieldName, new JsonParserFactory());
+    }
+
+    public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName, TokenParserFactory tokenParserFactory) throws IOException {
+        jsonParser = tokenParserFactory.getJsonParser(in);
         this.strategy = strategy;
-        this.startingFieldName = startingFieldName;
 
-        if (strategy == StartingFieldStrategy.NESTED_FIELD) {
-            final SerializedString serializedNestedField = new SerializedString(this.startingFieldName);
+        if (StartingFieldStrategy.NESTED_FIELD.equals(strategy)) {

Review Comment:
   Enum values should be compared with `==` rather than `equals()`. Both work here, but `==` is the idiomatic choice for enum constants and is type-checked at compile time.
   ```suggestion
           if (StartingFieldStrategy.NESTED_FIELD == strategy) {
   ```



##########
nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/yaml/TestYamlTreeRowRecordReader.java:
##########
@@ -0,0 +1,1362 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.yaml;
+
+import org.apache.avro.Schema;
+import org.apache.commons.io.FileUtils;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.json.JsonSchemaInference;
+import org.apache.nifi.json.JsonTreeRowRecordReader;
+import org.apache.nifi.json.SchemaApplicationStrategy;
+import org.apache.nifi.json.StartingFieldStrategy;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.inference.InferSchemaAccessStrategy;
+import org.apache.nifi.schema.inference.TimeValueInference;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.util.EqualsWrapper;
+import org.apache.nifi.util.MockComponentLog;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiPredicate;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+
+class TestYamlTreeRowRecordReader {
+    private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
+    private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
+    private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
+
+    private List<RecordField> getDefaultFields() {
+        return getFields(RecordFieldType.DOUBLE.getDataType());
+    }
+
+    /**
+     * Builds the eight bank-account fields, in declaration order.
+     * The result is a mutable {@link ArrayList}, so callers may add or remove fields.
+     *
+     * @param balanceDataType data type assigned to the {@code balance} field
+     * @return mutable list of record fields
+     */
+    private List<RecordField> getFields(final DataType balanceDataType) {
+        final DataType stringType = RecordFieldType.STRING.getDataType();
+        return new ArrayList<>(Arrays.asList(
+                new RecordField("id", RecordFieldType.INT.getDataType()),
+                new RecordField("name", stringType),
+                new RecordField("balance", balanceDataType),
+                new RecordField("address", stringType),
+                new RecordField("city", stringType),
+                new RecordField("state", stringType),
+                new RecordField("zipCode", stringType),
+                new RecordField("country", stringType)));
+    }
+
+    /**
+     * Builds the schema used for the nested {@code account} record: an INT {@code id} and a DOUBLE {@code balance}.
+     */
+    private RecordSchema getAccountSchema() {
+        final List<RecordField> fields = new ArrayList<>(Arrays.asList(
+                new RecordField("id", RecordFieldType.INT.getDataType()),
+                new RecordField("balance", RecordFieldType.DOUBLE.getDataType())));
+        return new SimpleRecordSchema(fields);
+    }
+
+    @Test
+    void testReadChoiceOfStringOrArrayOfRecords() throws IOException, MalformedRecordException {
+        // The Avro schema comes from the json test resources; only the input document is YAML
+        final File schemaFile = new File("src/test/resources/json/choice-of-string-or-array-record.avsc");
+        final File yamlFile = new File("src/test/resources/yaml/choice-of-string-or-array-record.yaml");
+
+        final Schema avroSchema = new Schema.Parser().parse(schemaFile);
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
+
+        try (final InputStream fis = new FileInputStream(yamlFile);
+             final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(fis, new MockComponentLog("id", "id"), recordSchema, dateFormat, timeFormat, timestampFormat)) {
+
+            // Fail with a clear assertion rather than an NPE if the document yields no record
+            final Record record = reader.nextRecord();
+            assertNotNull(record);
+            final Object[] fieldsArray = record.getAsArray("fields");
+            assertEquals(2, fieldsArray.length);
+
+            // First element: a record whose "type" is the plain string "string"
+            final Object firstElement = fieldsArray[0];
+            assertTrue(firstElement instanceof Record);
+            assertEquals("string", ((Record) firstElement).getAsString("type"));
+
+            // Second element: a record whose "type" is an array holding a single record
+            final Object secondElement = fieldsArray[1];
+            assertTrue(secondElement instanceof Record);
+            final Object[] typeArray = ((Record) secondElement).getAsArray("type");
+            assertEquals(1, typeArray.length);
+
+            final Object firstType = typeArray[0];
+            assertTrue(firstType instanceof Record);
+            final Record firstTypeRecord = (Record) firstType;
+            assertEquals("string", firstTypeRecord.getAsString("type"));
+        }
+    }
+
+    @Test
+    void testChoiceOfRecordTypes() throws IOException, MalformedRecordException {
+        final File avroSchemaFile = new File("src/test/resources/json/record-choice.avsc");
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(new Schema.Parser().parse(avroSchemaFile));
+
+        try (final InputStream in = new FileInputStream("src/test/resources/yaml/elements-for-record-choice.yaml");
+             final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), recordSchema, dateFormat, timeFormat, timestampFormat)) {
+
+            // First record: 'child' is a CHOICE of two RECORD types; its value has "id" as the only field
+            final Record first = reader.nextRecord();
+            assertNotNull(first);
+            final RecordSchema firstSchema = first.getSchema();
+            assertEquals(Arrays.asList("id", "child"), firstSchema.getFieldNames());
+            assertEquals("1234", first.getValue("id"));
+
+            final DataType firstChildType = firstSchema.getDataType("child").get();
+            assertSame(RecordFieldType.CHOICE, firstChildType.getFieldType());
+            final List<DataType> firstChoices = ((ChoiceDataType) firstChildType).getPossibleSubTypes();
+            assertEquals(2, firstChoices.size());
+            assertEquals(2L, firstChoices.stream().filter(subType -> subType.getFieldType() == RecordFieldType.RECORD).count());
+
+            final Object firstChild = first.getValue("child");
+            assertTrue(firstChild instanceof Record);
+            assertEquals(Collections.singletonList("id"), ((Record) firstChild).getSchema().getFieldNames());
+
+            // Second record: same outer shape, but the child value has "name" as the only field
+            final Record second = reader.nextRecord();
+            assertNotNull(second);
+            final RecordSchema secondSchema = second.getSchema();
+            assertEquals(Arrays.asList("id", "child"), secondSchema.getFieldNames());
+            assertEquals("1234", second.getValue("id"));
+
+            final DataType secondChildType = secondSchema.getDataType("child").get();
+            assertSame(RecordFieldType.CHOICE, secondChildType.getFieldType());
+            final List<DataType> secondChoices = ((ChoiceDataType) secondChildType).getPossibleSubTypes();
+            assertEquals(2, secondChoices.size());
+            assertEquals(2L, secondChoices.stream().filter(subType -> subType.getFieldType() == RecordFieldType.RECORD).count());
+
+            final Object secondChild = second.getValue("child");
+            assertTrue(secondChild instanceof Record);
+            assertEquals(Collections.singletonList("name"), ((Record) secondChild).getSchema().getFieldNames());
+
+            // Exactly two records are expected
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    void testReadArray() throws IOException, MalformedRecordException {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        try (final InputStream in = new FileInputStream("src/test/resources/yaml/bank-account-array.yaml");
+             final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            final Object[] secondRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    @Disabled("Not sure there is such a thing as one Yaml doc per line")
+    void testReadOneLinePerJSON() throws IOException, MalformedRecordException {
+        // NOTE(review): this test drives JsonTreeRowRecordReader over a JSON fixture,
+        // so it does not exercise the YAML reader at all.
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+        final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
+        final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
+
+        try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-oneline.json");
+             final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+
+            assertEquals(expectedFieldNames, schema.getFieldNames());
+            assertEquals(expectedTypes, schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList()));
+
+            assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, reader.nextRecord().getValues());
+            assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, reader.nextRecord().getValues());
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    @Disabled("TODO Determine whether this is possible in Yaml")
+    void testReadMultilineJSON() throws IOException, MalformedRecordException {
+        final List<RecordField> fields = getFields(RecordFieldType.DECIMAL.getDecimalDataType(30, 10));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-multiline.json");
+             final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DECIMAL, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {1, "John Doe", BigDecimal.valueOf(4750.89), "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            final Object[] secondRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {2, "Jane Doe", BigDecimal.valueOf(4820.09), "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    @Disabled("TODO Determine whether this is possible in Yaml as two arrays may end up as one array")
+    void testReadMultilineArrays() throws IOException, MalformedRecordException {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-multiarray.json");
+             final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            final Object[] secondRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+            final Object[] thirdRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {3, "Maria Doe", 4750.89, "123 My Street", "My City", "ME", "11111", "USA"}, thirdRecordValues);
+
+            final Object[] fourthRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {4, "Xi Doe", 4820.09, "321 Your Street", "Your City", "NV", "33333", "USA"}, fourthRecordValues);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    @Disabled("Not sure this makes sense in Yaml")
+    void testReadMixedJSON() throws IOException, MalformedRecordException {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        try (final InputStream in = new FileInputStream("src/test/resources/yaml/bank-account-mixed.yaml");
+             final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            final Object[] secondRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+            // NOTE(review): no record with id 3 is asserted; the third record read is expected to be id 4.
+            // Confirm this against bank-account-mixed.yaml before enabling the test.
+            final Object[] thirdRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {4, "Xi Doe", 4820.09, "321 Your Street", "Your City", "NV", "33333", "USA"}, thirdRecordValues);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    void testReadRawRecordIncludesFieldsNotInSchema() throws IOException, MalformedRecordException {
+        final String yamlPath = "src/test/resources/yaml/bank-account-array.yaml";
+        // The schema declares only "id" and "name"; the fixture carries more fields than that
+        final RecordSchema schema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("id", RecordFieldType.INT.getDataType()),
+                new RecordField("name", RecordFieldType.STRING.getDataType())));
+
+        // nextRecord(true, true): "balance" is not in the schema and is expected to be absent
+        try (final InputStream in = new FileInputStream(yamlPath);
+             final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+            final Record validated = reader.nextRecord(true, true);
+            assertEquals(1, validated.getValue("id"));
+            assertEquals("John Doe", validated.getValue("name"));
+            assertNull(validated.getValue("balance"));
+        }
+
+        // nextRecord(false, false): every field from the document is kept
+        try (final InputStream in = new FileInputStream(yamlPath);
+             final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+            final Record raw = reader.nextRecord(false, false);
+            assertEquals(1, raw.getValue("id"));
+            assertEquals("John Doe", raw.getValue("name"));
+            assertEquals(4750.89, raw.getValue("balance"));
+            assertEquals("123 My Street", raw.getValue("address"));
+            assertEquals("My City", raw.getValue("city"));
+            assertEquals("MS", raw.getValue("state"));
+            assertEquals("11111", raw.getValue("zipCode"));
+            assertEquals("USA", raw.getValue("country"));
+        }
+    }
+
+    @Test
+    void testReadRawRecordFieldOrderPreserved() throws IOException, MalformedRecordException {
+        // Only "id" is in the schema, yet a raw read must surface every field in document order
+        final RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(new RecordField("id", RecordFieldType.INT.getDataType())));
+
+        final String expectedMap = "{id=1, name=John Doe, address=123 My Street, city=My City, state=MS, zipCode=11111, country=USA, account=MapRecord[{id=42, balance=4750.89}]}";
+        final String expectedRecord = "MapRecord[" + expectedMap + "]";
+
+        try (final InputStream in = new FileInputStream("src/test/resources/yaml/single-element-nested.yaml");
+             final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+
+            final Record raw = reader.nextRecord(false, false);
+            assertEquals(expectedRecord, raw.toString());
+
+            final Map<String, Object> fieldMap = raw.toMap();
+            assertEquals(expectedMap, fieldMap.toString());
+        }
+    }
+
+    @Test
+    void testReadRawRecordTypeCoercion() throws IOException, MalformedRecordException {
+        final String yamlPath = "src/test/resources/yaml/bank-account-array.yaml";
+        // "id" is declared as STRING although the raw document value is an int
+        final RecordSchema schema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("id", RecordFieldType.STRING.getDataType()),
+                new RecordField("name", RecordFieldType.STRING.getDataType())));
+
+        try (final InputStream in = new FileInputStream(yamlPath);
+             final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+            final Record coerced = reader.nextRecord(true, true);
+            assertEquals("1", coerced.getValue("id")); // will be coerced into a STRING as per the schema
+            assertEquals("John Doe", coerced.getValue("name"));
+            assertNull(coerced.getValue("balance"));
+            assertEquals(2, coerced.getRawFieldNames().size());
+        }
+
+        try (final InputStream in = new FileInputStream(yamlPath);
+             final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+            final Record raw = reader.nextRecord(false, false);
+            assertEquals(1, raw.getValue("id")); // will return raw value of (int) 1
+            assertEquals("John Doe", raw.getValue("name"));
+            assertEquals(4750.89, raw.getValue("balance"));
+            assertEquals("123 My Street", raw.getValue("address"));
+            assertEquals("My City", raw.getValue("city"));
+            assertEquals("MS", raw.getValue("state"));
+            assertEquals("11111", raw.getValue("zipCode"));
+            assertEquals("USA", raw.getValue("country"));
+            assertEquals(8, raw.getRawFieldNames().size());
+        }
+    }
+
+    @Test
+    void testDateCoercedFromString() throws IOException, MalformedRecordException {
+        final String dateField = "date";
+        final String date = "2000-01-01";
+        final String datePattern = "yyyy-MM-dd";
+        final RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(new RecordField(dateField, RecordFieldType.DATE.getDataType())));
+
+        // A one-line document of the form "date: 2000-01-01"
+        final byte[] yamlBytes = String.format("%s: %s", dateField, date).getBytes(StandardCharsets.UTF_8);
+
+        // The value must be a java.sql.Date whether or not type coercion is requested
+        final boolean[] coercionModes = {true, false};
+        for (final boolean coerceTypes : coercionModes) {
+            try (final InputStream in = new ByteArrayInputStream(yamlBytes);
+                 final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, datePattern, timeFormat, timestampFormat)) {
+                final Object value = reader.nextRecord(coerceTypes, false).getValue(dateField);
+                assertTrue(value instanceof java.sql.Date, "With coerceTypes set to " + coerceTypes + ", value is not a Date");
+                assertEquals(date, value.toString());
+            }
+        }
+    }
+
+    @Test
+    void testTimestampCoercedFromString() throws IOException, MalformedRecordException {
+        final String timestampField = "timestamp";
+        final String customTimestampFormat = "yyyy/MM/dd HH:mm:ss";
+        final RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(new RecordField(timestampField, RecordFieldType.TIMESTAMP.getDataType())));
+
+        // The fixture is read twice: once with type coercion and once without
+        final boolean[] coercionModes = {true, false};
+        for (final boolean coerceTypes : coercionModes) {
+            try (final InputStream in = new FileInputStream("src/test/resources/yaml/timestamp.yaml");
+                 final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, customTimestampFormat)) {
+                final Object value = reader.nextRecord(coerceTypes, false).getValue(timestampField);
+                assertTrue(value instanceof java.sql.Timestamp, "With coerceTypes set to " + coerceTypes + ", value is not a Timestamp");
+            }
+        }
+    }
+
+    @Test
+    void testSingleJsonElement() throws IOException, MalformedRecordException {
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        try (final InputStream in = new FileInputStream("src/test/resources/yaml/single-bank-account.yaml");
+             final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    void testSingleJsonElementWithChoiceFields() throws IOException, MalformedRecordException {
+        // Wrap every default field's type in a single-option CHOICE to test mapping onto CHOICE types
+        final List<RecordField> choiceFields = new ArrayList<>();
+        for (final RecordField field : getDefaultFields()) {
+            choiceFields.add(new RecordField(field.getFieldName(), RecordFieldType.CHOICE.getChoiceDataType(field.getDataType())));
+        }
+        final RecordSchema schema = new SimpleRecordSchema(choiceFields);
+
+        final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
+        final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
+
+        try (final InputStream in = new FileInputStream("src/test/resources/yaml/single-bank-account.yaml");
+             final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+
+            assertEquals(expectedFieldNames, schema.getFieldNames());
+
+            // Each field must be a CHOICE whose first option is the original field type
+            final List<RecordField> fields = schema.getFields();
+            for (int index = 0; index < fields.size(); index++) {
+                final DataType fieldType = fields.get(index).getDataType();
+                assertTrue(fieldType instanceof ChoiceDataType);
+                assertEquals(expectedTypes.get(index), ((ChoiceDataType) fieldType).getPossibleSubTypes().get(0).getFieldType());
+            }
+
+            assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, reader.nextRecord().getValues());
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    void testElementWithNestedData() throws IOException, MalformedRecordException {
+        // Schema: the default fields with the top-level "balance" replaced by a nested "account" record
+        final DataType nestedAccountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+        final List<RecordField> schemaFields = getDefaultFields();
+        schemaFields.add(new RecordField("account", nestedAccountType));
+        schemaFields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(schemaFields);
+
+        try (final InputStream in = new FileInputStream("src/test/resources/yaml/single-element-nested.yaml");
+             final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+
+            final List<RecordFieldType> expectedFieldTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.RECORD);
+            assertEquals(expectedFieldTypes, schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList()));
+
+            final Object[] values = reader.nextRecord().getValues();
+            final int accountIndex = values.length - 1;
+
+            // Every value except the trailing nested record is a flat scalar
+            assertArrayEquals(new Object[] {1, "John Doe", "123 My Street", "My City", "MS", "11111", "USA"}, Arrays.copyOfRange(values, 0, accountIndex));
+
+            // The trailing value is the nested account record
+            final Object accountValue = values[accountIndex];
+            assertTrue(Record.class.isAssignableFrom(accountValue.getClass()));
+            final Record accountRecord = (Record) accountValue;
+            assertEquals(42, accountRecord.getValue("id"));
+            assertEquals(4750.89, accountRecord.getValue("balance"));
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    void testElementWithNestedArray() throws IOException, MalformedRecordException {
+        // Schema: the default fields with the top-level "balance" replaced by an "accounts" array of records
+        final DataType singleAccountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+        final DataType accountArrayType = RecordFieldType.ARRAY.getArrayDataType(singleAccountType);
+
+        final List<RecordField> schemaFields = getDefaultFields();
+        schemaFields.add(new RecordField("accounts", accountArrayType));
+        schemaFields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
+        final RecordSchema schema = new SimpleRecordSchema(schemaFields);
+
+        try (final InputStream in = new FileInputStream("src/test/resources/yaml/single-element-nested-array.yaml");
+             final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+
+            assertEquals(Arrays.asList("id", "name", "address", "city", "state", "zipCode", "country", "accounts"), schema.getFieldNames());
+
+            final List<RecordFieldType> expectedFieldTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.ARRAY);
+            assertEquals(expectedFieldTypes, schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList()));
+
+            final Object[] values = reader.nextRecord().getValues();
+            final int accountsIndex = values.length - 1;
+            assertArrayEquals(new Object[] {1, "John Doe", "123 My Street", "My City", "MS", "11111", "USA"}, Arrays.copyOfRange(values, 0, accountsIndex));
+
+            // The trailing "accounts" value must surface as an object array
+            final Object accountsValue = values[accountsIndex];
+            assertTrue(Object[].class.isAssignableFrom(accountsValue.getClass()));
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    void testReadArrayDifferentSchemas() throws IOException, MalformedRecordException {
+        // Reads a YAML array whose elements do not all carry the same fields. Per the expected
+        // values below, the second element yields a null "country".
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        // Declare the concrete YAML reader type, consistent with the other YAML reader tests
+        try (final InputStream in = new FileInputStream("src/test/resources/yaml/bank-account-array-different-schemas.yaml");
+             final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            final Object[] secondRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", null}, secondRecordValues);
+
+            final Object[] thirdRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {3, "Jake Doe", 4751.89, "124 My Street", "My City", "MS", "11111", "USA"}, thirdRecordValues);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    void testReadArrayDifferentSchemasWithOptionalElementOverridden() throws IOException, MalformedRecordException {
+        // Reads a YAML array in which "balance" is optional. Per the expected values below,
+        // the second and third elements yield a null balance.
+        final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
+
+        // Declare the concrete YAML reader type, consistent with the other YAML reader tests
+        try (final InputStream in = new FileInputStream("src/test/resources/yaml/bank-account-array-optional-balance.yaml");
+             final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+
+            final List<String> fieldNames = schema.getFieldNames();
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
+            assertEquals(expectedFieldNames, fieldNames);
+
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE, RecordFieldType.STRING,
+                    RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
+            assertEquals(expectedTypes, dataTypes);
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA"}, firstRecordValues);
+
+            final Object[] secondRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {2, "Jane Doe", null, "321 Your Street", "Your City", "NY", "33333", "USA"}, secondRecordValues);
+
+            final Object[] thirdRecordValues = reader.nextRecord().getValues();
+            assertArrayEquals(new Object[] {3, "Jimmy Doe", null, "321 Your Street", "Your City", "NY", "33333", "USA"}, thirdRecordValues);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+
+    @Test
+    void testReadUnicodeCharacters() throws IOException, MalformedRecordException {
+        // Nested "from" record schema: a long id plus a name
+        final List<RecordField> fromFields = new ArrayList<>();
+        fromFields.add(new RecordField("id", RecordFieldType.LONG.getDataType()));
+        fromFields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+        final RecordSchema fromSchema = new SimpleRecordSchema(fromFields);
+        final DataType fromType = RecordFieldType.RECORD.getRecordDataType(fromSchema);
+
+        final List<RecordField> fields = new ArrayList<>();
+        fields.add(new RecordField("created_at", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("id", RecordFieldType.LONG.getDataType()));
+        fields.add(new RecordField("unicode", RecordFieldType.STRING.getDataType()));
+        fields.add(new RecordField("from", fromType));
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        // Declare the concrete YAML reader type, consistent with the other YAML reader tests
+        try (final InputStream in = new FileInputStream("src/test/resources/yaml/yaml-with-unicode.yaml");
+             final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+
+            final Object[] firstRecordValues = reader.nextRecord().getValues();
+
+            // The id is larger than Integer.MAX_VALUE, so it must be read as a Long
+            final Object secondValue = firstRecordValues[1];
+            assertTrue(secondValue instanceof Long);
+            assertEquals(832036744985577473L, secondValue);
+
+            // Non-ASCII text must survive parsing unchanged
+            final Object unicodeValue = firstRecordValues[2];
+            assertEquals("\u3061\u3083\u6ce3\u304d\u305d\u3046", unicodeValue);
+
+            assertNull(reader.nextRecord());
+        }
+    }
+
+    @Test
+    void testIncorrectSchema() {
+        final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
+        final List<RecordField> fields = getDefaultFields();
+        fields.add(new RecordField("account", accountType));
+        fields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
+
+        final RecordSchema schema = new SimpleRecordSchema(fields);
+
+        MalformedRecordException mre = assertThrows(MalformedRecordException.class, () -> {
+            try (final InputStream in = new FileInputStream("src/test/resources/yaml/single-bank-account-wrong-field-type.yaml");
+                 final YamlTreeRowRecordReader reader = new YamlTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+
+                reader.nextRecord().getValues();
+            }
+        });
+
+        final String msg = mre.getCause().getMessage();
+        assertTrue(msg.contains("account.balance"));
+        assertTrue(msg.contains("true"));
+        assertTrue(msg.contains("Double"));
+        assertTrue(msg.contains("Boolean"));
+    }
+
+    @Test
+    void testMergeOfSimilarRecords() throws Exception {
+        // GIVEN

Review Comment:
   The `GIVEN` and `WHEN/THEN` comments can be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org