Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2021/03/24 20:29:10 UTC

[GitHub] [nifi] tpalfy opened a new pull request #4934: NIFI-8365 Fix JSON AbstractJsonRowRecordReader to handle deep CHOICE-typed records properly

tpalfy opened a new pull request #4934:
URL: https://github.com/apache/nifi/pull/4934


   Change the logic that selects the first compatible schema (which may be missing fields that are present in the actual value): search for a stricter match first, and fall back to the existing logic only if none is found.
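A minimal sketch of the selection order described above. It assumes a simplified model in which a schema is just a map from field name to expected Java type, and a record value is a `Map<String, Object>`. `StrictFirstSchemaSelection`, `isLenientMatch`, `isStrictMatch` and `select` are hypothetical names, not NiFi's `DataTypeUtils` API. The lenient check ignores fields the schema does not declare. The strict check also requires the schema to declare every field present in the value.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: a "schema" is simplified to field name -> expected Java type.
// This is not NiFi's RecordSchema or DataTypeUtils API.
final class StrictFirstSchemaSelection {

    // Lenient: every declared field that is present in the value has the expected type.
    // Fields the schema does not declare are ignored here (and would be lost later).
    static boolean isLenientMatch(Map<String, Class<?>> schema, Map<String, Object> value) {
        for (Map.Entry<String, Class<?>> field : schema.entrySet()) {
            Object fieldValue = value.get(field.getKey());
            if (fieldValue != null && !field.getValue().isInstance(fieldValue)) {
                return false;
            }
        }
        return true;
    }

    // Strict: lenient match AND the schema declares every field present in the value.
    static boolean isStrictMatch(Map<String, Class<?>> schema, Map<String, Object> value) {
        return isLenientMatch(schema, value) && schema.keySet().containsAll(value.keySet());
    }

    // Prefer the first strict match, fall back to the first lenient match, else null.
    static Map<String, Class<?>> select(List<Map<String, Class<?>>> candidates, Map<String, Object> value) {
        for (Map<String, Class<?>> schema : candidates) {
            if (isStrictMatch(schema, value)) {
                return schema;
            }
        }
        for (Map<String, Class<?>> schema : candidates) {
            if (isLenientMatch(schema, value)) {
                return schema;
            }
        }
        return null;
    }

    static Map<String, Class<?>> schema(String name1, Class<?> type1, String name2, Class<?> type2) {
        Map<String, Class<?>> schema = new LinkedHashMap<>();
        schema.put(name1, type1);
        schema.put(name2, type2);
        return schema;
    }

    public static void main(String[] args) {
        Map<String, Class<?>> intBoolean = schema("integer", Integer.class, "boolean", Boolean.class);
        Map<String, Class<?>> intString = schema("integer", Integer.class, "string", String.class);

        Map<String, Object> secondRecord = new HashMap<>();
        secondRecord.put("integer", 2);
        secondRecord.put("string", "stringValue2");

        // A lenient-only rule would return intBoolean here and drop "string".
        System.out.println(select(Arrays.asList(intBoolean, intString), secondRecord) == intString); // true
    }
}
```

Consider a value shaped like the ticket's second record (`integer` plus `string`). A lenient-only rule picks the first (integer, boolean) schema and loses `string`. Trying the strict rule first picks (integer, string). A value that no schema fully covers still gets the lenient fallback.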
   
   ### For all changes:
   - [ ] Is there a JIRA ticket associated with this PR? Is it referenced in the commit message?
   
   - [ ] Does your PR title start with **NIFI-XXXX** where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
   
   - [ ] Has your PR been rebased against the latest commit within the target branch (typically `main`)?
   
   - [ ] Is your initial contribution a single, squashed commit? _Additional commits in response to PR reviewer feedback should be made on this branch and pushed to allow change tracking. Do not `squash` or use `--force` when pushing to allow for clean monitoring of changes._
   
   ### For code changes:
   - [ ] Have you ensured that the full suite of tests is executed via `mvn -Pcontrib-check clean install` at the root `nifi` folder?
   - [ ] Have you written or updated unit tests to verify your changes?
   - [ ] Have you verified that the full build is successful on JDK 8?
   - [ ] Have you verified that the full build is successful on JDK 11?
   - [ ] If adding new dependencies to the code, are these dependencies licensed in a way that is compatible for inclusion under [ASF 2.0](http://www.apache.org/legal/resolved.html#category-a)? 
   - [ ] If applicable, have you updated the `LICENSE` file, including the main `LICENSE` file under `nifi-assembly`?
   - [ ] If applicable, have you updated the `NOTICE` file, including the main `NOTICE` file found under `nifi-assembly`?
   - [ ] If adding new Properties, have you added `.displayName` in addition to `.name` (programmatic access) for each of the new properties?
   
   ### For documentation related changes:
   - [ ] Have you ensured that format looks appropriate for the output in which it is rendered?
   
   ### Note:
   Please ensure that once the PR is submitted, you check GitHub Actions CI for build issues and submit an update to your PR as soon as possible.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [nifi] markap14 closed pull request #4934: NIFI-8365 Fix JSON AbstractJsonRowRecordReader to handle deep CHOICE-typed records properly

Posted by GitBox <gi...@apache.org>.
markap14 closed pull request #4934:
URL: https://github.com/apache/nifi/pull/4934


   





[GitHub] [nifi] pgyori commented on pull request #4934: NIFI-8365 Fix JSON AbstractJsonRowRecordReader to handle deep CHOICE-typed records properly

pgyori commented on pull request #4934:
URL: https://github.com/apache/nifi/pull/4934#issuecomment-810419344


   If you use the input data from the Jira ticket:
   {
     "dataCollection":[
       {
         "record": {
           "integer": 1,
           "boolean": true
         }
       },
       {
         "record": {
           "integer": 2,
           "string": "stringValue2"
         }
       }
     ]
   }
   
   and use ConvertRecord with JsonTreeReader and JsonRecordSetWriter, configuring the controller services as follows:
   JsonTreeReader:
    - Schema Access Strategy: Infer Schema
   JsonRecordSetWriter:
    - Schema Write Strategy: Set 'avro.schema' Attribute
    - Schema Access Strategy: Inherit Record Schema
    - Pretty Print JSON: true
   
   If you run the ConvertRecord processor with these settings, the schema attached to the flowfile in the avro.schema attribute will not match the data, because it does not include the schema of the second record (the one with "stringValue2").
   
   As a result, suppose you copy the schema from the avro.schema attribute, configure the Reader to use this schema explicitly (instead of inferring it), and run the ConvertRecord processor again on the same input data. The Reader will not read the "string": "stringValue2" part of the second record, and the processor's output will not contain this data.
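A minimal sketch of why the extra field disappears. It assumes that the reader walks the fields declared in the explicit schema rather than the fields present in the JSON, and that the copied schema describes only the first record's fields, as reported above. `SchemaDrivenProjection` and `project` are hypothetical names; this is not NiFi's reader code.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of schema-driven reading; not NiFi's actual record reader.
final class SchemaDrivenProjection {

    // Copy only the fields the schema declares; any other field in parsedJson is skipped.
    static Map<String, Object> project(List<String> schemaFields, Map<String, Object> parsedJson) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (String field : schemaFields) {
            result.put(field, parsedJson.get(field));
        }
        return result;
    }

    public static void main(String[] args) {
        // Assumed schema covering only the first record: {"integer": 1, "boolean": true}
        List<String> copiedSchemaFields = Arrays.asList("integer", "boolean");

        Map<String, Object> secondRecord = new LinkedHashMap<>();
        secondRecord.put("integer", 2);
        secondRecord.put("string", "stringValue2");

        System.out.println(project(copiedSchemaFields, secondRecord)); // {integer=2, boolean=null}
    }
}
```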





[GitHub] [nifi] markap14 commented on a change in pull request #4934: NIFI-8365 Fix JSON AbstractJsonRowRecordReader to handle deep CHOICE-typed records properly

markap14 commented on a change in pull request #4934:
URL: https://github.com/apache/nifi/pull/4934#discussion_r604177468



##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
##########
@@ -176,6 +177,30 @@ protected Object getRawNodeValue(final JsonNode fieldNode, final DataType dataTy
             if (dataType != null && dataType.getFieldType() == RecordFieldType.ARRAY) {
                 final ArrayDataType arrayDataType = (ArrayDataType) dataType;
                 elementDataType = arrayDataType.getElementType();
+            } else if (dataType != null && dataType.getFieldType() == RecordFieldType.CHOICE) {
+                List<DataType> possibleSubTypes = ((ChoiceDataType)dataType).getPossibleSubTypes();
+
+                for (DataType possibleSubType : possibleSubTypes) {
+                    if (possibleSubType.getFieldType() == RecordFieldType.ARRAY) {
+                        ArrayDataType possibleArrayDataType = (ArrayDataType)possibleSubType;
+                        DataType possibleElementType = possibleArrayDataType.getElementType();
+
+                        final Object[] possibleArrayElements = new Object[numElements];
+                        int elementCounter = 0;
+                        for (final JsonNode node : arrayNode) {
+                            final Object value = getRawNodeValue(node, possibleElementType, fieldName);
+                            possibleArrayElements[elementCounter++] = value;
+                        }
+
+                        if (DataTypeUtils.isArrayTypeCompatible(possibleArrayElements, possibleElementType, true)) {
+                            return possibleArrayElements;
+                        }
+                    }
+                }
+
+                logger.warn("Couldn't find proper schema for '{}'. This could lead to data loss as fields might end up missing in the output!", fieldName);

Review comment:
       I don't think this is an appropriate warning. In this situation, the JSON has an array element, and it is up to the caller of this method to determine what to do with the elements that are returned. If the elements are iterated over only by looking at the schema, then these elements could be skipped, and that is intentional, not cause for a warning. It's also possible, as may be the case in ValidateRecord, that the elements will not be skipped. This is the same as the behavior before this PR, and the same behavior that exists throughout most of the record handling. At most, I think this should be a DEBUG-level log message.
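A hedged sketch of the loop in the diff, with the log level lowered as suggested. Each ARRAY option of the CHOICE is tried in turn, and the first one whose element schema strictly covers every element wins. Otherwise the method returns null so the caller can fall back. The element schema is simplified to a set of field names. `ArrayChoiceSelection` and `selectElementSchema` are hypothetical names, and `java.util.logging` at `Level.FINE` stands in for a DEBUG-level call on NiFi's logger.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch: an element schema is simplified to the set of field names it declares,
// and java.util.logging at FINE stands in for a DEBUG-level log call.
final class ArrayChoiceSelection {
    private static final Logger LOGGER = Logger.getLogger(ArrayChoiceSelection.class.getName());

    // Return the first element schema that declares every field of every element, or null.
    static Set<String> selectElementSchema(String fieldName, List<Set<String>> arrayOptions,
                                           List<Map<String, Object>> elements) {
        for (Set<String> elementSchema : arrayOptions) {
            boolean coversAll = true;
            for (Map<String, Object> element : elements) {
                if (!elementSchema.containsAll(element.keySet())) {
                    coversAll = false;
                    break;
                }
            }
            if (coversAll) {
                return elementSchema;
            }
        }
        // Not an error: callers that iterate by schema skip unknown fields on purpose.
        // MessageFormat syntax: '' prints a single quote and {0} is replaced by fieldName.
        LOGGER.log(Level.FINE, "No strictly matching array schema for ''{0}''; falling back", fieldName);
        return null;
    }

    public static void main(String[] args) {
        Set<String> intBoolean = new HashSet<>(Arrays.asList("integer", "boolean"));
        Set<String> intString = new HashSet<>(Arrays.asList("integer", "string"));

        Map<String, Object> element = new HashMap<>();
        element.put("integer", 41);
        element.put("string", "stringValue41");

        Set<String> chosen = selectElementSchema("record",
                Arrays.asList(intBoolean, intString), Collections.singletonList(element));
        System.out.println(chosen == intString); // true
    }
}
```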







[GitHub] [nifi] markap14 commented on pull request #4934: NIFI-8365 Fix JSON AbstractJsonRowRecordReader to handle deep CHOICE-typed records properly

markap14 commented on pull request #4934:
URL: https://github.com/apache/nifi/pull/4934#issuecomment-822625332


   Thanks @tpalfy for the fix and @pgyori for the review! Looks good at this point. The JDK 1.8 (FR) and JDK 11 (EN) actions completed successfully. JDK 1.8 (JP) on macOS failed due to an intermittent failure to pull a Maven artifact. +1, merged to main.





[GitHub] [nifi] markap14 commented on pull request #4934: NIFI-8365 Fix JSON AbstractJsonRowRecordReader to handle deep CHOICE-typed records properly

markap14 commented on pull request #4934:
URL: https://github.com/apache/nifi/pull/4934#issuecomment-822538702


   Looks like the last commit addressed all review feedback from both @pgyori and me. Almost all of the GitHub Actions failed with a rather odd error message that basically just said "Failed", so I kicked off the Actions again to make sure all looks good. Otherwise, the code looks good to me, so I'm a +1 assuming that the GitHub Actions are satisfied.






[GitHub] [nifi] markap14 commented on a change in pull request #4934: NIFI-8365 Fix JSON AbstractJsonRowRecordReader to handle deep CHOICE-typed records properly

markap14 commented on a change in pull request #4934:
URL: https://github.com/apache/nifi/pull/4934#discussion_r604111650



##########
File path: nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
##########
@@ -645,6 +650,14 @@ private static boolean isRecordTypeCompatible(RecordSchema schema, Object value)
             return true;
         }
 
+        if (strict) {
+            if (value instanceof MapRecord) {

Review comment:
       MapRecord is a specific implementation of Record. I think we should be checking if `instanceof Record` here, and then casting to `Record` in the next line.
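A small sketch of why the interface check matters. It uses hypothetical stand-in types: `SketchRecord` for the `Record` interface, `SketchMapRecord` for `MapRecord`, and `SketchOtherRecord` as a second implementation. For illustration only, it assumes that values failing the `instanceof` test skip the strict field check. The diff above is truncated, so the real fallthrough behavior is not shown here.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: the Sketch* types stand in for NiFi's Record interface and its
// MapRecord implementation; they are not the real types.
final class RecordTypeCheck {

    interface SketchRecord {
        Set<String> fieldNames();
    }

    static final class SketchMapRecord implements SketchRecord {
        private final Map<String, Object> values;

        SketchMapRecord(Map<String, Object> values) {
            this.values = values;
        }

        @Override
        public Set<String> fieldNames() {
            return values.keySet();
        }
    }

    // Any other implementation of the same interface.
    static final class SketchOtherRecord implements SketchRecord {
        private final Set<String> names;

        SketchOtherRecord(Set<String> names) {
            this.names = names;
        }

        @Override
        public Set<String> fieldNames() {
            return names;
        }
    }

    // Concrete-class check: in this sketch, records that are not SketchMapRecord skip the strict check.
    static boolean strictByClass(Set<String> schemaFields, Object value) {
        if (value instanceof SketchMapRecord) {
            return schemaFields.containsAll(((SketchMapRecord) value).fieldNames());
        }
        return true;
    }

    // Interface check: every record implementation gets the same strict field check.
    static boolean strictByInterface(Set<String> schemaFields, Object value) {
        if (value instanceof SketchRecord) {
            return schemaFields.containsAll(((SketchRecord) value).fieldNames());
        }
        return true;
    }

    public static void main(String[] args) {
        Set<String> schemaFields = new HashSet<>(Arrays.asList("integer", "boolean"));
        SketchRecord withExtraField =
                new SketchOtherRecord(new HashSet<>(Arrays.asList("integer", "boolean", "extraString")));

        System.out.println(strictByClass(schemaFields, withExtraField));     // true: strict check never ran
        System.out.println(strictByInterface(schemaFields, withExtraField)); // false
    }
}
```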







[GitHub] [nifi] markap14 commented on a change in pull request #4934: NIFI-8365 Fix JSON AbstractJsonRowRecordReader to handle deep CHOICE-typed records properly

markap14 commented on a change in pull request #4934:
URL: https://github.com/apache/nifi/pull/4934#discussion_r604190380



##########
File path: nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
##########
@@ -726,4 +732,383 @@ public void testIncorrectSchema() throws IOException, MalformedRecordException {
             assertTrue(msg.contains("Boolean"));
         }
     }
+
+    @Test
+    public void testMergeOfSimilarRecords() throws Exception {
+        // GIVEN
+        String jsonPath = "src/test/resources/json/similar-records.json";
+
+        RecordSchema expectedSchema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("integer", RecordFieldType.INT.getDataType()),
+            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType()),
+            new RecordField("booleanOrString", RecordFieldType.CHOICE.getChoiceDataType(
+                RecordFieldType.BOOLEAN.getDataType(),
+                RecordFieldType.STRING.getDataType()
+            )),
+            new RecordField("string", RecordFieldType.STRING.getDataType())
+        ));
+
+        List<Object> expected = Arrays.asList(
+            new MapRecord(expectedSchema, new HashMap<String, Object>(){{
+                put("integer", 1);
+                put("boolean", true);
+                put("booleanOrString", true);
+            }}),
+            new MapRecord(expectedSchema, new HashMap<String, Object>(){{
+                put("integer", 2);
+                put("string", "stringValue2");
+                put("booleanOrString", "booleanOrStringValue2");
+            }})
+        );
+
+        // WHEN
+        // THEN
+        testReadRecords(jsonPath, expected);
+    }
+
+    @Test
+    public void testChoiceOfEmbeddedSimilarRecords() throws Exception {
+        // GIVEN
+        String jsonPath = "src/test/resources/json/choice-of-embedded-similar-records.json";
+
+        SimpleRecordSchema expectedRecordSchema1 = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("integer", RecordFieldType.INT.getDataType()),
+            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
+        ));
+        SimpleRecordSchema expectedRecordSchema2 = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("integer", RecordFieldType.INT.getDataType()),
+            new RecordField("string", RecordFieldType.STRING.getDataType())
+        ));
+        RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
+                RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
+                RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)
+            ))
+        ));
+
+        List<Object> expected = Arrays.asList(
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
+                put("record", new MapRecord(expectedRecordSchema1, new HashMap<String, Object>(){{
+                    put("integer", 1);
+                    put("boolean", true);
+                }}));
+            }}),
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
+                put("record", new MapRecord(expectedRecordSchema2, new HashMap<String, Object>(){{
+                    put("integer", 2);
+                    put("string", "stringValue2");
+                }}));
+            }})
+        );
+
+        // WHEN
+        // THEN
+        testReadRecords(jsonPath, expected);
+    }
+
+    @Test
+    public void testChoiceOfEmbeddedArraysAndSingleRecords() throws Exception {
+        // GIVEN
+        String jsonPath = "src/test/resources/json/choice-of-embedded-arrays-and-single-records.json";
+
+        SimpleRecordSchema expectedRecordSchema1 = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("integer", RecordFieldType.INT.getDataType())
+        ));
+        SimpleRecordSchema expectedRecordSchema2 = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("integer", RecordFieldType.INT.getDataType()),
+            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
+        ));
+        SimpleRecordSchema expectedRecordSchema3 = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("integer", RecordFieldType.INT.getDataType()),
+            new RecordField("string", RecordFieldType.STRING.getDataType())
+        ));
+        SimpleRecordSchema expectedRecordSchema4 = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("integer", RecordFieldType.INT.getDataType()),
+            new RecordField("string", RecordFieldType.STRING.getDataType())
+        ));
+        RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
+                RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
+                RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3),
+                RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)),
+                RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4))
+            ))
+        ));
+
+        List<Object> expected = Arrays.asList(
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
+                put("record", new MapRecord(expectedRecordSchema1, new HashMap<String, Object>(){{
+                    put("integer", 1);
+                }}));
+            }}),
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
+                put("record", new Object[]{
+                    new MapRecord(expectedRecordSchema2, new HashMap<String, Object>() {{
+                        put("integer", 21);
+                        put("boolean", true);
+                    }}),
+                    new MapRecord(expectedRecordSchema2, new HashMap<String, Object>() {{
+                        put("integer", 22);
+                        put("boolean", false);
+                    }})
+                });
+            }}),
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
+                put("record", new MapRecord(expectedRecordSchema3, new HashMap<String, Object>(){{
+                    put("integer", 3);
+                    put("string", "stringValue3");
+                }}));
+            }}),
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
+                put("record", new Object[]{
+                    new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{
+                        put("integer", 41);
+                        put("string", "stringValue41");
+                    }}),
+                    new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{
+                        put("integer", 42);
+                        put("string", "stringValue42");
+                    }})
+                });
+            }})
+        );
+
+        // WHEN
+        // THEN
+        testReadRecords(jsonPath, expected);
+    }
+
+    @Test
+    public void testChoiceOfMergedEmbeddedArraysAndSingleRecords() throws Exception {
+        // GIVEN
+        String jsonPath = "src/test/resources/json/choice-of-merged-embedded-arrays-and-single-records.json";
+
+        SimpleRecordSchema expectedRecordSchema1 = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("integer", RecordFieldType.INT.getDataType()),
+            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
+        ));
+        SimpleRecordSchema expectedRecordSchema2 = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("integer", RecordFieldType.INT.getDataType()),
+            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
+        ));
+        SimpleRecordSchema expectedRecordSchema3 = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("integer", RecordFieldType.INT.getDataType()),
+            new RecordField("string", RecordFieldType.STRING.getDataType())
+        ));
+        SimpleRecordSchema expectedRecordSchema4 = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("integer", RecordFieldType.INT.getDataType()),
+            new RecordField("string", RecordFieldType.STRING.getDataType()),
+            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
+        ));
+        RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
+                RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
+                RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3),
+                RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)),
+                RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4))
+            ))
+        ));
+
+        List<Object> expected = Arrays.asList(
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
+                put("record", new MapRecord(expectedRecordSchema1, new HashMap<String, Object>(){{
+                    put("integer", 1);
+                    put("boolean", false);
+                }}));
+            }}),
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
+                put("record", new Object[]{
+                    new MapRecord(expectedRecordSchema2, new HashMap<String, Object>() {{
+                        put("integer", 21);
+                        put("boolean", true);
+                    }}),
+                    new MapRecord(expectedRecordSchema2, new HashMap<String, Object>() {{
+                        put("integer", 22);
+                        put("boolean", false);
+                    }})
+                });
+            }}),
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
+                put("record", new MapRecord(expectedRecordSchema3, new HashMap<String, Object>(){{
+                    put("integer", 3);
+                    put("string", "stringValue3");
+                }}));
+            }}),
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
+                put("record", new Object[]{
+                    new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{
+                        put("integer", 41);
+                        put("string", "stringValue41");
+                    }}),
+                    new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{
+                        put("integer", 42);
+                        put("string", "stringValue42");
+                    }}),
+                    new MapRecord(expectedRecordSchema4, new HashMap<String, Object>() {{
+                        put("integer", 43);
+                        put("boolean", false);
+                    }})
+                });
+            }})
+        );
+
+        // WHEN
+        // THEN
+        testReadRecords(jsonPath, expected);
+    }
+
+    @Test
+    public void testChoseSuboptimalSchemaWhenDataHasExtraFields() throws Exception {
+        // GIVEN
+        String jsonPath = "src/test/resources/json/choice-of-different-arrays-with-extra-fields.json";
+
+        SimpleRecordSchema recordSchema1 = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("integer", RecordFieldType.INT.getDataType()),
+            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
+        ));
+        SimpleRecordSchema recordSchema2 = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("integer", RecordFieldType.INT.getDataType()),
+            new RecordField("string", RecordFieldType.STRING.getDataType())
+        ));
+
+        RecordSchema recordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
+                RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(recordSchema1)),
+                RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(recordSchema2))
+            ))
+        ));
+
+        RecordSchema schema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("dataCollection", RecordFieldType.ARRAY.getArrayDataType(
+                RecordFieldType.RECORD.getRecordDataType(recordChoiceSchema)
+            )
+        )));
+
+        SimpleRecordSchema expectedChildSchema1 = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("integer", RecordFieldType.INT.getDataType()),
+            new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
+        ));
+        SimpleRecordSchema expectedChildSchema2 = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("integer", RecordFieldType.INT.getDataType()),
+            new RecordField("string", RecordFieldType.STRING.getDataType())
+        ));
+        RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
+            new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
+                RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedChildSchema1)),
+                RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedChildSchema2))
+            ))
+        ));
+
+        // Since the actual arrays have records with either (INT, BOOLEAN, STRING) or (INT, STRING, STRING)
+        //  while the explicit schema defines only (INT, BOOLEAN) and (INT, STRING), we can't tell which record schema to choose
+        //  so we take the first one (INT, BOOLEAN) - as best effort - for both cases
+        SimpleRecordSchema expectedSelectedRecordSchemaForRecordsInBothArrays = expectedChildSchema1;
+
+        List<Object> expected = Arrays.asList(
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
+                put("record", new Object[]{
+                    new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new HashMap<String, Object>() {{
+                        put("integer", 11);
+                        put("boolean", true);
+                        put("extraString", "extraStringValue11");
+                    }}),
+                    new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new HashMap<String, Object>() {{
+                        put("integer", 12);
+                        put("boolean", false);
+                        put("extraString", "extraStringValue12");
+                    }})
+                });
+            }}),
+            new MapRecord(expectedRecordChoiceSchema, new HashMap<String, Object>(){{
+                put("record", new Object[]{
+                    new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new HashMap<String, Object>() {{
+                        put("integer", 21);
+                        put("extraString", "extraStringValue21");
+                        put("string", "stringValue21");
+                    }}),
+                    new MapRecord(expectedSelectedRecordSchemaForRecordsInBothArrays, new HashMap<String, Object>() {{
+                        put("integer", 22);
+                        put("extraString", "extraStringValue22");
+                        put("string", "stringValue22");
+                    }})
+                });
+            }})
+        );
+
+        // WHEN
+        // THEN
+        testReadRecords(jsonPath, schema, expected);
+    }
+
+    private void testReadRecords(String jsonPath, List<Object> expected) throws IOException, MalformedRecordException {
+        // GIVEN
+        final File jsonFile = new File(jsonPath);
+
+        try (
+            InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile));
+        ) {
+            RecordSchema schema = inferSchema(jsonStream);
+
+            // WHEN
+            // THEN
+            testReadRecords(jsonStream, schema, expected);
+        }
+    }
+
+    private void testReadRecords(String jsonPath, RecordSchema schema, List<Object> expected) throws IOException, MalformedRecordException {
+        // GIVEN
+        final File jsonFile = new File(jsonPath);
+
+        try (
+            InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile));
+        ) {
+            // WHEN
+            // THEN
+            testReadRecords(jsonStream, schema, expected);
+        }
+    }
+
+    private void testReadRecords(InputStream jsonStream, RecordSchema schema, List<Object> expected) throws IOException, MalformedRecordException {
+        // GIVEN
+        try (
+            JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat);
+        ) {
+            // WHEN
+            List<Object> actual = new ArrayList<>();
+            Record record;
+            while ((record = reader.nextRecord()) != null) {
+                List<Object> dataCollection = Arrays.asList((Object[]) record.getValue("dataCollection"));
+                actual.addAll(dataCollection);
+            }
+
+            // THEN
+            List<Function<Object, Object>> propertyProviders = Arrays.asList(

Review comment:
       I find this logic very confusing: a List of functions that each transform an object is chained together, and an existing list is wrapped with these transforms, just to compare it against the result of applying the same chain of transformations.
   
   It would be far simpler to transform the objects directly here and then make the assertion against the transformed objects.
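A minimal sketch of the simpler shape suggested here. The records that were read are converted into plain maps and lists once, then compared directly against plain expected values. `SimpleRecordAssertion`, `SketchRecord`, `toPlain` and `assertPlainEquals` are hypothetical names. In the actual test, `MapRecord` would presumably take the place of `SketchRecord`, and JUnit's `assertEquals` the place of the manual check.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: SketchRecord stands in for a record type such as MapRecord.
final class SimpleRecordAssertion {

    static final class SketchRecord {
        final Map<String, Object> values;

        SketchRecord(Map<String, Object> values) {
            this.values = values;
        }
    }

    // Convert records (and Object[] arrays of them) into maps and lists with value-based equals().
    static Object toPlain(Object value) {
        if (value instanceof SketchRecord) {
            Map<String, Object> plain = new LinkedHashMap<>();
            for (Map.Entry<String, Object> entry : ((SketchRecord) value).values.entrySet()) {
                plain.put(entry.getKey(), toPlain(entry.getValue()));
            }
            return plain;
        }
        if (value instanceof Object[]) {
            List<Object> plain = new ArrayList<>();
            for (Object element : (Object[]) value) {
                plain.add(toPlain(element));
            }
            return plain;
        }
        return value;
    }

    // Transform the actual records once, then compare them directly with the expected values.
    static void assertPlainEquals(Object expected, Object actualRecords) {
        Object actual = toPlain(actualRecords);
        if (!expected.equals(actual)) {
            throw new AssertionError("expected " + expected + " but was " + actual);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> values = new LinkedHashMap<>();
        values.put("integer", 21);
        values.put("boolean", true);
        Object[] actualRecords = { new SketchRecord(values) };

        Map<String, Object> expectedRecord = new LinkedHashMap<>();
        expectedRecord.put("integer", 21);
        expectedRecord.put("boolean", true);

        assertPlainEquals(Arrays.asList(expectedRecord), actualRecords);
        System.out.println("records match");
    }
}
```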



