You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/04/14 21:04:34 UTC

[nifi] branch main updated: NIFI-9862 Updated JsonTreeReader to read Records from Nested Field

This is an automated email from the ASF dual-hosted git repository.

exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 7f95de27fb NIFI-9862 Updated JsonTreeReader to read Records from Nested Field
7f95de27fb is described below

commit 7f95de27fb5f8ead755133ef1f3d74777c6c8578
Author: Lehel <le...@hotmail.com>
AuthorDate: Wed Apr 6 21:50:22 2022 +0200

    NIFI-9862 Updated JsonTreeReader to read Records from Nested Field
    
    This closes #5937
    
    Signed-off-by: David Handermann <ex...@apache.org>
---
 .../nifi-record-serialization-services/pom.xml     |   3 +
 .../nifi/json/AbstractJsonRowRecordReader.java     |  98 ++++-
 .../org/apache/nifi/json/JsonRecordSource.java     |  26 +-
 .../java/org/apache/nifi/json/JsonTreeReader.java  |  59 ++-
 .../apache/nifi/json/JsonTreeRowRecordReader.java  |  12 +-
 .../apache/nifi/json/StartingFieldStrategy.java    |  38 ++
 .../additionalDetails.html                         | 398 ++++++++++-------
 .../json/TestInferJsonSchemaAccessStrategy.java    | 107 +++--
 .../nifi/json/TestJsonPathRowRecordReader.java     |  74 ++--
 .../apache/nifi/json/TestJsonSchemaInference.java  |   4 +-
 .../nifi/json/TestJsonTreeRowRecordReader.java     | 490 +++++++++++++--------
 .../org/apache/nifi/json/TestWriteJsonResult.java  |  38 +-
 .../test/resources/json/multiple-nested-field.json |  22 +
 .../json/nested-array-then-start-object.json       |  20 +
 .../json/single-element-nested-array-middle.json   |  16 +
 15 files changed, 932 insertions(+), 473 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 8bbe871734..cf43edc51c 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -206,11 +206,14 @@
                         <exclude>src/test/resources/json/data-types.json</exclude>
                         <exclude>src/test/resources/json/timestamp.json</exclude>
                         <exclude>src/test/resources/json/json-with-unicode.json</exclude>
+                        <exclude>src/test/resources/json/multiple-nested-field.json</exclude>
                         <exclude>src/test/resources/json/primitive-type-array.json</exclude>
                         <exclude>src/test/resources/json/single-bank-account.json</exclude>
                         <exclude>src/test/resources/json/single-bank-account-wrong-field-type.json</exclude>
                         <exclude>src/test/resources/json/single-element-nested-array.json</exclude>
                         <exclude>src/test/resources/json/single-element-nested.json</exclude>
+                        <exclude>src/test/resources/json/single-element-nested-array-middle.json</exclude>
+                        <exclude>src/test/resources/json/nested-array-then-start-object.json</exclude>
                         <exclude>src/test/resources/json/output/dataTypes.json</exclude>
                         <exclude>src/test/resources/json/elements-for-record-choice.json</exclude>
                         <exclude>src/test/resources/json/record-choice.avsc</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
index 51d9533992..e248bfd518 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.io.SerializedString;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -51,8 +52,6 @@ import java.util.function.Supplier;
 
 public abstract class AbstractJsonRowRecordReader implements RecordReader {
     private final ComponentLog logger;
-    private final JsonParser jsonParser;
-    private final JsonNode firstJsonNode;
     private final Supplier<DateFormat> LAZY_DATE_FORMAT;
     private final Supplier<DateFormat> LAZY_TIME_FORMAT;
     private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
@@ -61,11 +60,12 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
 
     private static final JsonFactory jsonFactory = new JsonFactory();
     private static final ObjectMapper codec = new ObjectMapper();
+    private JsonParser jsonParser;
+    private JsonNode firstJsonNode;
+    private StartingFieldStrategy strategy;
 
 
-    public AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat)
-            throws IOException, MalformedRecordException {
-
+    private AbstractJsonRowRecordReader(final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat) {
         this.logger = logger;
 
         final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
@@ -75,9 +75,15 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
         LAZY_DATE_FORMAT = () -> df;
         LAZY_TIME_FORMAT = () -> tf;
         LAZY_TIMESTAMP_FORMAT = () -> tsf;
+    }
+
+    protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat)
+            throws IOException, MalformedRecordException {
+
+        this(logger, dateFormat, timeFormat, timestampFormat);
 
         try {
-            jsonParser = jsonFactory.createJsonParser(in);
+            jsonParser = jsonFactory.createParser(in);
             jsonParser.setCodec(codec);
 
             JsonToken token = jsonParser.nextToken();
@@ -95,6 +101,37 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
         }
     }
 
+    protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat,
+                                          final StartingFieldStrategy strategy, final String nestedFieldName) throws IOException, MalformedRecordException {
+
+        this(logger, dateFormat, timeFormat, timestampFormat);
+
+        this.strategy = strategy;
+
+        try {
+            jsonParser = jsonFactory.createParser(in);
+            jsonParser.setCodec(codec);
+
+            if (strategy == StartingFieldStrategy.NESTED_FIELD) {
+                final SerializedString serializedStartingFieldName = new SerializedString(nestedFieldName);
+                while (!jsonParser.nextFieldName(serializedStartingFieldName) && jsonParser.hasCurrentToken());
+                logger.debug("Parsing starting at nested field [{}]", nestedFieldName);
+            }
+
+            JsonToken token = jsonParser.nextToken();
+            if (token == JsonToken.START_ARRAY) {
+                token = jsonParser.nextToken(); // advance to START_OBJECT token
+            }
+            if (token == JsonToken.START_OBJECT) { // could be END_ARRAY also
+                firstJsonNode = jsonParser.readValueAsTree();
+            } else {
+                firstJsonNode = null;
+            }
+        } catch (final JsonParseException e) {
+            throw new MalformedRecordException("Could not parse data as JSON", e);
+        }
+    }
+
     protected Supplier<DateFormat> getLazyDateFormat() {
         return LAZY_DATE_FORMAT;
     }
@@ -121,7 +158,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
         } catch (final MalformedRecordException mre) {
             throw mre;
         } catch (final Exception e) {
-            logger.debug("Failed to convert JSON Element {} into a Record object using schema {} due to {}", new Object[] {nextNode, schema, e.toString(), e});
+            logger.debug("Failed to convert JSON Element {} into a Record object using schema {} due to {}", nextNode, schema, e.toString(), e);
             throw new MalformedRecordException("Successfully parsed a JSON object from input but failed to convert into a Record object with the given schema", e);
         }
     }
@@ -178,11 +215,11 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
                 final ArrayDataType arrayDataType = (ArrayDataType) dataType;
                 elementDataType = arrayDataType.getElementType();
             } else if (dataType != null && dataType.getFieldType() == RecordFieldType.CHOICE) {
-                List<DataType> possibleSubTypes = ((ChoiceDataType)dataType).getPossibleSubTypes();
+                List<DataType> possibleSubTypes = ((ChoiceDataType) dataType).getPossibleSubTypes();
 
                 for (DataType possibleSubType : possibleSubTypes) {
                     if (possibleSubType.getFieldType() == RecordFieldType.ARRAY) {
-                        ArrayDataType possibleArrayDataType = (ArrayDataType)possibleSubType;
+                        ArrayDataType possibleArrayDataType = (ArrayDataType) possibleSubType;
                         DataType possibleElementType = possibleArrayDataType.getElementType();
 
                         final Object[] possibleArrayElements = new Object[numElements];
@@ -214,7 +251,6 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
         }
 
         if (fieldNode.isObject()) {
-            RecordSchema childSchema = null;
             if (dataType != null && RecordFieldType.MAP == dataType.getFieldType()) {
                 return getMapFromRawValue(fieldNode, dataType, fieldName);
             }
@@ -291,8 +327,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
         } else if (dataType.getFieldType() == RecordFieldType.ARRAY) {
             final ArrayDataType arrayDataType = (ArrayDataType) dataType;
             final DataType elementType = arrayDataType.getElementType();
-            final Record record = createOptionalRecord(fieldNode, elementType, strict);
-            return record;
+            return createOptionalRecord(fieldNode, elementType, strict);
         }
 
         return null;
@@ -309,8 +344,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
             childValues.put(childFieldName, childValue);
         }
 
-        final MapRecord record = new MapRecord(childSchema, childValues);
-        return record;
+        return new MapRecord(childSchema, childValues);
     }
 
     protected JsonNode getNextJsonNode() throws IOException, MalformedRecordException {
@@ -318,7 +352,15 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
             firstObjectConsumed = true;
             return firstJsonNode;
         }
+        if (strategy == StartingFieldStrategy.NESTED_FIELD) {
+            return getJsonNodeWithNestedNodeStrategy();
+        } else {
+            return getJsonNode();
+        }
 
+    }
+
+    private JsonNode getJsonNodeWithNestedNodeStrategy() throws IOException, MalformedRecordException {
         while (true) {
             final JsonToken token = jsonParser.nextToken();
             if (token == null) {
@@ -326,14 +368,34 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
             }
 
             switch (token) {
+                case START_ARRAY:
+                    break;
+                case END_ARRAY:
                 case END_OBJECT:
-                    continue;
+                case FIELD_NAME:
+                    return null;
                 case START_OBJECT:
                     return jsonParser.readValueAsTree();
-                case END_ARRAY:
-                case START_ARRAY:
-                    continue;
+                default:
+                    throw new MalformedRecordException("Expected to get a JSON Object but got a token of type " + token.name());
+            }
+        }
+    }
+
+    private JsonNode getJsonNode() throws IOException, MalformedRecordException {
+        while (true) {
+            final JsonToken token = jsonParser.nextToken();
+            if (token == null) {
+                return null;
+            }
 
+            switch (token) {
+                case START_ARRAY:
+                case END_ARRAY:
+                case END_OBJECT:
+                    break;
+                case START_OBJECT:
+                    return jsonParser.readValueAsTree();
                 default:
                     throw new MalformedRecordException("Expected to get a JSON Object but got a token of type " + token.name());
             }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java
index 1c6afce9d4..348c2ef02f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java
@@ -19,16 +19,22 @@ package org.apache.nifi.json;
 import com.fasterxml.jackson.core.JsonFactory;
 import com.fasterxml.jackson.core.JsonParser;
 import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.io.SerializedString;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.schema.inference.RecordSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.InputStream;
 
 public class JsonRecordSource implements RecordSource<JsonNode> {
+    private static final Logger logger = LoggerFactory.getLogger(JsonRecordSource.class);
     private static final JsonFactory jsonFactory;
     private final JsonParser jsonParser;
+    private final StartingFieldStrategy strategy;
+    private final String startingFieldName;
 
     static {
         jsonFactory = new JsonFactory();
@@ -36,7 +42,21 @@ public class JsonRecordSource implements RecordSource<JsonNode> {
     }
 
     public JsonRecordSource(final InputStream in) throws IOException {
-        jsonParser = jsonFactory.createJsonParser(in);
+        jsonParser = jsonFactory.createParser(in);
+        strategy = null;
+        startingFieldName = null;
+    }
+
+    public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName) throws IOException {
+        jsonParser = jsonFactory.createParser(in);
+        this.strategy = strategy;
+        this.startingFieldName = startingFieldName;
+
+        if (strategy == StartingFieldStrategy.NESTED_FIELD) {
+            final SerializedString serializedNestedField = new SerializedString(this.startingFieldName);
+            while (!jsonParser.nextFieldName(serializedNestedField) && jsonParser.hasCurrentToken());
+            logger.debug("Parsing starting at nested field [{}]", startingFieldName);
+        }
     }
 
     @Override
@@ -50,6 +70,10 @@ public class JsonRecordSource implements RecordSource<JsonNode> {
             if (token == JsonToken.START_OBJECT) {
                 return jsonParser.readValueAsTree();
             }
+
+            if (strategy == StartingFieldStrategy.NESTED_FIELD && (token == JsonToken.END_ARRAY || token == JsonToken.END_OBJECT)) {
+                return null;
+            }
         }
     }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
index b0ecad1c80..e5a2ed20fd 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
@@ -27,10 +27,11 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.schema.access.SchemaAccessStrategy;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.schema.inference.SchemaInferenceEngine;
 import org.apache.nifi.schema.inference.RecordSourceFactory;
+import org.apache.nifi.schema.inference.SchemaInferenceEngine;
 import org.apache.nifi.schema.inference.SchemaInferenceUtil;
 import org.apache.nifi.schema.inference.TimeValueInference;
 import org.apache.nifi.schemaregistry.services.SchemaRegistry;
@@ -44,6 +45,7 @@ import org.apache.nifi.serialization.record.RecordSchema;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.function.Supplier;
@@ -59,21 +61,49 @@ import static org.apache.nifi.schema.inference.SchemaInferenceUtil.SCHEMA_CACHE;
         + "If an array is encountered, each element in that array will be treated as a separate record. "
         + "If the schema that is configured contains a field that is not present in the JSON, a null value will be used. If the JSON contains "
         + "a field that is not present in the schema, that field will be skipped. "
-    + "See the Usage of the Controller Service for more information and examples.")
+        + "See the Usage of the Controller Service for more information and examples.")
 @SeeAlso(JsonPathReader.class)
 public class JsonTreeReader extends SchemaRegistryService implements RecordReaderFactory {
 
     private volatile String dateFormat;
     private volatile String timeFormat;
     private volatile String timestampFormat;
+    private volatile String startingFieldName;
+    private volatile StartingFieldStrategy startingFieldStrategy;
+
+    public static final PropertyDescriptor STARTING_FIELD_STRATEGY = new PropertyDescriptor.Builder()
+            .name("starting-field-strategy")
+            .displayName("Starting Field Strategy")
+            .description("Start processing from the root node or from a specified nested node.")
+            .required(true)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .defaultValue(StartingFieldStrategy.ROOT_NODE.name())
+            .allowableValues(
+                    Arrays.stream(StartingFieldStrategy.values()).map(startingStrategy ->
+                            new AllowableValue(startingStrategy.name(), startingStrategy.getDisplayName(), startingStrategy.getDescription())
+                    ).toArray(AllowableValue[]::new))
+            .build();
+
+
+    public static final PropertyDescriptor STARTING_FIELD_NAME = new PropertyDescriptor.Builder()
+            .name("starting-field-name")
+            .displayName("Starting Field Name")
+            .description("Skips forward to the given nested JSON field (array or object) to begin processing.")
+            .required(false)
+            .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+            .defaultValue(null)
+            .dependsOn(STARTING_FIELD_STRATEGY, StartingFieldStrategy.NESTED_FIELD.name())
+            .build();
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
         properties.add(new PropertyDescriptor.Builder()
-            .fromPropertyDescriptor(SCHEMA_CACHE)
-            .dependsOn(SCHEMA_ACCESS_STRATEGY, INFER_SCHEMA)
-            .build());
+                .fromPropertyDescriptor(SCHEMA_CACHE)
+                .dependsOn(SCHEMA_ACCESS_STRATEGY, INFER_SCHEMA)
+                .build());
+        properties.add(STARTING_FIELD_STRATEGY);
+        properties.add(STARTING_FIELD_NAME);
         properties.add(DateTimeUtils.DATE_FORMAT);
         properties.add(DateTimeUtils.TIME_FORMAT);
         properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
@@ -81,10 +111,12 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
     }
 
     @OnEnabled
-    public void storeFormats(final ConfigurationContext context) {
+    public void storePropertyValues(final ConfigurationContext context) {
         this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
         this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
         this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
+        this.startingFieldStrategy = StartingFieldStrategy.valueOf(context.getProperty(STARTING_FIELD_STRATEGY).getValue());
+        this.startingFieldName = context.getProperty(STARTING_FIELD_NAME).getValue();
     }
 
     @Override
@@ -96,12 +128,15 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
     }
 
     @Override
-    protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final PropertyContext context) {
-        final RecordSourceFactory<JsonNode> jsonSourceFactory = (var, in) -> new JsonRecordSource(in);
-        final Supplier<SchemaInferenceEngine<JsonNode>> inferenceSupplier = () -> new JsonSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat));
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String schemaAccessStrategy, final SchemaRegistry schemaRegistry, final PropertyContext context) {
+        final RecordSourceFactory<JsonNode> jsonSourceFactory =
+                (var, in) -> new JsonRecordSource(in, startingFieldStrategy, startingFieldName);
+
+        final Supplier<SchemaInferenceEngine<JsonNode>> inferenceSupplier =
+                () -> new JsonSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat));
 
-        return SchemaInferenceUtil.getSchemaAccessStrategy(strategy, context, getLogger(), jsonSourceFactory, inferenceSupplier,
-            () -> super.getSchemaAccessStrategy(strategy, schemaRegistry, context));
+        return SchemaInferenceUtil.getSchemaAccessStrategy(schemaAccessStrategy, context, getLogger(), jsonSourceFactory, inferenceSupplier,
+                () -> super.getSchemaAccessStrategy(schemaAccessStrategy, schemaRegistry, context));
     }
 
     @Override
@@ -113,6 +148,6 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
     public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger)
             throws IOException, MalformedRecordException, SchemaNotFoundException {
         final RecordSchema schema = getSchema(variables, in, null);
-        return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat);
+        return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName);
     }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index 1bb34549c8..eee487c026 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -53,7 +53,12 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
         this.schema = schema;
     }
 
-
+    public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
+                                   final String dateFormat, final String timeFormat, final String timestampFormat,
+                                   final StartingFieldStrategy strategy, final String startingFieldName) throws IOException, MalformedRecordException {
+        super(in, logger, dateFormat, timeFormat, timestampFormat, strategy, startingFieldName);
+        this.schema = schema;
+    }
 
     @Override
     protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final boolean coerceTypes, final boolean dropUnknownFields)
@@ -104,7 +109,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
                     final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
                     value = convertField(childNode, fullFieldName, desiredType, dropUnknown);
                 } else {
-                    value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType(), fieldName);
+                    value = getRawNodeValue(childNode, recordField.getDataType(), fieldName);
                 }
 
                 values.put(fieldName, value);
@@ -157,8 +162,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
             case TIME:
             case TIMESTAMP: {
                 final Object rawValue = getRawNodeValue(fieldNode, fieldName);
-                final Object converted = DataTypeUtils.convertType(rawValue, desiredType, getLazyDateFormat(), getLazyTimeFormat(), getLazyTimestampFormat(), fieldName);
-                return converted;
+                return DataTypeUtils.convertType(rawValue, desiredType, getLazyDateFormat(), getLazyTimeFormat(), getLazyTimestampFormat(), fieldName);
             }
             case MAP: {
                 final DataType valueType = ((MapDataType) desiredType).getValueType();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/StartingFieldStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/StartingFieldStrategy.java
new file mode 100644
index 0000000000..44727e72fd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/StartingFieldStrategy.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.json;
+
+public enum StartingFieldStrategy {
+    ROOT_NODE("Root Node", "Begins processing from the root node."),
+    NESTED_FIELD("Nested Field", "Skips forward to the given nested JSON field (array or object) to begin processing.");
+
+    private final String displayName;
+    private final String description;
+
+    StartingFieldStrategy(final String displayName, final String description) {
+        this.displayName = displayName;
+        this.description = description;
+    }
+
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    public String getDescription() {
+        return description;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
index b5415c3b74..9a563d4f65 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
@@ -22,59 +22,60 @@
 
     <body>
         <p>
-        	The JsonTreeReader Controller Service reads a JSON Object and creates a Record object for the entire
-        	JSON Object tree. The Controller Service must be configured with a Schema that describes the structure
-        	of the JSON data. If any field exists in the JSON that is not in the schema, that field will be skipped.
-        	If the schema contains a field for which no JSON field exists, a null value will be used in the Record
-        	(or the default value defined in the schema, if applicable).
+            The JsonTreeReader Controller Service reads a JSON Object and creates a Record object either for the
+            entire JSON Object tree or a subpart (see "Starting Field Strategies" section). The Controller Service
+            must be configured with a Schema that describes the structure of the JSON data. If any field exists in
+            the JSON that is not in the schema, that field will be skipped. If the schema contains a field for which
+            no JSON field exists, a null value will be used in the Record (or the default value defined in the schema,
+            if applicable).
         </p>
 
         <p>
-        	If the root element of the JSON is a JSON Array, each JSON Object within that array will be treated as
-        	its own separate Record. If the root element is a JSON Object, the JSON will all be treated as a single
-        	Record.
+            If the root element of the JSON is a JSON Array, each JSON Object within that array will be treated as
+            its own separate Record. If the root element is a JSON Object, the JSON will all be treated as a single
+            Record.
         </p>
 
 
-		<h2>Schemas and Type Coercion</h2>
-
-		<p>
-			When a record is parsed from incoming data, it is separated into fields. Each of these fields is then looked up against the
-			configured schema (by field name) in order to determine what the type of the data should be. If the field is not present in
-			the schema, that field is omitted from the Record. If the field is found in the schema, the data type of the received data
-			is compared against the data type specified in the schema. If the types match, the value of that field is used as-is. If the
-			schema indicates that the field should be of a different type, then the Controller Service will attempt to coerce the data
-			into the type specified by the schema. If the field cannot be coerced into the specified type, an Exception will be thrown.
-		</p>
-
-		<p>
-			The following rules apply when attempting to coerce a field value from one data type to another:
-		</p>
-
-		<ul>
-			<li>Any data type can be coerced into a String type.</li>
-			<li>Any numeric data type (Byte, Short, Int, Long, Float, Double) can be coerced into any other numeric data type.</li>
-			<li>Any numeric value can be coerced into a Date, Time, or Timestamp type, by assuming that the Long value is the number of
-			milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
-			<li>A String value can be coerced into a Date, Time, or Timestamp type, if its format matches the configured "Date Format," "Time Format,"
-				or "Timestamp Format."</li>
-			<li>A String value can be coerced into a numeric value if the value is of the appropriate type. For example, the String value
-				<code>8</code> can be coerced into any numeric type. However, the String value <code>8.2</code> can be coerced into a Double or Float
-				type but not an Integer.</li>
-			<li>A String value of "true" or "false" (regardless of case) can be coerced into a Boolean value.</li>
-			<li>A String value that is not empty can be coerced into a Char type. If the String contains more than 1 character, the first character is used
-				and the rest of the characters are ignored.</li>
-			<li>Any "date/time" type (Date, Time, Timestamp) can be coerced into any other "date/time" type.</li>
-			<li>Any "date/time" type can be coerced into a Long type, representing the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
-			<li>Any "date/time" type can be coerced into a String. The format of the String is whatever DateFormat is configured for the corresponding
-				property (Date Format, Time Format, Timestamp Format property). If no value is specified, then the value will be converted into a String
-				representation of the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
-		</ul>
-
-		<p>
-			If none of the above rules apply when attempting to coerce a value from one data type to another, the coercion will fail and an Exception
-			will be thrown.
-		</p>
+        <h2>Schemas and Type Coercion</h2>
+
+        <p>
+            When a record is parsed from incoming data, it is separated into fields. Each of these fields is then looked up against the
+            configured schema (by field name) in order to determine what the type of the data should be. If the field is not present in
+            the schema, that field is omitted from the Record. If the field is found in the schema, the data type of the received data
+            is compared against the data type specified in the schema. If the types match, the value of that field is used as-is. If the
+            schema indicates that the field should be of a different type, then the Controller Service will attempt to coerce the data
+            into the type specified by the schema. If the field cannot be coerced into the specified type, an Exception will be thrown.
+        </p>
+
+        <p>
+            The following rules apply when attempting to coerce a field value from one data type to another:
+        </p>
+
+        <ul>
+            <li>Any data type can be coerced into a String type.</li>
+            <li>Any numeric data type (Byte, Short, Int, Long, Float, Double) can be coerced into any other numeric data type.</li>
+            <li>Any numeric value can be coerced into a Date, Time, or Timestamp type, by assuming that the Long value is the number of
+            milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
+            <li>A String value can be coerced into a Date, Time, or Timestamp type, if its format matches the configured "Date Format," "Time Format,"
+                or "Timestamp Format."</li>
+            <li>A String value can be coerced into a numeric value if the value is of the appropriate type. For example, the String value
+                <code>8</code> can be coerced into any numeric type. However, the String value <code>8.2</code> can be coerced into a Double or Float
+                type but not an Integer.</li>
+            <li>A String value of "true" or "false" (regardless of case) can be coerced into a Boolean value.</li>
+            <li>A String value that is not empty can be coerced into a Char type. If the String contains more than 1 character, the first character is used
+                and the rest of the characters are ignored.</li>
+            <li>Any "date/time" type (Date, Time, Timestamp) can be coerced into any other "date/time" type.</li>
+            <li>Any "date/time" type can be coerced into a Long type, representing the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
+            <li>Any "date/time" type can be coerced into a String. The format of the String is whatever DateFormat is configured for the corresponding
+                property (Date Format, Time Format, Timestamp Format property). If no value is specified, then the value will be converted into a String
+                representation of the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
+        </ul>
+
+        <p>
+            If none of the above rules apply when attempting to coerce a value from one data type to another, the coercion will fail and an Exception
+            will be thrown.
+        </p>
 
 
 
@@ -168,165 +169,242 @@
         </p>
 
 
+        <h2>Starting Field Strategies</h2>
 
-        <h2>Examples</h2>
+        <p>
+            When using JsonTreeReader, two different starting field strategies can be selected. With the default Root Node strategy, the JsonTreeReader begins processing from the root element
+            of the JSON and creates a Record object for the entire JSON Object tree, while the Nested Field strategy defines a nested field from which to begin processing.
+        </p>
+        <p>
+            Using the Nested Field strategy, a schema corresponding to the nested JSON part should be specified. In case of schema inference, the JsonTreeReader will automatically
+            infer a schema from nested records.
+        </p>
+
+        <h3>Root Node Strategy</h3>
 
         <p>
-        	As an example, consider the following JSON is read:
+            Consider the following JSON is read with the default Root Node strategy:
         </p>
 <code>
 <pre>
-[{
+[
+  {
     "id": 17,
     "name": "John",
     "child": {
-        "id": "1"
+      "id": "1"
     },
-    "dob": "10-29-1982"
+    "dob": "10-29-1982",
     "siblings": [
-        { "name": "Jeremy", "id": 4 },
-        { "name": "Julia", "id": 8}
+      {
+        "name": "Jeremy",
+        "id": 4
+      },
+      {
+        "name": "Julia",
+        "id": 8
+      }
     ]
   },
   {
     "id": 98,
     "name": "Jane",
     "child": {
-        "id": 2
+      "id": 2
     },
-    "dob": "08-30-1984"
+    "dob": "08-30-1984",
     "gender": "F",
     "siblingIds": [],
     "siblings": []
-  }]
+  }
+]
 </pre>
 </code>
 
         <p>
-        	Also, consider that the schema that is configured for this JSON is as follows (assuming that the AvroSchemaRegistry
-        	Controller Service is chosen to denote the Schema):
+            Also, consider that the schema that is configured for this JSON is as follows (assuming that the AvroSchemaRegistry
+            Controller Service is chosen to denote the Schema):
         </p>
 
 <code>
 <pre>
 {
-	"namespace": "nifi",
-	"name": "person",
-	"type": "record",
-	"fields": [
-		{ "name": "id", "type": "int" },
-		{ "name": "name", "type": "string" },
-		{ "name": "gender", "type": "string" },
-		{ "name": "dob", "type": {
-			"type": "int",
-			"logicalType": "date"
-		}},
-		{ "name": "siblings", "type": {
-			"type": "array",
-			"items": {
-				"type": "record",
-				"fields": [
-					{ "name": "name", "type": "string" }
-				]
-			}
-		}}
-	]
+    "namespace": "nifi",
+    "name": "person",
+    "type": "record",
+    "fields": [
+        { "name": "id", "type": "int" },
+        { "name": "name", "type": "string" },
+        { "name": "gender", "type": "string" },
+        { "name": "dob", "type": {
+            "type": "int",
+            "logicalType": "date"
+        }},
+        { "name": "siblings", "type": {
+            "type": "array",
+            "items": {
+                "type": "record",
+                "fields": [
+                    { "name": "name", "type": "string" }
+                ]
+            }
+        }}
+    ]
 }
 </pre>
 </code>
 
         <p>
-        	Let us also assume that this Controller Service is configured with the "Date Format" property set to "MM-dd-yyyy", as this
-        	matches the date format used for our JSON data. This will result in the JSON creating two separate records, because the root
-        	element is a JSON array with two elements.
+            Let us also assume that this Controller Service is configured with the "Date Format" property set to "MM-dd-yyyy", as this
+            matches the date format used for our JSON data. This will result in the JSON creating two separate records, because the root
+            element is a JSON array with two elements.
         </p>
 
         <p>
-        	The first Record will consist of the following values:
+            The first Record will consist of the following values:
         </p>
 
         <table>
-        	<tr>
-    			<th>Field Name</th>
-    			<th>Field Value</th>
-        	</tr>
-    		<tr>
-    			<td>id</td>
-    			<td>17</td>
-    		</tr>
-    		<tr>
-    			<td>name</td>
-    			<td>John</td>
-    		</tr>
-    		<tr>
-    			<td>gender</td>
-    			<td><i>null</i></td>
-    		</tr>
-    		<tr>
-    			<td>dob</td>
-    			<td>11-30-1983</td>
-    		</tr>
-    		<tr>
-    			<td>siblings</td>
-    			<td>
-    				<i>array with two elements, each of which is itself a Record:</i>
-    				<br />
-    				<table>
-    					<tr>
-							<th>Field Name</th>
-							<th>Field Value</th>
-						</tr>
-						<tr>
-							<td>name</td>
-							<td>Jeremy</td>
-						</tr>
-    				</table>
-    				<br />
-    				<i>and:</i>
-    				<br />
-    				<table>
-						<tr>
-							<th>Field Name</th>
-							<th>Field Value</th>
-						</tr>
-						<tr>
-							<td>name</td>
-							<td>Julia</td>
-						</tr>
-    				</table>
-    			</td>
-    		</tr>
+            <tr>
+                <th>Field Name</th>
+                <th>Field Value</th>
+            </tr>
+            <tr>
+                <td>id</td>
+                <td>17</td>
+            </tr>
+            <tr>
+                <td>name</td>
+                <td>John</td>
+            </tr>
+            <tr>
+                <td>gender</td>
+                <td><i>null</i></td>
+            </tr>
+            <tr>
+                <td>dob</td>
+                <td>11-30-1983</td>
+            </tr>
+            <tr>
+                <td>siblings</td>
+                <td>
+                    <i>array with two elements, each of which is itself a Record:</i>
+                    <br />
+                    <table>
+                        <tr>
+                            <th>Field Name</th>
+                            <th>Field Value</th>
+                        </tr>
+                        <tr>
+                            <td>name</td>
+                            <td>Jeremy</td>
+                        </tr>
+                    </table>
+                    <br />
+                    <i>and:</i>
+                    <br />
+                    <table>
+                        <tr>
+                            <th>Field Name</th>
+                            <th>Field Value</th>
+                        </tr>
+                        <tr>
+                            <td>name</td>
+                            <td>Julia</td>
+                        </tr>
+                    </table>
+                </td>
+            </tr>
         </table>
 
         <p>
-        	The second Record will consist of the following values:
+            The second Record will consist of the following values:
         </p>
 
-		<table>
-			<tr>
-    			<th>Field Name</th>
-    			<th>Field Value</th>
-        	</tr>
-    		<tr>
-    			<td>id</td>
-    			<td>98</td>
-    		</tr>
-    		<tr>
-    			<td>name</td>
-    			<td>Jane</td>
-    		</tr>
-    		<tr>
-    			<td>gender</td>
-    			<td>F</td>
-    		</tr>
-    		<tr>
-    			<td>dob</td>
-    			<td>08-30-1984</td>
-    		</tr>
-    		<tr>
-    			<td>siblings</td>
-    			<td><i>empty array</i></td>
-    		</tr>
+        <table>
+            <tr>
+                <th>Field Name</th>
+                <th>Field Value</th>
+            </tr>
+            <tr>
+                <td>id</td>
+                <td>98</td>
+            </tr>
+            <tr>
+                <td>name</td>
+                <td>Jane</td>
+            </tr>
+            <tr>
+                <td>gender</td>
+                <td>F</td>
+            </tr>
+            <tr>
+                <td>dob</td>
+                <td>08-30-1984</td>
+            </tr>
+            <tr>
+                <td>siblings</td>
+                <td><i>empty array</i></td>
+            </tr>
+        </table>
+
+        <h3>Nested Field Strategy</h3>
+
+        <p>
+            Using the Nested Field strategy, consider the same JSON where the specified Starting Field Name is
+            "siblings". The schema that is configured for this JSON is as follows:
+        </p>
+
+<code>
+<pre>
+{
+    "namespace": "nifi",
+    "name": "siblings",
+    "type": "record",
+    "fields": [
+        { "name": "name", "type": "string" },
+        { "name": "id", "type": "int" }
+    ]
+}
+</pre>
+</code>
+
+        <p>
+            The first Record will consist of the following values:
+        </p>
+
+        <table>
+            <tr>
+                <th>Field Name</th>
+                <th>Field Value</th>
+            </tr>
+            <tr>
+                <td>name</td>
+                <td>Jeremy</td>
+            </tr>
+            <tr>
+                <td>id</td>
+                <td>4</td>
+            </tr>
+        </table>
+
+        <p>
+            The second Record will consist of the following values:
+        </p>
+
+        <table>
+            <tr>
+                <th>Field Name</th>
+                <th>Field Value</th>
+            </tr>
+            <tr>
+                <td>name</td>
+                <td>Julia</td>
+            </tr>
+            <tr>
+                <td>id</td>
+                <td>8</td>
+            </tr>
         </table>
 
     </body>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java
index 960c7b6127..7ef2089fe2 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java
@@ -29,6 +29,9 @@ import org.apache.nifi.serialization.record.type.ChoiceDataType;
 import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
 import org.mockito.Mockito;
 
 import java.io.BufferedInputStream;
@@ -41,32 +44,31 @@ import java.nio.file.Files;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class TestInferJsonSchemaAccessStrategy {
-    private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
-    private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
-    private final String timestampFormat = "yyyy-MM-DD'T'HH:mm:ss.SSS'Z'";
+class TestInferJsonSchemaAccessStrategy {
 
     private final SchemaInferenceEngine<JsonNode> timestampInference = new JsonSchemaInference(new TimeValueInference("yyyy-MM-dd", "HH:mm:ss", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"));
     private final SchemaInferenceEngine<JsonNode> noTimestampInference = new JsonSchemaInference(new TimeValueInference("yyyy-MM-dd", "HH:mm:ss", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"));
 
     @Test
-    @Disabled
-    public void testPerformanceOfSchemaInferenceWithTimestamp() throws IOException {
+    @Disabled("Intended only for manual testing to determine performance before/after modifications")
+    void testPerformanceOfSchemaInferenceWithTimestamp() throws IOException {
         final File file = new File("src/test/resources/json/prov-events.json");
         final byte[] data = Files.readAllBytes(file.toPath());
-        final ComponentLog logger = Mockito.mock(ComponentLog.class);
 
         final byte[] manyCopies = new byte[data.length * 20];
-        for (int i=0; i < 20; i++) {
+        for (int i = 0; i < 20; i++) {
             System.arraycopy(data, 0, manyCopies, data.length * i, data.length);
         }
 
-        final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>((var,content) -> new JsonRecordSource(content), timestampInference, Mockito.mock(ComponentLog.class));
+        final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>(
+                (var, content) -> new JsonRecordSource(content), timestampInference, Mockito.mock(ComponentLog.class)
+        );
 
         for (int j = 0; j < 10; j++) {
             final long start = System.nanoTime();
@@ -83,14 +85,13 @@ public class TestInferJsonSchemaAccessStrategy {
     }
 
     @Test
-    @Disabled
-    public void testPerformanceOfSchemaInferenceWithoutTimestamp() throws IOException {
+    @Disabled("Intended only for manual testing to determine performance before/after modifications")
+    void testPerformanceOfSchemaInferenceWithoutTimestamp() throws IOException {
         final File file = new File("src/test/resources/json/prov-events.json");
         final byte[] data = Files.readAllBytes(file.toPath());
-        final ComponentLog logger = Mockito.mock(ComponentLog.class);
 
         final byte[] manyCopies = new byte[data.length * 20];
-        for (int i=0; i < 20; i++) {
+        for (int i = 0; i < 20; i++) {
             System.arraycopy(data, 0, manyCopies, data.length * i, data.length);
         }
 
@@ -99,8 +100,8 @@ public class TestInferJsonSchemaAccessStrategy {
 
             for (int i = 0; i < 10_000; i++) {
                 try (final InputStream in = new ByteArrayInputStream(manyCopies)) {
-                    final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>((var,content) -> new JsonRecordSource(content),
-                         noTimestampInference, Mockito.mock(ComponentLog.class));
+                    final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>((var, content) -> new JsonRecordSource(content),
+                            noTimestampInference, Mockito.mock(ComponentLog.class));
 
                     final RecordSchema schema = accessStrategy.getSchema(null, in, null);
                 }
@@ -112,9 +113,9 @@ public class TestInferJsonSchemaAccessStrategy {
     }
 
     @Test
-    public void testInferenceIncludesAllRecords() throws IOException {
+    void testInferenceIncludesAllRecords() throws IOException {
         final File file = new File("src/test/resources/json/prov-events.json");
-        final RecordSchema schema = inferSchema(file);
+        final RecordSchema schema = inferSchema(file, StartingFieldStrategy.ROOT_NODE, null);
 
         final RecordField extraField1 = schema.getField("extra field 1").get();
         assertSame(RecordFieldType.STRING, extraField1.getDataType().getFieldType());
@@ -127,7 +128,7 @@ public class TestInferJsonSchemaAccessStrategy {
         assertSame(RecordFieldType.RECORD, updatedAttributesDataType.getFieldType());
 
         final List<String> expectedAttributeNames = Arrays.asList("path", "filename", "drop reason", "uuid", "reporting.task.type", "s2s.address", "schema.cache.identifier", "reporting.task.uuid",
-            "record.count", "s2s.host", "reporting.task.transaction.id", "reporting.task.name", "mime.type");
+                "record.count", "s2s.host", "reporting.task.transaction.id", "reporting.task.name", "mime.type");
 
         final RecordSchema updatedAttributesSchema = ((RecordDataType) updatedAttributesDataType).getChildSchema();
         assertEquals(expectedAttributeNames.size(), updatedAttributesSchema.getFieldCount());
@@ -138,9 +139,9 @@ public class TestInferJsonSchemaAccessStrategy {
     }
 
     @Test
-    public void testDateAndTimestampsInferred() throws IOException {
+    void testDateAndTimestampsInferred() throws IOException {
         final File file = new File("src/test/resources/json/prov-events.json");
-        final RecordSchema schema = inferSchema(file);
+        final RecordSchema schema = inferSchema(file, StartingFieldStrategy.ROOT_NODE, null);
 
         final RecordField timestampField = schema.getField("timestamp").get();
         assertEquals(RecordFieldType.TIMESTAMP.getDataType("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"), timestampField.getDataType());
@@ -167,9 +168,9 @@ public class TestInferJsonSchemaAccessStrategy {
      * Test is intended to ensure that all inference rules that are explained in the readers' additionalDetails.html are correct
      */
     @Test
-    public void testDocsExample() throws IOException {
+    void testDocsExample() throws IOException {
         final File file = new File("src/test/resources/json/docs-example.json");
-        final RecordSchema schema = inferSchema(file);
+        final RecordSchema schema = inferSchema(file, StartingFieldStrategy.ROOT_NODE, null);
 
         assertSame(RecordFieldType.STRING, schema.getDataType("name").get().getFieldType());
         assertSame(RecordFieldType.CHOICE, schema.getDataType("age").get().getFieldType());
@@ -189,14 +190,70 @@ public class TestInferJsonSchemaAccessStrategy {
         assertSame(RecordFieldType.STRING, schema.getDataType("nullValue").get().getFieldType());
     }
 
-    private RecordSchema inferSchema(final File file) throws IOException {
+    @ParameterizedTest(name = "{index} {2}")
+    @MethodSource("startingFieldNameArgumentProvider")
+    void testInferenceStartsFromArray(final String jsonPath, final StartingFieldStrategy strategy, final String startingFieldName, String testName) throws IOException {
+        final File file = new File(jsonPath);
+        final RecordSchema schema = inferSchema(file, strategy, startingFieldName);
+
+        assertEquals(2, schema.getFieldCount());
+
+        final RecordField field1 = schema.getField("id").get();
+        assertSame(RecordFieldType.INT, field1.getDataType().getFieldType());
+
+        final RecordField field2 = schema.getField("balance").get();
+        assertSame(RecordFieldType.DOUBLE, field2.getDataType().getFieldType());
+    }
+
+    @Test
+    void testInferenceStartsFromSimpleFieldAndNoNestedObjectOrArrayFound() throws IOException {
+        final File file = new File("src/test/resources/json/single-element-nested-array-middle.json");
+        final RecordSchema schema = inferSchema(file, StartingFieldStrategy.NESTED_FIELD, "name");
+
+        assertEquals(0, schema.getFieldCount());
+    }
+
+    @Test
+    void testInferenceStartFromNonExistentField() throws IOException {
+        final File file = new File("src/test/resources/json/single-element-nested-array.json");
+        final RecordSchema recordSchema = inferSchema(file, StartingFieldStrategy.NESTED_FIELD, "notfound");
+        assertEquals(0, recordSchema.getFieldCount());
+    }
+
+    @Test
+    void testInferenceStartFromMultipleNestedField() throws IOException {
+        final File file = new File("src/test/resources/json/multiple-nested-field.json");
+        final RecordSchema schema = inferSchema(file, StartingFieldStrategy.NESTED_FIELD, "accountIds");
+
+        final RecordField field1 = schema.getField("id").get();
+        assertSame(RecordFieldType.STRING, field1.getDataType().getFieldType());
+
+        final RecordField field2 = schema.getField("type").get();
+        assertSame(RecordFieldType.STRING, field2.getDataType().getFieldType());
+    }
+
+    private RecordSchema inferSchema(final File file, final StartingFieldStrategy strategy, final String startingFieldName) throws IOException {
         try (final InputStream in = new FileInputStream(file);
              final InputStream bufferedIn = new BufferedInputStream(in)) {
 
-            final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>((var,content) -> new JsonRecordSource(content),
-                timestampInference, Mockito.mock(ComponentLog.class));
+            final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>(
+                    (var, content) -> new JsonRecordSource(content, strategy, startingFieldName),
+                    timestampInference, Mockito.mock(ComponentLog.class)
+            );
 
             return accessStrategy.getSchema(null, bufferedIn, null);
         }
     }
+
+    private static Stream<Arguments> startingFieldNameArgumentProvider() {
+        final StartingFieldStrategy strategy = StartingFieldStrategy.NESTED_FIELD;
+        return Stream.of(
+                Arguments.of("src/test/resources/json/single-element-nested-array.json", strategy, "accounts", "testInferenceSkipsToNestedArray"),
+                Arguments.of("src/test/resources/json/single-element-nested.json", strategy, "account", "testInferenceSkipsToNestedObject"),
+                Arguments.of("src/test/resources/json/single-element-nested-array.json", strategy, "name", "testInferenceSkipsToSimpleFieldFindsNextNestedArray"),
+                Arguments.of("src/test/resources/json/single-element-nested.json", strategy, "name", "testInferenceSkipsToSimpleFieldFindsNextNestedObject"),
+                Arguments.of("src/test/resources/json/single-element-nested-array-middle.json", strategy, "accounts", "testInferenceSkipsToNestedArrayInMiddle"),
+                Arguments.of("src/test/resources/json/nested-array-then-start-object.json", strategy, "accounts", "testInferenceSkipsToNestedThenStartObject")
+        );
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
index 904e7cd8d0..ed29e5cc5b 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
@@ -47,7 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class TestJsonPathRowRecordReader {
+class TestJsonPathRowRecordReader {
     private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
     private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
     private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
@@ -87,25 +87,24 @@ public class TestJsonPathRowRecordReader {
         accountFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
         accountFields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
 
-        final RecordSchema accountSchema = new SimpleRecordSchema(accountFields);
-        return accountSchema;
+        return new SimpleRecordSchema(accountFields);
     }
 
 
     @Test
-    public void testReadArray() throws IOException, MalformedRecordException {
+    void testReadArray() throws IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
         try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
             final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
             assertEquals(expectedFieldNames, fieldNames);
 
-            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
-                RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
             assertEquals(expectedTypes, dataTypes);
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -119,19 +118,19 @@ public class TestJsonPathRowRecordReader {
     }
 
     @Test
-    public void testReadOneLine() throws IOException, MalformedRecordException {
+    void testReadOneLine() throws IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
         try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-oneline.json"));
              final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
             assertEquals(expectedFieldNames, fieldNames);
 
-            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
-                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
             assertEquals(expectedTypes, dataTypes);
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -145,19 +144,19 @@ public class TestJsonPathRowRecordReader {
     }
 
     @Test
-    public void testSingleJsonElement() throws IOException, MalformedRecordException {
+    void testSingleJsonElement() throws IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
         try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json"));
             final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
             assertEquals(expectedFieldNames, fieldNames);
 
-            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
-                RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
             assertEquals(expectedTypes, dataTypes);
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -168,7 +167,7 @@ public class TestJsonPathRowRecordReader {
     }
 
     @Test
-    public void testTimestampCoercedFromString() throws IOException, MalformedRecordException {
+    void testTimestampCoercedFromString() throws IOException, MalformedRecordException {
         final List<RecordField> recordFields = Collections.singletonList(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(recordFields);
 
@@ -195,7 +194,7 @@ public class TestJsonPathRowRecordReader {
 
 
     @Test
-    public void testElementWithNestedData() throws IOException, MalformedRecordException {
+    void testElementWithNestedData() throws IOException, MalformedRecordException {
         final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths);
         jsonPaths.put("account", JsonPath.compile("$.account"));
 
@@ -208,12 +207,12 @@ public class TestJsonPathRowRecordReader {
             final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "account"});
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country", "account");
             assertEquals(expectedFieldNames, fieldNames);
 
-            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE,
-                RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.RECORD});
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE,
+                    RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.RECORD);
             assertEquals(expectedTypes, dataTypes);
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -231,7 +230,7 @@ public class TestJsonPathRowRecordReader {
     }
 
     @Test
-    public void testElementWithNestedArray() throws IOException, MalformedRecordException {
+    void testElementWithNestedArray() throws IOException, MalformedRecordException {
         final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths);
         jsonPaths.put("accounts", JsonPath.compile("$.accounts"));
 
@@ -246,13 +245,12 @@ public class TestJsonPathRowRecordReader {
             final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList(new String[] {
-                "id", "name", "balance", "address", "city", "state", "zipCode", "country", "accounts"});
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country", "accounts");
             assertEquals(expectedFieldNames, fieldNames);
 
             final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE,
-                RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.ARRAY});
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE,
+                    RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.ARRAY);
             assertEquals(expectedTypes, dataTypes);
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -282,19 +280,19 @@ public class TestJsonPathRowRecordReader {
     }
 
     @Test
-    public void testReadArrayDifferentSchemas() throws IOException, MalformedRecordException {
+    void testReadArrayDifferentSchemas() throws IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
         try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
             final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
             assertEquals(expectedFieldNames, fieldNames);
 
             final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
-                RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
             assertEquals(expectedTypes, dataTypes);
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -311,7 +309,7 @@ public class TestJsonPathRowRecordReader {
     }
 
     @Test
-    public void testReadArrayDifferentSchemasWithOverride() throws IOException, MalformedRecordException {
+    void testReadArrayDifferentSchemasWithOverride() throws IOException, MalformedRecordException {
         final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths);
         jsonPaths.put("address2", JsonPath.compile("$.address2"));
 
@@ -324,12 +322,12 @@ public class TestJsonPathRowRecordReader {
             final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "address2"});
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country", "address2");
             assertEquals(expectedFieldNames, fieldNames);
 
             final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE, RecordFieldType.STRING,
-                RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE, RecordFieldType.STRING,
+                    RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
             assertEquals(expectedTypes, dataTypes);
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -346,7 +344,7 @@ public class TestJsonPathRowRecordReader {
     }
 
     @Test
-    public void testPrimitiveTypeArrays() throws IOException, MalformedRecordException {
+    void testPrimitiveTypeArrays() throws IOException, MalformedRecordException {
         final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths);
         jsonPaths.put("accountIds", JsonPath.compile("$.accountIds"));
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
index 683e884237..1eb655c117 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
@@ -35,12 +35,12 @@ import java.util.List;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
 
-public class TestJsonSchemaInference {
+class TestJsonSchemaInference {
 
     private final TimeValueInference timestampInference = new TimeValueInference("yyyy-MM-dd", "HH:mm:ss", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
 
     @Test
-    public void testInferenceIncludesAllRecords() throws IOException {
+    void testInferenceIncludesAllRecords() throws IOException {
         final File file = new File("src/test/resources/json/data-types.json");
 
         final RecordSchema schema;
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index f65c0752fe..752d1a6676 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -50,7 +50,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -59,11 +58,12 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
-public class TestJsonTreeRowRecordReader {
+class TestJsonTreeRowRecordReader {
     private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
     private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
     private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
@@ -90,13 +90,12 @@ public class TestJsonTreeRowRecordReader {
         accountFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
         accountFields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
 
-        final RecordSchema accountSchema = new SimpleRecordSchema(accountFields);
-        return accountSchema;
+        return new SimpleRecordSchema(accountFields);
     }
 
 
     @Test
-    public void testReadChoiceOfStringOrArrayOfRecords() throws IOException, MalformedRecordException {
+    void testReadChoiceOfStringOrArrayOfRecords() throws IOException, MalformedRecordException {
         final File schemaFile = new File("src/test/resources/json/choice-of-string-or-array-record.avsc");
         final File jsonFile = new File("src/test/resources/json/choice-of-string-or-array-record.json");
 
@@ -129,7 +128,7 @@ public class TestJsonTreeRowRecordReader {
 
     @Test
     @Disabled("Intended only for manual testing to determine performance before/after modifications")
-    public void testPerformanceOnLocalFile() throws IOException, MalformedRecordException {
+    void testPerformanceOnLocalFile() throws IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList());
 
         final File file = new File("/devel/nifi/nifi-assembly/target/nifi-1.2.0-SNAPSHOT-bin/nifi-1.2.0-SNAPSHOT/prov/16812193969219289");
@@ -158,7 +157,7 @@ public class TestJsonTreeRowRecordReader {
 
     @Test
     @Disabled("Intended only for manual testing to determine performance before/after modifications")
-    public void testPerformanceOnIndividualMessages() throws IOException, MalformedRecordException {
+    void testPerformanceOnIndividualMessages() throws IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList());
 
         final File file = new File("/devel/nifi/nifi-assembly/target/nifi-1.2.0-SNAPSHOT-bin/nifi-1.2.0-SNAPSHOT/1.prov.json");
@@ -186,12 +185,12 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testChoiceOfRecordTypes() throws IOException, MalformedRecordException {
+    void testChoiceOfRecordTypes() throws IOException, MalformedRecordException {
         final Schema avroSchema = new Schema.Parser().parse(new File("src/test/resources/json/record-choice.avsc"));
         final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/elements-for-record-choice.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), recordSchema, dateFormat, timeFormat, timestampFormat)) {
+        try (final InputStream in = new FileInputStream("src/test/resources/json/elements-for-record-choice.json");
+             final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), recordSchema, dateFormat, timeFormat, timestampFormat)) {
 
             // evaluate first record
             final Record firstRecord = reader.nextRecord();
@@ -201,7 +200,7 @@ public class TestJsonTreeRowRecordReader {
             assertEquals("1234", firstRecord.getValue("id"));
 
             // record should have a schema that indicates that the 'child' is a CHOICE of 2 different record types
-            assertTrue(firstOuterSchema.getDataType("child").get().getFieldType() == RecordFieldType.CHOICE);
+            assertSame(RecordFieldType.CHOICE, firstOuterSchema.getDataType("child").get().getFieldType());
             final List<DataType> firstSubTypes = ((ChoiceDataType) firstOuterSchema.getDataType("child").get()).getPossibleSubTypes();
             assertEquals(2, firstSubTypes.size());
             assertEquals(2L, firstSubTypes.stream().filter(type -> type.getFieldType() == RecordFieldType.RECORD).count());
@@ -212,7 +211,7 @@ public class TestJsonTreeRowRecordReader {
             final Record firstChildRecord = (Record) childObject;
             final RecordSchema firstChildSchema = firstChildRecord.getSchema();
 
-            assertEquals(Arrays.asList("id"), firstChildSchema.getFieldNames());
+            assertEquals(Collections.singletonList("id"), firstChildSchema.getFieldNames());
 
             // evaluate second record
             final Record secondRecord = reader.nextRecord();
@@ -223,7 +222,7 @@ public class TestJsonTreeRowRecordReader {
             assertEquals("1234", secondRecord.getValue("id"));
 
             // record should have a schema that indicates that the 'child' is a CHOICE of 2 different record types
-            assertTrue(secondOuterSchema.getDataType("child").get().getFieldType() == RecordFieldType.CHOICE);
+            assertSame(RecordFieldType.CHOICE, secondOuterSchema.getDataType("child").get().getFieldType());
             final List<DataType> secondSubTypes = ((ChoiceDataType) secondOuterSchema.getDataType("child").get()).getPossibleSubTypes();
             assertEquals(2, secondSubTypes.size());
             assertEquals(2L, secondSubTypes.stream().filter(type -> type.getFieldType() == RecordFieldType.RECORD).count());
@@ -234,7 +233,7 @@ public class TestJsonTreeRowRecordReader {
             final Record secondChildRecord = (Record) secondChildObject;
             final RecordSchema secondChildSchema = secondChildRecord.getSchema();
 
-            assertEquals(Arrays.asList("name"), secondChildSchema.getFieldNames());
+            assertEquals(Collections.singletonList("name"), secondChildSchema.getFieldNames());
 
             assertNull(reader.nextRecord());
         }
@@ -242,19 +241,19 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testReadArray() throws IOException, MalformedRecordException {
+    void testReadArray() throws IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+        try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json");
+             final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
             assertEquals(expectedFieldNames, fieldNames);
 
-            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
-                RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
             assertEquals(expectedTypes, dataTypes);
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -268,19 +267,19 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testReadOneLinePerJSON() throws IOException, MalformedRecordException {
+    void testReadOneLinePerJSON() throws IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-oneline.json"));
+        try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-oneline.json");
              final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
             assertEquals(expectedFieldNames, fieldNames);
 
-            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
-                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
             assertEquals(expectedTypes, dataTypes);
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -294,20 +293,20 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testReadMultilineJSON() throws IOException, MalformedRecordException {
+    void testReadMultilineJSON() throws IOException, MalformedRecordException {
         final List<RecordField> fields = getFields(RecordFieldType.DECIMAL.getDecimalDataType(30, 10));
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-multiline.json"));
+        try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-multiline.json");
              final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
             assertEquals(expectedFieldNames, fieldNames);
 
-            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
-                    RecordFieldType.DECIMAL, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DECIMAL, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
             assertEquals(expectedTypes, dataTypes);
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -321,19 +320,19 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testReadMultilineArrays() throws IOException, MalformedRecordException {
+    void testReadMultilineArrays() throws IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-multiarray.json"));
+        try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-multiarray.json");
              final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
             assertEquals(expectedFieldNames, fieldNames);
 
-            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
-                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
             assertEquals(expectedTypes, dataTypes);
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -353,19 +352,19 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testReadMixedJSON() throws IOException, MalformedRecordException {
+    void testReadMixedJSON() throws IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-mixed.json"));
+        try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-mixed.json");
              final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
             assertEquals(expectedFieldNames, fieldNames);
 
-            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
-                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
             assertEquals(expectedTypes, dataTypes);
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -386,14 +385,14 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testReadRawRecordIncludesFieldsNotInSchema() throws IOException, MalformedRecordException {
+    void testReadRawRecordIncludesFieldsNotInSchema() throws IOException, MalformedRecordException {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
         fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+        try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json");
+             final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final Record schemaValidatedRecord = reader.nextRecord(true, true);
             assertEquals(1, schemaValidatedRecord.getValue("id"));
@@ -401,8 +400,8 @@ public class TestJsonTreeRowRecordReader {
             assertNull(schemaValidatedRecord.getValue("balance"));
         }
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+        try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json");
+             final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final Record rawRecord = reader.nextRecord(false, false);
             assertEquals(1, rawRecord.getValue("id"));
@@ -418,14 +417,14 @@ public class TestJsonTreeRowRecordReader {
 
 
     @Test
-    public void testReadRawRecordTypeCoercion() throws IOException, MalformedRecordException {
+    void testReadRawRecordTypeCoercion() throws IOException, MalformedRecordException {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
         fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+        try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json");
+             final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final Record schemaValidatedRecord = reader.nextRecord(true, true);
             assertEquals("1", schemaValidatedRecord.getValue("id")); // will be coerced into a STRING as per the schema
@@ -435,8 +434,8 @@ public class TestJsonTreeRowRecordReader {
             assertEquals(2, schemaValidatedRecord.getRawFieldNames().size());
         }
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+        try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json");
+             final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final Record rawRecord = reader.nextRecord(false, false);
             assertEquals(1, rawRecord.getValue("id")); // will return raw value of (int) 1
@@ -453,7 +452,7 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testDateCoercedFromString() throws IOException, MalformedRecordException {
+    void testDateCoercedFromString() throws IOException, MalformedRecordException {
         final String dateField = "date";
         final List<RecordField> recordFields = Collections.singletonList(new RecordField(dateField, RecordFieldType.DATE.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(recordFields);
@@ -474,12 +473,12 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testTimestampCoercedFromString() throws IOException, MalformedRecordException {
+    void testTimestampCoercedFromString() throws IOException, MalformedRecordException {
         final List<RecordField> recordFields = Collections.singletonList(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(recordFields);
 
         for (final boolean coerceTypes : new boolean[] {true, false}) {
-            try (final InputStream in = new FileInputStream(new File("src/test/resources/json/timestamp.json"));
+            try (final InputStream in = new FileInputStream("src/test/resources/json/timestamp.json");
                  final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, "yyyy/MM/dd HH:mm:ss")) {
 
                 final Record record = reader.nextRecord(coerceTypes, false);
@@ -490,19 +489,19 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testSingleJsonElement() throws IOException, MalformedRecordException {
+    void testSingleJsonElement() throws IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+        try (final InputStream in = new FileInputStream("src/test/resources/json/single-bank-account.json");
+             final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
             assertEquals(expectedFieldNames, fieldNames);
 
-            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
-                RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
             assertEquals(expectedTypes, dataTypes);
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -513,21 +512,21 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testSingleJsonElementWithChoiceFields() throws IOException, MalformedRecordException {
+    void testSingleJsonElementWithChoiceFields() throws IOException, MalformedRecordException {
         // Wraps default fields by Choice data type to test mapping to a Choice type.
         final List<RecordField> choiceFields = getDefaultFields().stream()
                 .map(f -> new RecordField(f.getFieldName(), RecordFieldType.CHOICE.getChoiceDataType(f.getDataType()))).collect(Collectors.toList());
         final RecordSchema schema = new SimpleRecordSchema(choiceFields);
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json"));
+        try (final InputStream in = new FileInputStream("src/test/resources/json/single-bank-account.json");
              final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
             assertEquals(expectedFieldNames, fieldNames);
 
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
-                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
             final List<RecordField> fields = schema.getFields();
             for (int i = 0; i < schema.getFields().size(); i++) {
                 assertTrue(fields.get(i).getDataType() instanceof ChoiceDataType);
@@ -543,19 +542,19 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testElementWithNestedData() throws IOException, MalformedRecordException {
+    void testElementWithNestedData() throws IOException, MalformedRecordException {
         final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
         final List<RecordField> fields = getDefaultFields();
         fields.add(new RecordField("account", accountType));
         fields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+        try (final InputStream in = new FileInputStream("src/test/resources/json/single-element-nested.json");
+             final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
-            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
-                RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.RECORD});
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.RECORD);
             assertEquals(expectedTypes, dataTypes);
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -573,7 +572,7 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testElementWithNestedArray() throws IOException, MalformedRecordException {
+    void testElementWithNestedArray() throws IOException, MalformedRecordException {
         final DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
         final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
 
@@ -582,17 +581,16 @@ public class TestJsonTreeRowRecordReader {
         fields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested-array.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+        try (final InputStream in = new FileInputStream("src/test/resources/json/single-element-nested-array.json");
+             final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList(new String[] {
-                "id", "name", "address", "city", "state", "zipCode", "country", "accounts"});
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "address", "city", "state", "zipCode", "country", "accounts");
             assertEquals(expectedFieldNames, fieldNames);
 
-            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
-                RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.ARRAY});
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.ARRAY);
             assertEquals(expectedTypes, dataTypes);
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -607,19 +605,19 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testReadArrayDifferentSchemas() throws IOException, MalformedRecordException {
+    void testReadArrayDifferentSchemas() throws IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+        try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array-different-schemas.json");
+             final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
             assertEquals(expectedFieldNames, fieldNames);
 
-            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
-                RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+                    RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
             assertEquals(expectedTypes, dataTypes);
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -636,53 +634,19 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testReadArrayDifferentSchemasWithOverride() throws IOException, MalformedRecordException {
-        final Map<String, DataType> overrides = new HashMap<>();
-        overrides.put("address2", RecordFieldType.STRING.getDataType());
-
-        final List<RecordField> fields = getDefaultFields();
-        fields.add(new RecordField("address2", RecordFieldType.STRING.getDataType()));
-        final RecordSchema schema = new SimpleRecordSchema(fields);
-
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
-
-            final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "address2"});
-            assertEquals(expectedFieldNames, fieldNames);
-
-            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE, RecordFieldType.STRING,
-                RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
-            assertEquals(expectedTypes, dataTypes);
-
-            final Object[] firstRecordValues = reader.nextRecord().getValues();
-            assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA", null}, firstRecordValues);
-
-            final Object[] secondRecordValues = reader.nextRecord().getValues();
-            assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", null, null}, secondRecordValues);
-
-            final Object[] thirdRecordValues = reader.nextRecord().getValues();
-            assertArrayEquals(new Object[] {3, "Jake Doe", 4751.89, "124 My Street", "My City", "MS", "11111", "USA", "Apt. #12"}, thirdRecordValues);
-
-            assertNull(reader.nextRecord());
-        }
-    }
-
-    @Test
-    public void testReadArrayDifferentSchemasWithOptionalElementOverridden() throws IOException, MalformedRecordException {
+    void testReadArrayDifferentSchemasWithOptionalElementOverridden() throws IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-optional-balance.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+        try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array-optional-balance.json");
+             final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final List<String> fieldNames = schema.getFieldNames();
-            final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+            final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
             assertEquals(expectedFieldNames, fieldNames);
 
-            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
-            final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE, RecordFieldType.STRING,
-                RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+            final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+            final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE, RecordFieldType.STRING,
+                    RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
             assertEquals(expectedTypes, dataTypes);
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -700,7 +664,7 @@ public class TestJsonTreeRowRecordReader {
 
 
     @Test
-    public void testReadUnicodeCharacters() throws IOException, MalformedRecordException {
+    void testReadUnicodeCharacters() throws IOException, MalformedRecordException {
 
         final List<RecordField> fromFields = new ArrayList<>();
         fromFields.add(new RecordField("id", RecordFieldType.LONG.getDataType()));
@@ -715,8 +679,8 @@ public class TestJsonTreeRowRecordReader {
         fields.add(new RecordField("from", fromType));
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream in = new FileInputStream(new File("src/test/resources/json/json-with-unicode.json"));
-            final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+        try (final InputStream in = new FileInputStream("src/test/resources/json/json-with-unicode.json");
+             final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
 
             final Object[] firstRecordValues = reader.nextRecord().getValues();
 
@@ -732,7 +696,7 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testIncorrectSchema() {
+    void testIncorrectSchema() {
         final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
         final List<RecordField> fields = getDefaultFields();
         fields.add(new RecordField("account", accountType));
@@ -756,7 +720,7 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testMergeOfSimilarRecords() throws Exception {
+    void testMergeOfSimilarRecords() throws Exception {
         // GIVEN
         String jsonPath = "src/test/resources/json/similar-records.json";
 
@@ -789,7 +753,7 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testChoiceOfEmbeddedSimilarRecords() throws Exception {
+    void testChoiceOfEmbeddedSimilarRecords() throws Exception {
         // GIVEN
         String jsonPath = "src/test/resources/json/choice-of-embedded-similar-records.json";
 
@@ -801,11 +765,11 @@ public class TestJsonTreeRowRecordReader {
             new RecordField("integer", RecordFieldType.INT.getDataType()),
             new RecordField("string", RecordFieldType.STRING.getDataType())
         ));
-        RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
-            new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
-                RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
-                RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)
-            ))
+        RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Collections.singletonList(
+                new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
+                        RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
+                        RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)
+                ))
         ));
 
         List<Object> expected = Arrays.asList(
@@ -829,12 +793,12 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testChoiceOfEmbeddedArraysAndSingleRecords() throws Exception {
+    void testChoiceOfEmbeddedArraysAndSingleRecords() throws Exception {
         // GIVEN
         String jsonPath = "src/test/resources/json/choice-of-embedded-arrays-and-single-records.json";
 
-        SimpleRecordSchema expectedRecordSchema1 = new SimpleRecordSchema(Arrays.asList(
-            new RecordField("integer", RecordFieldType.INT.getDataType())
+        SimpleRecordSchema expectedRecordSchema1 = new SimpleRecordSchema(Collections.singletonList(
+                new RecordField("integer", RecordFieldType.INT.getDataType())
         ));
         SimpleRecordSchema expectedRecordSchema2 = new SimpleRecordSchema(Arrays.asList(
             new RecordField("integer", RecordFieldType.INT.getDataType()),
@@ -848,13 +812,13 @@ public class TestJsonTreeRowRecordReader {
             new RecordField("integer", RecordFieldType.INT.getDataType()),
             new RecordField("string", RecordFieldType.STRING.getDataType())
         ));
-        RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
-            new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
-                RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
-                RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3),
-                RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)),
-                RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4))
-            ))
+        RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Collections.singletonList(
+                new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
+                        RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
+                        RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3),
+                        RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)),
+                        RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4))
+                ))
         ));
 
         List<Object> expected = Arrays.asList(
@@ -901,7 +865,7 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testChoiceOfMergedEmbeddedArraysAndSingleRecords() throws Exception {
+    void testChoiceOfMergedEmbeddedArraysAndSingleRecords() throws Exception {
         // GIVEN
         String jsonPath = "src/test/resources/json/choice-of-merged-embedded-arrays-and-single-records.json";
 
@@ -922,13 +886,13 @@ public class TestJsonTreeRowRecordReader {
             new RecordField("string", RecordFieldType.STRING.getDataType()),
             new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
         ));
-        RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
-            new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
-                RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
-                RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3),
-                RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)),
-                RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4))
-            ))
+        RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Collections.singletonList(
+                new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
+                        RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
+                        RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3),
+                        RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)),
+                        RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4))
+                ))
         ));
 
         List<Object> expected = Arrays.asList(
@@ -980,7 +944,7 @@ public class TestJsonTreeRowRecordReader {
     }
 
     @Test
-    public void testChoseSuboptimalSchemaWhenDataHasExtraFields() throws Exception {
+    void testChoseSuboptimalSchemaWhenDataHasExtraFields() throws Exception {
         // GIVEN
         String jsonPath = "src/test/resources/json/choice-of-different-arrays-with-extra-fields.json";
 
@@ -993,18 +957,18 @@ public class TestJsonTreeRowRecordReader {
             new RecordField("string", RecordFieldType.STRING.getDataType())
         ));
 
-        RecordSchema recordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
-            new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
-                RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(recordSchema1)),
-                RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(recordSchema2))
-            ))
+        RecordSchema recordChoiceSchema = new SimpleRecordSchema(Collections.singletonList(
+                new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
+                        RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(recordSchema1)),
+                        RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(recordSchema2))
+                ))
         ));
 
-        RecordSchema schema = new SimpleRecordSchema(Arrays.asList(
-            new RecordField("dataCollection", RecordFieldType.ARRAY.getArrayDataType(
-                RecordFieldType.RECORD.getRecordDataType(recordChoiceSchema)
-            )
-        )));
+        RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
+                new RecordField("dataCollection", RecordFieldType.ARRAY.getArrayDataType(
+                        RecordFieldType.RECORD.getRecordDataType(recordChoiceSchema)
+                )
+                )));
 
         SimpleRecordSchema expectedChildSchema1 = new SimpleRecordSchema(Arrays.asList(
             new RecordField("integer", RecordFieldType.INT.getDataType()),
@@ -1014,11 +978,11 @@ public class TestJsonTreeRowRecordReader {
             new RecordField("integer", RecordFieldType.INT.getDataType()),
             new RecordField("string", RecordFieldType.STRING.getDataType())
         ));
-        RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
-            new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
-                RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedChildSchema1)),
-                RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedChildSchema2))
-            ))
+        RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Collections.singletonList(
+                new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
+                        RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedChildSchema1)),
+                        RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedChildSchema2))
+                ))
         ));
 
         // Since the actual arrays have records with either (INT, BOOLEAN, STRING) or (INT, STRING, STRING)
@@ -1062,14 +1026,119 @@ public class TestJsonTreeRowRecordReader {
         testReadRecords(jsonPath, schema, expected);
     }
 
+    @Test
+    void testStartFromNestedArray() throws IOException, MalformedRecordException {
+        String jsonPath = "src/test/resources/json/single-element-nested-array.json";
+
+        SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("id", RecordFieldType.INT.getDataType()),
+                new RecordField("balance", RecordFieldType.DOUBLE.getDataType())
+        ));
+
+        List<Object> expected = Arrays.asList(
+                new MapRecord(expectedRecordSchema, new HashMap<String, Object>(){{
+                    put("id", 42);
+                    put("balance", 4750.89);
+                }}),
+                new MapRecord(expectedRecordSchema, new HashMap<String, Object>(){{
+                    put("id", 43);
+                    put("balance", 48212.38);
+                }})
+        );
+
+        testReadRecords(jsonPath, expected, StartingFieldStrategy.NESTED_FIELD, "accounts");
+    }
+
+    @Test
+    void testStartsFromNestedObject() throws IOException, MalformedRecordException {
+        String jsonPath = "src/test/resources/json/single-element-nested.json";
+
+        SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("id", RecordFieldType.INT.getDataType()),
+                new RecordField("balance", RecordFieldType.DOUBLE.getDataType())
+        ));
+
+        List<Object> expected = Collections.singletonList(
+                new MapRecord(expectedRecordSchema, new HashMap<String, Object>() {{
+                    put("id", 42);
+                    put("balance", 4750.89);
+                }})
+        );
+
+        testReadRecords(jsonPath, expected, StartingFieldStrategy.NESTED_FIELD, "account");
+    }
+
+    @Test
+    void testStartsFromMultipleNestedField() throws IOException, MalformedRecordException {
+        String jsonPath = "src/test/resources/json/multiple-nested-field.json";
+
+        SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("id", RecordFieldType.STRING.getDataType()),
+                new RecordField("type", RecordFieldType.STRING.getDataType())
+        ));
+
+        List<Object> expected = Arrays.asList(
+                    new MapRecord(expectedRecordSchema, new HashMap<String, Object>(){{
+                        put("id", "n312kj3");
+                        put("type", "employee");
+                    }}),
+                    new MapRecord(expectedRecordSchema, new HashMap<String, Object>(){{
+                        put("id", "dl2kdff");
+                        put("type", "security");
+                    }})
+        );
+
+        testReadRecords(jsonPath, expected, StartingFieldStrategy.NESTED_FIELD, "accountIds");
+    }
+
+    @Test
+    void testStartFromSimpleFieldReturnsEmptyJson() throws IOException, MalformedRecordException {
+        String jsonPath = "src/test/resources/json/single-element-nested.json";
+
+        testReadRecords(jsonPath, Collections.emptyList(), StartingFieldStrategy.NESTED_FIELD, "name");
+    }
+
+    @Test
+    void testStartFromNonExistentFieldWithDefinedSchema() throws IOException, MalformedRecordException {
+        String jsonPath = "src/test/resources/json/single-element-nested.json";
+
+        SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(getDefaultFields());
+        List<Object> expected = Collections.emptyList();
+
+        testReadRecords(jsonPath, expectedRecordSchema, expected, StartingFieldStrategy.NESTED_FIELD, "notfound");
+    }
+
+    @Test
+    void testStartFromNestedFieldThenStartObject() throws IOException, MalformedRecordException {
+        String jsonPath = "src/test/resources/json/nested-array-then-start-object.json";
+
+        SimpleRecordSchema expectedRecordSchema = new SimpleRecordSchema(Arrays.asList(
+                new RecordField("id", RecordFieldType.INT.getDataType()),
+                new RecordField("balance", RecordFieldType.DOUBLE.getDataType())
+        ));
+
+        List<Object> expected = Arrays.asList(
+                new MapRecord(expectedRecordSchema, new HashMap<String, Object>(){{
+                    put("id", 42);
+                    put("balance", 4750.89);
+                }}),
+                new MapRecord(expectedRecordSchema, new HashMap<String, Object>(){{
+                    put("id", 43);
+                    put("balance", 48212.38);
+                }})
+        );
+
+        testReadRecords(jsonPath, expectedRecordSchema, expected, StartingFieldStrategy.NESTED_FIELD, "accounts");
+    }
+
     private void testReadRecords(String jsonPath, List<Object> expected) throws IOException, MalformedRecordException {
         // GIVEN
         final File jsonFile = new File(jsonPath);
 
         try (
-            InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile));
+            InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))
         ) {
-            RecordSchema schema = inferSchema(jsonStream);
+            RecordSchema schema = inferSchema(jsonStream, StartingFieldStrategy.ROOT_NODE, null);
 
             // WHEN
             // THEN
@@ -1077,25 +1146,30 @@ public class TestJsonTreeRowRecordReader {
         }
     }
 
-    private void testReadRecords(String jsonPath, RecordSchema schema, List<Object> expected) throws IOException, MalformedRecordException {
-        // GIVEN
+    private void testReadRecords(String jsonPath, List<Object> expected, StartingFieldStrategy strategy, String startingFieldName) throws IOException, MalformedRecordException {
         final File jsonFile = new File(jsonPath);
+        try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
+            RecordSchema schema = inferSchema(jsonStream, strategy, startingFieldName);
+            testReadRecords(jsonStream, schema, expected, strategy, startingFieldName);
+        }
+    }
 
-        try (
-            InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile));
-        ) {
-            // WHEN
-            // THEN
+    private void testReadRecords(String jsonPath, RecordSchema schema, List<Object> expected) throws IOException, MalformedRecordException {
+        final File jsonFile = new File(jsonPath);
+        try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
             testReadRecords(jsonStream, schema, expected);
         }
     }
 
+    private void testReadRecords(String jsonPath, RecordSchema schema, List<Object> expected, StartingFieldStrategy strategy, String startingFieldName) throws IOException, MalformedRecordException {
+        final File jsonFile = new File(jsonPath);
+        try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
+            testReadRecords(jsonStream, schema, expected, strategy, startingFieldName);
+        }
+    }
+
     private void testReadRecords(InputStream jsonStream, RecordSchema schema, List<Object> expected) throws IOException, MalformedRecordException {
-        // GIVEN
-        try (
-            JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat);
-        ) {
-            // WHEN
+        try (JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
             List<Object> actual = new ArrayList<>();
             Record record;
             while ((record = reader.nextRecord()) != null) {
@@ -1103,7 +1177,6 @@ public class TestJsonTreeRowRecordReader {
                 actual.addAll(dataCollection);
             }
 
-            // THEN
             List<Function<Object, Object>> propertyProviders = Arrays.asList(
                 _object -> ((Record)_object).getSchema(),
                 _object -> Arrays.stream(((Record)_object).getValues()).map(value -> {
@@ -1122,9 +1195,38 @@ public class TestJsonTreeRowRecordReader {
         }
     }
 
-    private RecordSchema inferSchema(InputStream jsonStream) throws IOException {
+    private void testReadRecords(InputStream jsonStream, RecordSchema schema, List<Object> expected, StartingFieldStrategy strategy, String startingFieldName)
+            throws IOException, MalformedRecordException {
+        try (JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat,
+                strategy, startingFieldName)) {
+            List<Object> actual = new ArrayList<>();
+            Record record;
+
+            while ((record = reader.nextRecord()) != null) {
+                actual.add(record);
+            }
+
+            List<Function<Object, Object>> propertyProviders = Arrays.asList(
+                    _object -> ((Record)_object).getSchema(),
+                    _object -> Arrays.stream(((Record)_object).getValues()).map(value -> {
+                        if (value != null && value.getClass().isArray()) {
+                            return Arrays.asList((Object[]) value);
+                        } else {
+                            return value;
+                        }
+                    }).collect(Collectors.toList())
+            );
+
+            List<EqualsWrapper<Object>> wrappedExpected = EqualsWrapper.wrapList(expected, propertyProviders);
+            List<EqualsWrapper<Object>> wrappedActual = EqualsWrapper.wrapList(actual, propertyProviders);
+
+            assertEquals(wrappedExpected, wrappedActual);
+        }
+    }
+
+    private RecordSchema inferSchema(InputStream jsonStream, StartingFieldStrategy strategy, String startingFieldName) throws IOException {
         RecordSchema schema = new InferSchemaAccessStrategy<>(
-            (__, inputStream) -> new JsonRecordSource(inputStream),
+            (__, inputStream) -> new JsonRecordSource(inputStream, strategy, startingFieldName),
             new JsonSchemaInference(new TimeValueInference(null, null, null)),
             mock(ComponentLog.class)
         ).getSchema(Collections.emptyMap(), jsonStream, null);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
index 465164b18a..84e06523cd 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
@@ -54,10 +54,10 @@ import java.util.Map;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
-public class TestWriteJsonResult {
+class TestWriteJsonResult {
 
     @Test
-    public void testDataTypes() throws IOException, ParseException {
+    void testDataTypes() throws IOException, ParseException {
         final List<RecordField> fields = new ArrayList<>();
         for (final RecordFieldType fieldType : RecordFieldType.values()) {
             if (fieldType == RecordFieldType.CHOICE) {
@@ -121,7 +121,7 @@ public class TestWriteJsonResult {
 
 
     @Test
-    public void testWriteSerializedForm() throws IOException {
+    void testWriteSerializedForm() throws IOException {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
         fields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
@@ -160,7 +160,7 @@ public class TestWriteJsonResult {
     }
 
     @Test
-    public void testTimestampWithNullFormat() throws IOException {
+    void testTimestampWithNullFormat() throws IOException {
         final Map<String, Object> values = new HashMap<>();
         values.put("timestamp", new java.sql.Timestamp(37293723L));
         values.put("time", new java.sql.Time(37293723L));
@@ -192,7 +192,7 @@ public class TestWriteJsonResult {
     }
 
     @Test
-    public void testExtraFieldInWriteRecord() throws IOException {
+    void testExtraFieldInWriteRecord() throws IOException {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(fields);
@@ -219,7 +219,7 @@ public class TestWriteJsonResult {
     }
 
     @Test
-    public void testExtraFieldInWriteRawRecord() throws IOException {
+    void testExtraFieldInWriteRawRecord() throws IOException {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(fields);
@@ -246,7 +246,7 @@ public class TestWriteJsonResult {
     }
 
     @Test
-    public void testMissingFieldInWriteRecord() throws IOException {
+    void testMissingFieldInWriteRecord() throws IOException {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
         fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
@@ -273,7 +273,7 @@ public class TestWriteJsonResult {
     }
 
     @Test
-    public void testMissingFieldInWriteRawRecord() throws IOException {
+    void testMissingFieldInWriteRawRecord() throws IOException {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
         fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
@@ -300,7 +300,7 @@ public class TestWriteJsonResult {
     }
 
     @Test
-    public void testMissingAndExtraFieldInWriteRecord() throws IOException {
+    void testMissingAndExtraFieldInWriteRecord() throws IOException {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
         fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
@@ -328,7 +328,7 @@ public class TestWriteJsonResult {
     }
 
     @Test
-    public void testMissingAndExtraFieldInWriteRawRecord() throws IOException {
+    void testMissingAndExtraFieldInWriteRawRecord() throws IOException {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
         fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
@@ -356,7 +356,7 @@ public class TestWriteJsonResult {
     }
 
     @Test
-    public void testNullSuppression() throws IOException {
+    void testNullSuppression() throws IOException {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
         fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
@@ -378,8 +378,8 @@ public class TestWriteJsonResult {
 
         baos.reset();
         try (
-            final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
-                    NullSuppression.ALWAYS_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
+                final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
+                        NullSuppression.ALWAYS_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
             writer.beginRecordSet();
             writer.write(recordWithMissingName);
             writer.finishRecordSet();
@@ -390,7 +390,7 @@ public class TestWriteJsonResult {
         baos.reset();
         try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
                 NullSuppression.SUPPRESS_MISSING, OutputGrouping.OUTPUT_ARRAY, null, null,
-            null)) {
+                null)) {
             writer.beginRecordSet();
             writer.write(recordWithMissingName);
             writer.finishRecordSet();
@@ -414,8 +414,8 @@ public class TestWriteJsonResult {
 
         baos.reset();
         try (
-            final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
-                    NullSuppression.ALWAYS_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
+                final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
+                        NullSuppression.ALWAYS_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
             writer.beginRecordSet();
             writer.write(recordWithNullValue);
             writer.finishRecordSet();
@@ -426,7 +426,7 @@ public class TestWriteJsonResult {
         baos.reset();
         try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
                 NullSuppression.SUPPRESS_MISSING, OutputGrouping.OUTPUT_ARRAY, null, null,
-            null)) {
+                null)) {
             writer.beginRecordSet();
             writer.write(recordWithNullValue);
             writer.finishRecordSet();
@@ -437,7 +437,7 @@ public class TestWriteJsonResult {
     }
 
     @Test
-    public void testOnelineOutput() throws IOException {
+    void testOnelineOutput() throws IOException {
         final Map<String, Object> values1 = new HashMap<>();
         values1.put("timestamp", new java.sql.Timestamp(37293723L));
         values1.put("time", new java.sql.Time(37293723L));
@@ -480,7 +480,7 @@ public class TestWriteJsonResult {
     }
 
     @Test
-    public void testChoiceArray() throws IOException {
+    void testChoiceArray() throws IOException {
         final List<RecordField> fields = new ArrayList<>();
         fields.add(new RecordField("path", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()))));
         final RecordSchema schema = new SimpleRecordSchema(fields);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/multiple-nested-field.json b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/multiple-nested-field.json
new file mode 100644
index 0000000000..7b2f47d28a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/multiple-nested-field.json
@@ -0,0 +1,22 @@
+[
+	{
+		"id": 1,
+		"name": "John Doe",
+		"balance": 4750.89,
+		"address": "123 My Street",
+		"city": "My City", 
+		"state": "MS",
+		"zipCode": "11111",
+		"country": "USA",
+		"accountIds": [
+			{
+				"id": "n312kj3",
+				"type": "employee"
+			},
+			{
+				"id": "dl2kdff",
+				"type": "security"
+			}
+		]
+	}
+]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/nested-array-then-start-object.json b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/nested-array-then-start-object.json
new file mode 100644
index 0000000000..662ebb8dc8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/nested-array-then-start-object.json
@@ -0,0 +1,20 @@
+[
+  {
+    "id": 17,
+    "name": "John",
+    "accounts": [
+      {
+        "id": 42,
+        "balance": 4750.89
+      },
+      {
+        "id": 43,
+        "balance": 48212.38
+      }
+    ]
+  },
+  {
+    "id": 98,
+    "balance": 67829.12
+  }
+]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/single-element-nested-array-middle.json b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/single-element-nested-array-middle.json
new file mode 100644
index 0000000000..ccc2121ef1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/single-element-nested-array-middle.json
@@ -0,0 +1,16 @@
+{
+	"id": 1,
+	"accounts": [{
+		"id": 42,
+		"balance": 4750.89
+	}, {
+		"id": 43,
+		"balance": 48212.38
+	}],
+	"name": "John Doe",
+	"address": "123 My Street",
+	"city": "My City",
+	"state": "MS",
+	"zipCode": "11111",
+	"country": "USA"
+}
\ No newline at end of file