Posted to commits@nifi.apache.org by jo...@apache.org on 2017/08/12 05:02:11 UTC

[2/4] nifi git commit: NIFI-4142: This closes #2015. Refactored Record Reader/Writer to allow for reading/writing "raw records". Implemented ValidateRecord. Updated Record Reader to take two parameters for nextRecord: (boolean coerceTypes) and (boolean d

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
index b47bece..9a9fed7 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/util/MockRecordParser.java
@@ -33,6 +33,7 @@ import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.SchemaValidationException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
@@ -74,7 +75,7 @@ public class MockRecordParser extends AbstractControllerService implements Recor
             }
 
             @Override
-            public Record nextRecord() throws IOException, MalformedRecordException {
+            public Record nextRecord(boolean coerceTypes, boolean dropUnknown) throws IOException, MalformedRecordException, SchemaValidationException {
                 if (failAfterN >= recordCount) {
                     throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read");
                 }
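
For orientation, the refactored RecordReader now exposes two read modes: the coercing read most processors use and the "raw" read that ValidateRecord (later in this commit) relies on. A minimal caller sketch, assuming the RecordReader interface keeps a no-arg nextRecord() that delegates to nextRecord(true, true); the reader variable and exception handling are omitted:

    // Default mode: coerce field values to the schema's types and drop fields the schema does not declare.
    Record coerced = reader.nextRecord();              // presumed equivalent to nextRecord(true, true)

    // Raw mode: keep values exactly as parsed and retain fields that are not in the schema.
    Record raw = reader.nextRecord(false, false);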

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy
index 2be37df..5e6a9ca 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_inline.groovy
@@ -44,7 +44,7 @@ class GroovyRecordReader implements RecordReader {
             new MapRecord(recordSchema, ['id': 3, 'name': 'Ramon', 'code': 300])
     ].iterator()
 
-    Record nextRecord() throws IOException, MalformedRecordException {
+    Record nextRecord(boolean coerceTypes, boolean dropUnknown) throws IOException, MalformedRecordException {
         return recordIterator.hasNext() ? recordIterator.next() : null
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy
index d51089b..b94c380 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/src/test/resources/groovy/test_record_reader_xml.groovy
@@ -51,7 +51,7 @@ class GroovyXmlRecordReader implements RecordReader {
         }.iterator()
     }
 
-    Record nextRecord() throws IOException, MalformedRecordException {
+    Record nextRecord(boolean coerceTypes, boolean dropUnknown) throws IOException, MalformedRecordException {
         return recordIterator?.hasNext() ? recordIterator.next() : null
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 836e83d..19f47be 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -1,15 +1,16 @@
 <?xml version="1.0"?>
-<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor
-    license agreements. See the NOTICE file distributed with this work for additional
-    information regarding copyright ownership. The ASF licenses this file to
-    You under the Apache License, Version 2.0 (the "License"); you may not use
-    this file except in compliance with the License. You may obtain a copy of
-    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required
-    by applicable law or agreed to in writing, software distributed under the
-    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS
-    OF ANY KIND, either express or implied. See the License for the specific
+<!-- Licensed to the Apache Software Foundation (ASF) under one or more contributor 
+    license agreements. See the NOTICE file distributed with this work for additional 
+    information regarding copyright ownership. The ASF licenses this file to 
+    You under the Apache License, Version 2.0 (the "License"); you may not use 
+    this file except in compliance with the License. You may obtain a copy of 
+    the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required 
+    by applicable law or agreed to in writing, software distributed under the 
+    License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS 
+    OF ANY KIND, either express or implied. See the License for the specific 
     language governing permissions and limitations under the License. -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
     <parent>
         <groupId>org.apache.nifi</groupId>
@@ -356,6 +357,11 @@
             <artifactId>calcite-core</artifactId>
             <version>1.12.0</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-schema-registry-service-api</artifactId>
+        </dependency>
+
     </dependencies>
 
     <build>
@@ -498,7 +504,8 @@
                         <exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-record.avsc</exclude>
                         <exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string.avsc</exclude>
                         <exclude>src/test/resources/TestUpdateRecord/schema/person-with-name-string-fields.avsc</exclude>
-                        <!-- This file is copied from https://github.com/jeremyh/jBCrypt because the binary is compiled for Java 8 and we must support Java 7 -->
+                        <!-- This file is copied from https://github.com/jeremyh/jBCrypt 
+                            because the binary is compiled for Java 8 and we must support Java 7 -->
                         <exclude>src/main/java/org/apache/nifi/security/util/crypto/bcrypt/BCrypt.java</exclude>
                     </excludes>
                 </configuration>

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
new file mode 100644
index 0000000..8e8fca8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/ValidateRecord.java
@@ -0,0 +1,457 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.processors.standard;
+
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Parser;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.avro.AvroSchemaValidator;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.validation.SchemaValidationContext;
+import org.apache.nifi.schema.validation.StandardSchemaValidator;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.RawRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.validation.RecordSchemaValidator;
+import org.apache.nifi.serialization.record.validation.SchemaValidationResult;
+import org.apache.nifi.serialization.record.validation.ValidationError;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@Tags({"record", "schema", "validate"})
+@CapabilityDescription("Validates the Records of an incoming FlowFile against a given schema. All records that adhere to the schema are routed to the \"valid\" relationship while "
+    + "records that do not adhere to hte schema are routed to the \"invalid\" relationship. It is therefore possible for a single incoming FlowFile to be split into two individual "
+    + "FlowFiles if some records are valid according to the schema and others are not. Any FlowFile that is routed to the \"invalid\" relationship will emit a ROUTE Provenance Event "
+    + "with the Details field populated to explain why records were invalid. In addition, to gain further explanation of why records were invalid, DEBUG-level logging can be enabled "
+    + "for the \"org.apache.nifi.processors.standard.ValidateRecord\" logger.")
+public class ValidateRecord extends AbstractProcessor {
+
+    static final AllowableValue SCHEMA_NAME_PROPERTY = new AllowableValue("schema-name-property", "Use Schema Name Property",
+        "The schema to validate the data against is determined by looking at the 'Schema Name' Property and looking up the schema in the configured Schema Registry");
+    static final AllowableValue SCHEMA_TEXT_PROPERTY = new AllowableValue("schema-text-property", "Use Schema Text Property",
+        "The schema to validate the data against is determined by looking at the 'Schema Text' Property and parsing the schema as an Avro schema");
+    static final AllowableValue READER_SCHEMA = new AllowableValue("reader-schema", "Use Reader's Schema",
+        "The schema to validate the data against is determined by asking the configured Record Reader for its schema");
+
+    static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
+        .name("record-reader")
+        .displayName("Record Reader")
+        .description("Specifies the Controller Service to use for reading incoming data")
+        .identifiesControllerService(RecordReaderFactory.class)
+        .required(true)
+        .build();
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+        .name("record-writer")
+        .displayName("Record Writer")
+        .description("Specifies the Controller Service to use for writing out the records")
+        .identifiesControllerService(RecordSetWriterFactory.class)
+        .required(true)
+        .build();
+    static final PropertyDescriptor SCHEMA_ACCESS_STRATEGY = new PropertyDescriptor.Builder()
+        .name("schema-access-strategy")
+        .displayName("Schema Access Strategy")
+        .description("Specifies how to obtain the schema that should be used to validate records")
+        .allowableValues(READER_SCHEMA, SCHEMA_NAME_PROPERTY, SCHEMA_TEXT_PROPERTY)
+        .defaultValue(READER_SCHEMA.getValue())
+        .required(true)
+        .build();
+    public static final PropertyDescriptor SCHEMA_REGISTRY = new PropertyDescriptor.Builder()
+        .name("schema-registry")
+        .displayName("Schema Registry")
+        .description("Specifies the Controller Service to use for the Schema Registry. This is necessary only if the Schema Access Strategy is set to \"Use 'Schema Name' Property\".")
+        .identifiesControllerService(SchemaRegistry.class)
+        .required(false)
+        .build();
+    static final PropertyDescriptor SCHEMA_NAME = new PropertyDescriptor.Builder()
+        .name("schema-name")
+        .displayName("Schema Name")
+        .description("Specifies the name of the schema to lookup in the Schema Registry property")
+        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .defaultValue("${schema.name}")
+        .required(false)
+        .build();
+    static final PropertyDescriptor SCHEMA_TEXT = new PropertyDescriptor.Builder()
+        .name("schema-text")
+        .displayName("Schema Text")
+        .description("The text of an Avro-formatted Schema")
+        .addValidator(new AvroSchemaValidator())
+        .expressionLanguageSupported(true)
+        .defaultValue("${avro.schema}")
+        .required(false)
+        .build();
+    static final PropertyDescriptor ALLOW_EXTRA_FIELDS = new PropertyDescriptor.Builder()
+        .name("allow-extra-fields")
+        .displayName("Allow Extra Fields")
+        .description("If the incoming data has fields that are not present in the schema, this property determines whether or not the Record is valid. "
+            + "If true, the Record is still valid. If false, the Record will be invalid due to the extra fields.")
+        .expressionLanguageSupported(false)
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .required(true)
+        .build();
+    static final PropertyDescriptor STRICT_TYPE_CHECKING = new PropertyDescriptor.Builder()
+        .name("strict-type-checking")
+        .displayName("Strict Type Checking")
+        .description("If the incoming data has a Record where a field is not of the correct type, this property determine whether how to handle the Record. "
+            + "If true, the Record will still be considered invalid. If false, the Record will be considered valid and the field will be coerced into the "
+            + "correct type (if possible, according to the type coercion supported by the Record Writer).")
+        .expressionLanguageSupported(false)
+        .allowableValues("true", "false")
+        .defaultValue("true")
+        .required(true)
+        .build();
+
+    static final Relationship REL_VALID = new Relationship.Builder()
+        .name("valid")
+        .description("Records that are valid according to the schema will be routed to this relationship")
+        .build();
+    static final Relationship REL_INVALID = new Relationship.Builder()
+        .name("invalid")
+        .description("Records that are not valid according to the schema will be routed to this relationship")
+        .build();
+    static final Relationship REL_FAILURE = new Relationship.Builder()
+        .name("failure")
+        .description("If the records cannot be read, validated, or written, for any reason, the original FlowFile will be routed to this relationship")
+        .build();
+
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(RECORD_READER);
+        properties.add(RECORD_WRITER);
+        properties.add(SCHEMA_ACCESS_STRATEGY);
+        properties.add(SCHEMA_REGISTRY);
+        properties.add(SCHEMA_NAME);
+        properties.add(SCHEMA_TEXT);
+        properties.add(ALLOW_EXTRA_FIELDS);
+        properties.add(STRICT_TYPE_CHECKING);
+        return properties;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        final Set<Relationship> relationships = new HashSet<>();
+        relationships.add(REL_VALID);
+        relationships.add(REL_INVALID);
+        relationships.add(REL_FAILURE);
+        return relationships;
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final String schemaAccessStrategy = validationContext.getProperty(SCHEMA_ACCESS_STRATEGY).getValue();
+        if (schemaAccessStrategy.equals(SCHEMA_NAME_PROPERTY.getValue())) {
+            if (!validationContext.getProperty(SCHEMA_REGISTRY).isSet()) {
+                return Collections.singleton(new ValidationResult.Builder()
+                    .subject("Schema Registry")
+                    .valid(false)
+                    .explanation("If the Schema Access Strategy is set to \"Use 'Schema Name' Property\", the Schema Registry property must also be set")
+                    .build());
+            }
+
+            final SchemaRegistry registry = validationContext.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
+            if (!registry.getSuppliedSchemaFields().contains(SchemaField.SCHEMA_NAME)) {
+                return Collections.singleton(new ValidationResult.Builder()
+                    .subject("Schema Registry")
+                    .valid(false)
+                    .explanation("The configured Schema Registry does not support accessing schemas by name")
+                    .build());
+            }
+        }
+
+        return Collections.emptyList();
+    }
+
+
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
+
+        final boolean allowExtraFields = context.getProperty(ALLOW_EXTRA_FIELDS).asBoolean();
+        final boolean strictTypeChecking = context.getProperty(STRICT_TYPE_CHECKING).asBoolean();
+
+        RecordSetWriter validWriter = null;
+        RecordSetWriter invalidWriter = null;
+        FlowFile validFlowFile = null;
+        FlowFile invalidFlowFile = null;
+
+        try (final InputStream in = session.read(flowFile);
+            final RecordReader reader = readerFactory.createRecordReader(flowFile, in, getLogger())) {
+
+            final RecordSchema validationSchema = getValidationSchema(context, flowFile, reader);
+            final SchemaValidationContext validationContext = new SchemaValidationContext(validationSchema, allowExtraFields, strictTypeChecking);
+            final RecordSchemaValidator validator = new StandardSchemaValidator(validationContext);
+
+            int recordCount = 0;
+            int validCount = 0;
+            int invalidCount = 0;
+
+            final Set<String> extraFields = new HashSet<>();
+            final Set<String> missingFields = new HashSet<>();
+            final Set<String> invalidFields = new HashSet<>();
+            final Set<String> otherProblems = new HashSet<>();
+
+            try {
+                Record record;
+                while ((record = reader.nextRecord(false, false)) != null) {
+                    final SchemaValidationResult result = validator.validate(record);
+                    recordCount++;
+
+                    RecordSetWriter writer;
+                    if (result.isValid()) {
+                        validCount++;
+                        if (validFlowFile == null) {
+                            validFlowFile = session.create(flowFile);
+                        }
+
+                        validWriter = writer = createIfNecessary(validWriter, writerFactory, session, validFlowFile, record.getSchema());
+                    } else {
+                        invalidCount++;
+                        logValidationErrors(flowFile, recordCount, result);
+
+                        if (invalidFlowFile == null) {
+                            invalidFlowFile = session.create(flowFile);
+                        }
+
+                        invalidWriter = writer = createIfNecessary(invalidWriter, writerFactory, session, invalidFlowFile, record.getSchema());
+
+                        // Collect the names of the problem fields (and any other explanations) into Sets so that duplicates are
+                        // suppressed; otherwise we would both use up a lot of heap and risk outputting so much information in the
+                        // Provenance Event that it is too noisy to be useful.
+                        for (final ValidationError validationError : result.getValidationErrors()) {
+                            final Optional<String> fieldName = validationError.getFieldName();
+
+                            switch (validationError.getType()) {
+                                case EXTRA_FIELD:
+                                    if (fieldName.isPresent()) {
+                                        extraFields.add(fieldName.get());
+                                    } else {
+                                        otherProblems.add(validationError.getExplanation());
+                                    }
+                                    break;
+                                case MISSING_FIELD:
+                                    if (fieldName.isPresent()) {
+                                        missingFields.add(fieldName.get());
+                                    } else {
+                                        otherProblems.add(validationError.getExplanation());
+                                    }
+                                    break;
+                                case INVALID_FIELD:
+                                    if (fieldName.isPresent()) {
+                                        invalidFields.add(fieldName.get());
+                                    } else {
+                                        otherProblems.add(validationError.getExplanation());
+                                    }
+                                    break;
+                                case OTHER:
+                                    otherProblems.add(validationError.getExplanation());
+                                    break;
+                            }
+                        }
+                    }
+
+                    if (writer instanceof RawRecordWriter) {
+                        ((RawRecordWriter) writer).writeRawRecord(record);
+                    } else {
+                        writer.write(record);
+                    }
+                }
+
+                if (validWriter != null) {
+                    completeFlowFile(session, validFlowFile, validWriter, REL_VALID, null);
+                }
+
+                if (invalidWriter != null) {
+                    // Build up a String that explains why the records were invalid, so that we can add this to the Provenance Event.
+                    final StringBuilder errorBuilder = new StringBuilder();
+                    errorBuilder.append("Records in this FlowFile were invalid for the following reasons: ");
+                    if (!missingFields.isEmpty()) {
+                        errorBuilder.append("The following ").append(missingFields.size()).append(" fields were missing: ").append(missingFields.toString());
+                    }
+
+                    if (!extraFields.isEmpty()) {
+                        if (errorBuilder.length() > 0) {
+                            errorBuilder.append("; ");
+                        }
+
+                        errorBuilder.append("The following ").append(extraFields.size())
+                            .append(" fields were present in the Record but not in the schema: ").append(extraFields.toString());
+                    }
+
+                    if (!invalidFields.isEmpty()) {
+                        if (errorBuilder.length() > 0) {
+                            errorBuilder.append("; ");
+                        }
+
+                        errorBuilder.append("The following ").append(invalidFields.size())
+                            .append(" fields had values whose type did not match the schema: ").append(invalidFields.toString());
+                    }
+
+                    if (!otherProblems.isEmpty()) {
+                        if (errorBuilder.length() > 0) {
+                            errorBuilder.append("; ");
+                        }
+
+                        errorBuilder.append("The following ").append(otherProblems.size())
+                            .append(" additional problems were encountered: ").append(otherProblems.toString());
+                    }
+
+                    final String validationErrorString = errorBuilder.toString();
+                    completeFlowFile(session, invalidFlowFile, invalidWriter, REL_INVALID, validationErrorString);
+                }
+            } finally {
+                closeQuietly(validWriter);
+                closeQuietly(invalidWriter);
+            }
+
+            session.adjustCounter("Records Validated", recordCount, false);
+            session.adjustCounter("Records Found Valid", validCount, false);
+            session.adjustCounter("Records Found Invalid", invalidCount, false);
+        } catch (final IOException | MalformedRecordException | SchemaNotFoundException e) {
+            getLogger().error("Failed to process {}; will route to failure", new Object[] {flowFile, e});
+            session.transfer(flowFile, REL_FAILURE);
+            if (validFlowFile != null) {
+                session.remove(validFlowFile);
+            }
+            if (invalidFlowFile != null) {
+                session.remove(invalidFlowFile);
+            }
+            return;
+        }
+
+        session.remove(flowFile);
+    }
+
+    private void closeQuietly(final RecordSetWriter writer) {
+        if (writer != null) {
+            try {
+                writer.close();
+            } catch (final Exception e) {
+                getLogger().error("Failed to close Record Writer", e);
+            }
+        }
+    }
+
+    private void completeFlowFile(final ProcessSession session, final FlowFile flowFile, final RecordSetWriter writer, final Relationship relationship, final String details) throws IOException {
+        final WriteResult writeResult = writer.finishRecordSet();
+        writer.close();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.putAll(writeResult.getAttributes());
+        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+        attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+        session.putAllAttributes(flowFile, attributes);
+
+        session.transfer(flowFile, relationship);
+        session.getProvenanceReporter().route(flowFile, relationship, details);
+    }
+
+    private RecordSetWriter createIfNecessary(final RecordSetWriter writer, final RecordSetWriterFactory factory, final ProcessSession session,
+        final FlowFile flowFile, final RecordSchema inputSchema) throws SchemaNotFoundException, IOException {
+        if (writer != null) {
+            return writer;
+        }
+
+        final OutputStream out = session.write(flowFile);
+        final RecordSetWriter created = factory.createWriter(getLogger(), inputSchema, flowFile, out);
+        created.beginRecordSet();
+        return created;
+    }
+
+    private void logValidationErrors(final FlowFile flowFile, final int recordCount, final SchemaValidationResult result) {
+        if (getLogger().isDebugEnabled()) {
+            final StringBuilder sb = new StringBuilder();
+            sb.append("For ").append(flowFile).append(" Record #").append(recordCount).append(" is invalid due to:\n");
+            for (final ValidationError error : result.getValidationErrors()) {
+                sb.append(error).append("\n");
+            }
+
+            getLogger().debug(sb.toString());
+        }
+    }
+
+    protected RecordSchema getValidationSchema(final ProcessContext context, final FlowFile flowFile, final RecordReader reader)
+        throws MalformedRecordException, IOException, SchemaNotFoundException {
+        final String schemaAccessStrategy = context.getProperty(SCHEMA_ACCESS_STRATEGY).getValue();
+        if (schemaAccessStrategy.equals(READER_SCHEMA.getValue())) {
+            return reader.getSchema();
+        } else if (schemaAccessStrategy.equals(SCHEMA_NAME_PROPERTY.getValue())) {
+            final SchemaRegistry schemaRegistry = context.getProperty(SCHEMA_REGISTRY).asControllerService(SchemaRegistry.class);
+            final String schemaName = context.getProperty(SCHEMA_NAME).evaluateAttributeExpressions(flowFile).getValue();
+            return schemaRegistry.retrieveSchema(schemaName);
+        } else if (schemaAccessStrategy.equals(SCHEMA_TEXT_PROPERTY.getValue())) {
+            final String schemaText = context.getProperty(SCHEMA_TEXT).evaluateAttributeExpressions(flowFile).getValue();
+            final Parser parser = new Schema.Parser();
+            final Schema avroSchema = parser.parse(schemaText);
+            return AvroTypeUtil.createSchema(avroSchema);
+        } else {
+            throw new ProcessException("Invalid Schema Access Strategy: " + schemaAccessStrategy);
+        }
+    }
+}
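
For context, a minimal sketch of how the new processor might be exercised with NiFi's mock framework. The MockRecordWriter controller service, the string-based property and relationship names, and the surrounding JUnit test class are assumptions for illustration rather than part of this commit:

    final TestRunner runner = TestRunners.newTestRunner(ValidateRecord.class);

    // Reader whose schema (id : int, name : string) doubles as the validation schema,
    // because the default Schema Access Strategy is "Use Reader's Schema".
    final MockRecordParser readerService = new MockRecordParser();
    readerService.addSchemaField("id", RecordFieldType.INT);
    readerService.addSchemaField("name", RecordFieldType.STRING);
    readerService.addRecord(1, "John");
    runner.addControllerService("reader", readerService);
    runner.enableControllerService(readerService);

    final MockRecordWriter writerService = new MockRecordWriter(null, false);
    runner.addControllerService("writer", writerService);
    runner.enableControllerService(writerService);

    runner.setProperty("record-reader", "reader");
    runner.setProperty("record-writer", "writer");

    runner.enqueue(new byte[0]);
    runner.run();

    // The conforming record lands on "valid"; the original FlowFile is removed, not transferred.
    runner.assertTransferCount("valid", 1);
    runner.assertTransferCount("invalid", 0);
    runner.assertTransferCount("failure", 0);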

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index b8eb4a1..f3e72e0 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -102,6 +102,7 @@ org.apache.nifi.processors.standard.TransformXml
 org.apache.nifi.processors.standard.UnpackContent
 org.apache.nifi.processors.standard.ValidateXml
 org.apache.nifi.processors.standard.ValidateCsv
+org.apache.nifi.processors.standard.ValidateRecord
 org.apache.nifi.processors.standard.Wait
 org.apache.nifi.processors.standard.ExecuteSQL
 org.apache.nifi.processors.standard.FetchDistributedMapCache

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
index 13a8317..c1d87b6 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
@@ -32,7 +32,7 @@ public abstract class AvroRecordReader implements RecordReader {
     protected abstract GenericRecord nextAvroRecord() throws IOException;
 
     @Override
-    public Record nextRecord() throws IOException, MalformedRecordException {
+    public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
         GenericRecord record = nextAvroRecord();
         if (record == null) {
             return null;

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
index 1528052..135dd80 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
@@ -34,6 +34,7 @@ import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.schemaregistry.services.SchemaRegistry;
 import org.apache.nifi.serialization.DateTimeUtils;
@@ -57,6 +58,8 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
     private volatile String dateFormat;
     private volatile String timeFormat;
     private volatile String timestampFormat;
+    private volatile boolean firstLineIsHeader;
+    private volatile boolean ignoreHeader;
 
 
     @Override
@@ -67,7 +70,8 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
         properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
         properties.add(CSVUtils.CSV_FORMAT);
         properties.add(CSVUtils.VALUE_SEPARATOR);
-        properties.add(CSVUtils.SKIP_HEADER_LINE);
+        properties.add(CSVUtils.FIRST_LINE_IS_HEADER);
+        properties.add(CSVUtils.IGNORE_CSV_HEADER);
         properties.add(CSVUtils.QUOTE_CHAR);
         properties.add(CSVUtils.ESCAPE_CHAR);
         properties.add(CSVUtils.COMMENT_MARKER);
@@ -82,6 +86,16 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
         this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
         this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
         this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
+        this.firstLineIsHeader = context.getProperty(CSVUtils.FIRST_LINE_IS_HEADER).asBoolean();
+        this.ignoreHeader = context.getProperty(CSVUtils.IGNORE_CSV_HEADER).asBoolean();
+
+        // Ensure that if we are deriving schema from header that we always treat the first line as a header,
+        // regardless of the 'First Line is Header' property
+        final String accessStrategy = context.getProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY).getValue();
+        if (headerDerivedAllowableValue.getValue().equals(accessStrategy)) {
+            this.csvFormat = this.csvFormat.withFirstRecordAsHeader();
+            this.firstLineIsHeader = true;
+        }
     }
 
     @Override
@@ -92,7 +106,7 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
         final RecordSchema schema = getSchema(flowFile, new NonCloseableInputStream(bufferedIn), null);
         bufferedIn.reset();
 
-        return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, dateFormat, timeFormat, timestampFormat);
+        return new CSVRecordReader(bufferedIn, logger, schema, csvFormat, firstLineIsHeader, ignoreHeader, dateFormat, timeFormat, timestampFormat);
     }
 
     @Override
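
The two header-related properties referenced here replace the single "Skip Header Line" behaviour. A hedged configuration sketch (the TestRunner and the csvReader service instance are assumed; the property names are the internal names defined in CSVUtils further below):

    // Treat the first line as a header (the descriptor keeps the internal name "Skip Header Line"),
    // but ignore the column names it contains in favour of the configured schema's field names.
    runner.setProperty(csvReader, "Skip Header Line", "true");
    runner.setProperty(csvReader, "ignore-csv-header", "true");
    // When the header-derived Schema Access Strategy is selected, onEnabled() above forces
    // firstLineIsHeader to true regardless of the first property's value.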

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
index f193fba..18bea6b 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
@@ -22,8 +22,13 @@ import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.Reader;
 import java.text.DateFormat;
-import java.util.HashMap;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.function.Supplier;
 
 import org.apache.commons.csv.CSVFormat;
@@ -36,7 +41,6 @@ import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 
@@ -49,7 +53,9 @@ public class CSVRecordReader implements RecordReader {
     private final Supplier<DateFormat> LAZY_TIME_FORMAT;
     private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
 
-    public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat,
+    private List<String> rawFieldNames;
+
+    public CSVRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema, final CSVFormat csvFormat, final boolean hasHeader, final boolean ignoreHeader,
         final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
 
         this.schema = schema;
@@ -62,48 +68,78 @@ public class CSVRecordReader implements RecordReader {
         LAZY_TIMESTAMP_FORMAT = () -> tsf;
 
         final Reader reader = new InputStreamReader(new BOMInputStream(in));
-        final CSVFormat withHeader = csvFormat.withHeader(schema.getFieldNames().toArray(new String[0]));
+
+        CSVFormat withHeader;
+        if (hasHeader) {
+            withHeader = csvFormat.withSkipHeaderRecord();
+
+            if (ignoreHeader) {
+                withHeader = withHeader.withHeader(schema.getFieldNames().toArray(new String[0]));
+            }
+        } else {
+            withHeader = csvFormat.withHeader(schema.getFieldNames().toArray(new String[0]));
+        }
+
         csvParser = new CSVParser(reader, withHeader);
     }
 
     @Override
-    public Record nextRecord() throws IOException, MalformedRecordException {
+    public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
         final RecordSchema schema = getSchema();
 
+        final List<String> rawFieldNames = getRawFieldNames();
+        final int numFieldNames = rawFieldNames.size();
+
         for (final CSVRecord csvRecord : csvParser) {
-            final Map<String, Object> rowValues = new HashMap<>(schema.getFieldCount());
+            final Map<String, Object> values = new LinkedHashMap<>();
+            for (int i = 0; i < csvRecord.size(); i++) {
+                final String rawFieldName = numFieldNames <= i ? "unknown_field_index_" + i : rawFieldNames.get(i);
+                final String rawValue = csvRecord.get(i);
 
-            for (final RecordField recordField : schema.getFields()) {
-                String rawValue = null;
-                final String fieldName = recordField.getFieldName();
-                if (csvRecord.isSet(fieldName)) {
-                    rawValue = csvRecord.get(fieldName);
-                } else {
-                    for (final String alias : recordField.getAliases()) {
-                        if (csvRecord.isSet(alias)) {
-                            rawValue = csvRecord.get(alias);
-                            break;
-                        }
-                    }
-                }
+                final Optional<DataType> dataTypeOption = schema.getDataType(rawFieldName);
 
-                if (rawValue == null) {
-                    rowValues.put(fieldName, null);
+                if (!dataTypeOption.isPresent() && dropUnknownFields) {
                     continue;
                 }
 
-                final Object converted = convert(rawValue, recordField.getDataType(), fieldName);
-                if (converted != null) {
-                    rowValues.put(fieldName, converted);
+                final Object value;
+                if (coerceTypes && dataTypeOption.isPresent()) {
+                    value = convert(rawValue, dataTypeOption.get(), rawFieldName);
+                } else if (dataTypeOption.isPresent()) {
+                    // The CSV Reader is going to return all fields as Strings, because CSV doesn't have any way to
+                    // dictate a field type. As a result, we will use the schema that we have to attempt to convert
+                    // the value into the desired type if it's a simple type.
+                    value = convertSimpleIfPossible(rawValue, dataTypeOption.get(), rawFieldName);
+                } else {
+                    value = rawValue;
                 }
+
+                values.put(rawFieldName, value);
             }
 
-            return new MapRecord(schema, rowValues);
+            return new MapRecord(schema, values, coerceTypes, dropUnknownFields);
         }
 
         return null;
     }
 
+
+    private List<String> getRawFieldNames() {
+        if (this.rawFieldNames != null) {
+            return this.rawFieldNames;
+        }
+
+        // Use a SortedMap keyed by index of the field so that we can get a List of field names in the correct order
+        final SortedMap<Integer, String> sortedMap = new TreeMap<>();
+        for (final Map.Entry<String, Integer> entry : csvParser.getHeaderMap().entrySet()) {
+            sortedMap.put(entry.getValue(), entry.getKey());
+        }
+
+        this.rawFieldNames = new ArrayList<>(sortedMap.values());
+        return this.rawFieldNames;
+    }
+
+
     @Override
     public RecordSchema getSchema() {
         return schema;
@@ -115,7 +151,6 @@ public class CSVRecordReader implements RecordReader {
         }
 
         final String trimmed = value.startsWith("\"") && value.endsWith("\"") ? value.substring(1, value.length() - 1) : value;
-
         if (trimmed.isEmpty()) {
             return null;
         }
@@ -123,6 +158,40 @@ public class CSVRecordReader implements RecordReader {
         return DataTypeUtils.convertType(trimmed, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
     }
 
+    private Object convertSimpleIfPossible(final String value, final DataType dataType, final String fieldName) {
+        if (dataType == null || value == null) {
+            return value;
+        }
+
+        final String trimmed = value.startsWith("\"") && value.endsWith("\"") ? value.substring(1, value.length() - 1) : value;
+        if (trimmed.isEmpty()) {
+            return null;
+        }
+
+        switch (dataType.getFieldType()) {
+            case STRING:
+                return value;
+            case BOOLEAN:
+            case INT:
+            case LONG:
+            case FLOAT:
+            case DOUBLE:
+            case BYTE:
+            case CHAR:
+            case SHORT:
+            case TIME:
+            case TIMESTAMP:
+            case DATE:
+                if (DataTypeUtils.isCompatibleDataType(trimmed, dataType)) {
+                    return DataTypeUtils.convertType(trimmed, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
+                } else {
+                    return value;
+                }
+        }
+
+        return value;
+    }
+
     @Override
     public void close() throws IOException {
         csvParser.close();
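
To illustrate the rewritten reader's raw versus coerced behaviour, a small sketch in which the schema declares only id and name while the CSV carries an extra favoriteColor column. The Mockito mock, the date/time format strings, and the CSVFormat setup are illustrative assumptions:

    final List<RecordField> fields = new ArrayList<>();
    fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
    fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
    final RecordSchema schema = new SimpleRecordSchema(fields);

    final String csv = "id,name,favoriteColor\n1,John,blue\n";
    final CSVRecordReader reader = new CSVRecordReader(
        new ByteArrayInputStream(csv.getBytes(StandardCharsets.UTF_8)),
        Mockito.mock(ComponentLog.class), schema,
        CSVFormat.DEFAULT.withFirstRecordAsHeader(),    // mirrors "Treat First Line as Header" = true
        true /* hasHeader */, false /* ignoreHeader */,
        "yyyy-MM-dd", "HH:mm:ss", "yyyy-MM-dd HH:mm:ss");

    // nextRecord(true, true): values are coerced to the schema's types and favoriteColor is dropped.
    // nextRecord(false, false): favoriteColor is retained, and "1" still becomes an Integer because
    // convertSimpleIfPossible() applies whenever the schema knows the field and the value is compatible.
    final Record raw = reader.nextRecord(false, false);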

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java
index 14cd60d..99f877f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVUtils.java
@@ -61,7 +61,7 @@ public class CSVUtils {
         .defaultValue("\"")
         .required(true)
         .build();
-    static final PropertyDescriptor SKIP_HEADER_LINE = new PropertyDescriptor.Builder()
+    static final PropertyDescriptor FIRST_LINE_IS_HEADER = new PropertyDescriptor.Builder()
         .name("Skip Header Line")
         .displayName("Treat First Line as Header")
         .description("Specifies whether or not the first line of CSV should be considered a Header or should be considered a record. If the Schema Access Strategy "
@@ -74,6 +74,18 @@ public class CSVUtils {
         .defaultValue("false")
         .required(true)
         .build();
+    static final PropertyDescriptor IGNORE_CSV_HEADER = new PropertyDescriptor.Builder()
+        .name("ignore-csv-header")
+        .displayName("Ignore CSV Header Column Names")
+        .description("If the first line of a CSV is a header, and the configured schema does not match the fields named in the header line, this controls how "
+            + "the Reader will interpret the fields. If this property is true, then the field names mapped to each column are driven only by the configured schema and "
+            + "any fields not in the schema will be ignored. If this property is false, then the field names found in the CSV Header will be used as the names of the "
+            + "fields.")
+        .expressionLanguageSupported(false)
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(false)
+        .build();
     static final PropertyDescriptor COMMENT_MARKER = new PropertyDescriptor.Builder()
         .name("Comment Marker")
         .description("The character that is used to denote the start of a comment. Any line that begins with this comment will be ignored.")
@@ -177,7 +189,7 @@ public class CSVUtils {
             .withAllowMissingColumnNames()
             .withIgnoreEmptyLines();
 
-        final PropertyValue skipHeaderPropertyValue = context.getProperty(SKIP_HEADER_LINE);
+        final PropertyValue skipHeaderPropertyValue = context.getProperty(FIRST_LINE_IS_HEADER);
         if (skipHeaderPropertyValue.getValue() != null && skipHeaderPropertyValue.asBoolean()) {
             format = format.withFirstRecordAsHeader();
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
index 00270ed..82d687a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
@@ -20,19 +20,24 @@ package org.apache.nifi.csv;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.OutputStreamWriter;
+import java.util.LinkedHashSet;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
 
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVPrinter;
 import org.apache.nifi.schema.access.SchemaAccessWriter;
 import org.apache.nifi.serialization.AbstractRecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.WriteResult;
 import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RawRecordWriter;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordSchema;
 
-public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSetWriter {
+public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSetWriter, RawRecordWriter {
     private final RecordSchema recordSchema;
     private final SchemaAccessWriter schemaWriter;
     private final String dateFormat;
@@ -40,6 +45,9 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
     private final String timestampFormat;
     private final CSVPrinter printer;
     private final Object[] fieldValues;
+    private final boolean includeHeaderLine;
+    private boolean headerWritten = false;
+    private String[] fieldNames;
 
     public WriteCSVResult(final CSVFormat csvFormat, final RecordSchema recordSchema, final SchemaAccessWriter schemaWriter, final OutputStream out,
         final String dateFormat, final String timeFormat, final String timestampFormat, final boolean includeHeaderLine) throws IOException {
@@ -50,9 +58,9 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
         this.dateFormat = dateFormat;
         this.timeFormat = timeFormat;
         this.timestampFormat = timestampFormat;
+        this.includeHeaderLine = includeHeaderLine;
 
-        final String[] columnNames = recordSchema.getFieldNames().toArray(new String[0]);
-        final CSVFormat formatWithHeader = csvFormat.withHeader(columnNames).withSkipHeaderRecord(!includeHeaderLine);
+        final CSVFormat formatWithHeader = csvFormat.withSkipHeaderRecord(true);
         final OutputStreamWriter streamWriter = new OutputStreamWriter(out);
         printer = new CSVPrinter(streamWriter, formatWithHeader);
 
@@ -93,6 +101,34 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
         printer.flush();
     }
 
+    private String[] getFieldNames(final Record record) {
+        if (fieldNames != null) {
+            return fieldNames;
+        }
+
+        final Set<String> allFields = new LinkedHashSet<>();
+        allFields.addAll(record.getRawFieldNames());
+        allFields.addAll(recordSchema.getFieldNames());
+        fieldNames = allFields.toArray(new String[0]);
+        return fieldNames;
+    }
+
+    private void includeHeaderIfNecessary(final Record record, final boolean includeOnlySchemaFields) throws IOException {
+        if (headerWritten || !includeHeaderLine) {
+            return;
+        }
+
+        final Object[] fieldNames;
+        if (includeOnlySchemaFields) {
+            fieldNames = recordSchema.getFieldNames().toArray(new Object[0]);
+        } else {
+            fieldNames = getFieldNames(record);
+        }
+
+        printer.printRecord(fieldNames);
+        headerWritten = true;
+    }
+
     @Override
     public Map<String, String> writeRecord(final Record record) throws IOException {
         // If we are not writing an active record set, then we need to ensure that we write the
@@ -101,6 +137,8 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
             schemaWriter.writeHeader(recordSchema, getOutputStream());
         }
 
+        includeHeaderIfNecessary(record, true);
+
         int i = 0;
         for (final RecordField recordField : recordSchema.getFields()) {
             fieldValues[i++] = record.getAsString(recordField, getFormat(recordField));
@@ -111,6 +149,36 @@ public class WriteCSVResult extends AbstractRecordSetWriter implements RecordSet
     }
 
     @Override
+    public WriteResult writeRawRecord(final Record record) throws IOException {
+        // If we are not writing an active record set, then we need to ensure that we write the
+        // schema information.
+        if (!isActiveRecordSet()) {
+            schemaWriter.writeHeader(recordSchema, getOutputStream());
+        }
+
+        includeHeaderIfNecessary(record, false);
+
+        final String[] fieldNames = getFieldNames(record);
+        // Avoid allocating a new array for every Record if we can. But if the record has a different number of columns than our
+        // schema does, we don't have much choice, so we allocate a new array for that record.
+        final Object[] recordFieldValues = (fieldNames.length == this.fieldValues.length) ? this.fieldValues : new String[fieldNames.length];
+
+        int i = 0;
+        for (final String fieldName : fieldNames) {
+            final Optional<RecordField> recordField = recordSchema.getField(fieldName);
+            if (recordField.isPresent()) {
+                recordFieldValues[i++] = record.getAsString(fieldName, getFormat(recordField.get()));
+            } else {
+                recordFieldValues[i++] = record.getAsString(fieldName);
+            }
+        }
+
+        printer.printRecord(recordFieldValues);
+        final Map<String, String> attributes = schemaWriter.getAttributes(recordSchema);
+        return WriteResult.of(incrementRecordCount(), attributes);
+    }
+
+    @Override
     public String getMimeType() {
         return "text/csv";
     }

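The header handling above no longer fixes the column set up front via CSVFormat.withHeader; the header is emitted lazily from the first record so that writeRawRecord can include columns the schema does not declare. Below is a minimal standalone sketch of the field-name union performed by getFieldNames (plain Java, no NiFi types; the column names are invented for illustration):

import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class FieldNameUnionSketch {

    // Union of the record's raw field names and the schema's field names,
    // in encounter order, so that raw-only columns keep their position.
    static String[] unionFieldNames(final List<String> rawFieldNames, final List<String> schemaFieldNames) {
        final Set<String> allFields = new LinkedHashSet<>();
        allFields.addAll(rawFieldNames);
        allFields.addAll(schemaFieldNames);
        return allFields.toArray(new String[0]);
    }

    public static void main(final String[] args) {
        // Invented example: the record carries a "debug" column the schema does not declare,
        // and the schema declares a "code" column the record does not carry.
        final List<String> raw = Arrays.asList("id", "name", "debug");
        final List<String> schema = Arrays.asList("id", "name", "code");
        System.out.println(Arrays.toString(unionFieldNames(raw, schema)));
        // prints [id, name, debug, code]
    }
}

writeRecord still emits only the schema's columns, while writeRawRecord walks this union and formats each value with the schema's data type when the field is declared.
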
http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index fae3eba..54a2333 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -67,6 +67,7 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
     private volatile Grok grok;
     private volatile boolean appendUnmatchedLine;
     private volatile RecordSchema recordSchema;
+    private volatile RecordSchema recordSchemaFromGrok;
 
     private static final String DEFAULT_PATTERN_NAME = "/default-grok-patterns.txt";
 
@@ -133,9 +134,11 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
 
         appendUnmatchedLine = context.getProperty(NO_MATCH_BEHAVIOR).getValue().equalsIgnoreCase(APPEND_TO_PREVIOUS_MESSAGE.getValue());
 
+        this.recordSchemaFromGrok = createRecordSchema(grok);
+
         final String schemaAccess = context.getProperty(getSchemaAcessStrategyDescriptor()).getValue();
         if (STRING_FIELDS_FROM_GROK_EXPRESSION.getValue().equals(schemaAccess)) {
-            this.recordSchema = createRecordSchema(grok);
+            this.recordSchema = recordSchemaFromGrok;
         } else {
             this.recordSchema = null;
         }
@@ -236,6 +239,6 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
     @Override
     public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
         final RecordSchema schema = getSchema(flowFile, in, null);
-        return new GrokRecordReader(in, grok, schema, appendUnmatchedLine);
+        return new GrokRecordReader(in, grok, schema, recordSchemaFromGrok, appendUnmatchedLine);
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
index e668b1c..65edf05 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
@@ -24,6 +24,7 @@ import java.io.InputStreamReader;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.regex.Pattern;
 
 import org.apache.nifi.serialization.MalformedRecordException;
@@ -43,6 +44,7 @@ public class GrokRecordReader implements RecordReader {
     private final BufferedReader reader;
     private final Grok grok;
     private final boolean append;
+    private final RecordSchema schemaFromGrok;
     private RecordSchema schema;
 
     private String nextLine;
@@ -55,11 +57,12 @@ public class GrokRecordReader implements RecordReader {
             + "(?:Suppressed\\: )|"
             + "(?:\\s+... \\d+ (?:more|common frames? omitted)$)");
 
-    public GrokRecordReader(final InputStream in, final Grok grok, final RecordSchema schema, final boolean append) {
+    public GrokRecordReader(final InputStream in, final Grok grok, final RecordSchema schema, final RecordSchema schemaFromGrok, final boolean append) {
         this.reader = new BufferedReader(new InputStreamReader(in));
         this.grok = grok;
         this.schema = schema;
         this.append = append;
+        this.schemaFromGrok = schemaFromGrok;
     }
 
     @Override
@@ -68,7 +71,7 @@ public class GrokRecordReader implements RecordReader {
     }
 
     @Override
-    public Record nextRecord() throws IOException, MalformedRecordException {
+    public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
         Map<String, Object> valueMap = null;
         while (valueMap == null || valueMap.isEmpty()) {
             final String line = nextLine == null ? reader.readLine() : nextLine;
@@ -85,7 +88,7 @@ public class GrokRecordReader implements RecordReader {
         // Read the next line to see if it matches the pattern (in which case we will simply leave it for
         // the next call to nextRecord()) or we will attach it to the previously read record.
         String stackTrace = null;
-        final StringBuilder toAppend = new StringBuilder();
+        final StringBuilder trailingText = new StringBuilder();
         while ((nextLine = reader.readLine()) != null) {
             final Match nextLineMatch = grok.match(nextLine);
             nextLineMatch.captures();
@@ -97,7 +100,7 @@ public class GrokRecordReader implements RecordReader {
                     stackTrace = readStackTrace(nextLine);
                     break;
                 } else if (append) {
-                    toAppend.append("\n").append(nextLine);
+                    trailingText.append("\n").append(nextLine);
                 }
             } else {
                 // The next line matched our pattern.
@@ -105,49 +108,78 @@ public class GrokRecordReader implements RecordReader {
             }
         }
 
-        try {
-            final List<DataType> fieldTypes = schema.getDataTypes();
-            final Map<String, Object> values = new HashMap<>(fieldTypes.size());
+        final Record record = createRecord(valueMap, trailingText, stackTrace, coerceTypes, dropUnknownFields);
+        return record;
+    }
 
-            for (final RecordField field : schema.getFields()) {
-                Object value = valueMap.get(field.getFieldName());
-                if (value == null) {
-                    for (final String alias : field.getAliases()) {
-                        value = valueMap.get(alias);
-                        if (value != null) {
-                            break;
-                        }
-                    }
+    private Record createRecord(final Map<String, Object> valueMap, final StringBuilder trailingText, final String stackTrace, final boolean coerceTypes, final boolean dropUnknown) {
+        final Map<String, Object> converted = new HashMap<>();
+        for (final Map.Entry<String, Object> entry : valueMap.entrySet()) {
+            final String fieldName = entry.getKey();
+            final Object rawValue = entry.getValue();
+
+            final Object normalizedValue;
+            if (rawValue instanceof List) {
+                final List<?> list = (List<?>) rawValue;
+                final String[] array = new String[list.size()];
+                for (int i = 0; i < list.size(); i++) {
+                    final Object rawObject = list.get(i);
+                    array[i] = rawObject == null ? null : rawObject.toString();
                 }
+                normalizedValue = array;
+            } else {
+                normalizedValue = rawValue == null ? null : rawValue.toString();
+            }
 
-                final String fieldName = field.getFieldName();
-                if (value == null) {
-                    values.put(fieldName, null);
-                    continue;
-                }
+            final Optional<RecordField> optionalRecordField = schema.getField(fieldName);
 
+            final Object coercedValue;
+            if (coerceTypes && optionalRecordField.isPresent()) {
+                final RecordField field = optionalRecordField.get();
                 final DataType fieldType = field.getDataType();
-                final Object converted = convert(fieldType, value.toString(), fieldName);
-                values.put(fieldName, converted);
+                coercedValue = convert(fieldType, normalizedValue, fieldName);
+            } else {
+                coercedValue = normalizedValue;
             }
 
-            if (append && toAppend.length() > 0) {
-                final String lastFieldName = schema.getField(schema.getFieldCount() - 1).getFieldName();
+            converted.put(fieldName, coercedValue);
+        }
 
-                final int fieldIndex = STACK_TRACE_COLUMN_NAME.equals(lastFieldName) ? schema.getFieldCount() - 2 : schema.getFieldCount() - 1;
-                final String lastFieldBeforeStackTrace = schema.getFieldNames().get(fieldIndex);
+        // If there is any trailing text, find the last field in the grok-derived schema that
+        // actually has a value and append the trailing text to that value.
+        if (append && trailingText.length() > 0) {
+            String lastPopulatedFieldName = null;
+            final List<RecordField> schemaFields = schemaFromGrok.getFields();
+            for (int i = schemaFields.size() - 1; i >= 0; i--) {
+                final RecordField field = schemaFields.get(i);
+
+                Object value = converted.get(field.getFieldName());
+                if (value != null) {
+                    lastPopulatedFieldName = field.getFieldName();
+                    break;
+                }
 
-                final Object existingValue = values.get(lastFieldBeforeStackTrace);
-                final String updatedValue = existingValue == null ? toAppend.toString() : existingValue + toAppend.toString();
-                values.put(lastFieldBeforeStackTrace, updatedValue);
+                for (final String alias : field.getAliases()) {
+                    value = converted.get(alias);
+                    if (value != null) {
+                        lastPopulatedFieldName = alias;
+                        break;
+                    }
+                }
             }
 
-            values.put(STACK_TRACE_COLUMN_NAME, stackTrace);
-
-            return new MapRecord(schema, values);
-        } catch (final Exception e) {
-            throw new MalformedRecordException("Found invalid log record and will skip it. Record: " + nextLine, e);
+            if (lastPopulatedFieldName != null) {
+                final Object value = converted.get(lastPopulatedFieldName);
+                if (value == null) {
+                    converted.put(lastPopulatedFieldName, trailingText.toString());
+                } else if (value instanceof String) { // if not a String (e.g., an array or a coerced non-String value) we just drop the trailing text
+                    converted.put(lastPopulatedFieldName, (String) value + trailingText.toString());
+                }
+            }
         }
+
+        converted.put(STACK_TRACE_COLUMN_NAME, stackTrace);
+        return new MapRecord(schema, converted);
     }
 
 
@@ -200,22 +232,23 @@ public class GrokRecordReader implements RecordReader {
     }
 
 
-    protected Object convert(final DataType fieldType, final String string, final String fieldName) {
+    protected Object convert(final DataType fieldType, final Object rawValue, final String fieldName) {
         if (fieldType == null) {
-            return string;
+            return rawValue;
         }
 
-        if (string == null) {
+        if (rawValue == null) {
             return null;
         }
 
         // If string is empty then return an empty string if field type is STRING. If field type is
         // anything else, we can't really convert it so return null
-        if (string.isEmpty() && fieldType.getFieldType() != RecordFieldType.STRING) {
+        final boolean fieldEmpty = rawValue instanceof String && ((String) rawValue).isEmpty();
+        if (fieldEmpty && fieldType.getFieldType() != RecordFieldType.STRING) {
             return null;
         }
 
-        return DataTypeUtils.convertType(string, fieldType, fieldName);
+        return DataTypeUtils.convertType(rawValue, fieldType, fieldName);
     }
 
 

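createRecord above first normalizes each grok capture before any coercion: repeated captures arrive as a List and become a String[], and everything else becomes its toString(). Below is a rough, dependency-free sketch of that normalization step (not the NiFi code; the field names and values are invented):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GrokNormalizationSketch {

    // Normalize a raw grok capture before type coercion:
    // a List (repeated capture) becomes a String[], anything else becomes its toString(), null stays null.
    static Object normalize(final Object rawValue) {
        if (rawValue instanceof List) {
            final List<?> list = (List<?>) rawValue;
            final String[] array = new String[list.size()];
            for (int i = 0; i < list.size(); i++) {
                final Object element = list.get(i);
                array[i] = element == null ? null : element.toString();
            }
            return array;
        }
        return rawValue == null ? null : rawValue.toString();
    }

    public static void main(final String[] args) {
        final Map<String, Object> captures = new HashMap<>();
        captures.put("timestamp", "2017-08-12 05:02:11");            // single capture
        captures.put("message", Arrays.asList("line 1", "line 2"));  // repeated capture

        for (final Map.Entry<String, Object> entry : captures.entrySet()) {
            final Object normalized = normalize(entry.getValue());
            final String rendered = normalized instanceof Object[]
                ? Arrays.toString((Object[]) normalized)
                : String.valueOf(normalized);
            System.out.println(entry.getKey() + " -> " + rendered);
        }
    }
}

Only after normalization does the reader coerce a value, and only when coerceTypes is true and the resolved schema actually declares the field; trailing unmatched lines are appended only when the last populated grok field holds a String.
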
http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
index 1f61f17..663b837 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
@@ -19,19 +19,30 @@ package org.apache.nifi.json;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
 import java.util.Optional;
 
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.JsonParseException;
 import org.codehaus.jackson.JsonParser;
 import org.codehaus.jackson.JsonToken;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
 
 
 public abstract class AbstractJsonRowRecordReader implements RecordReader {
@@ -70,8 +81,9 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
         }
     }
 
+
     @Override
-    public Record nextRecord() throws IOException, MalformedRecordException {
+    public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
         if (firstObjectConsumed && !array) {
             return null;
         }
@@ -79,7 +91,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
         final JsonNode nextNode = getNextJsonNode();
         final RecordSchema schema = getSchema();
         try {
-            return convertJsonNodeToRecord(nextNode, schema);
+            return convertJsonNodeToRecord(nextNode, schema, coerceTypes, dropUnknownFields);
         } catch (final MalformedRecordException mre) {
             throw mre;
         } catch (final IOException ioe) {
@@ -91,7 +103,11 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
     }
 
     protected Object getRawNodeValue(final JsonNode fieldNode) throws IOException {
-        if (fieldNode == null || !fieldNode.isValueNode()) {
+        return getRawNodeValue(fieldNode, null);
+    }
+
+    protected Object getRawNodeValue(final JsonNode fieldNode, final DataType dataType) throws IOException {
+        if (fieldNode == null || fieldNode.isNull()) {
             return null;
         }
 
@@ -111,6 +127,53 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
             return fieldNode.getTextValue();
         }
 
+        if (fieldNode.isArray()) {
+            final ArrayNode arrayNode = (ArrayNode) fieldNode;
+            final int numElements = arrayNode.size();
+            final Object[] arrayElements = new Object[numElements];
+            int count = 0;
+
+            final DataType elementDataType;
+            if (dataType != null && dataType.getFieldType() == RecordFieldType.ARRAY) {
+                final ArrayDataType arrayDataType = (ArrayDataType) dataType;
+                elementDataType = arrayDataType.getElementType();
+            } else {
+                elementDataType = null;
+            }
+
+            for (final JsonNode node : arrayNode) {
+                final Object value = getRawNodeValue(node, elementDataType);
+                arrayElements[count++] = value;
+            }
+
+            return arrayElements;
+        }
+
+        if (fieldNode.isObject()) {
+            RecordSchema childSchema;
+            if (dataType != null && RecordFieldType.RECORD == dataType.getFieldType()) {
+                final RecordDataType recordDataType = (RecordDataType) dataType;
+                childSchema = recordDataType.getChildSchema();
+            } else {
+                childSchema = null;
+            }
+
+            if (childSchema == null) {
+                childSchema = new SimpleRecordSchema(Collections.emptyList());
+            }
+
+            final Iterator<String> fieldNames = fieldNode.getFieldNames();
+            final Map<String, Object> childValues = new HashMap<>();
+            while (fieldNames.hasNext()) {
+                final String childFieldName = fieldNames.next();
+                final Object childValue = getRawNodeValue(fieldNode.get(childFieldName), dataType);
+                childValues.put(childFieldName, childValue);
+            }
+
+            final MapRecord record = new MapRecord(childSchema, childValues);
+            return record;
+        }
+
         return null;
     }
 
@@ -159,5 +222,5 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
         return Optional.ofNullable(firstJsonNode);
     }
 
-    protected abstract Record convertJsonNodeToRecord(final JsonNode nextNode, final RecordSchema schema) throws IOException, MalformedRecordException;
+    protected abstract Record convertJsonNodeToRecord(JsonNode nextNode, RecordSchema schema, boolean coerceTypes, boolean dropUnknownFields) throws IOException, MalformedRecordException;
 }

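getRawNodeValue above now descends into arrays and objects, threading an optional DataType hint so that nested values can pick up a declared element type or child schema. Below is a simplified sketch of the same recursive descent over plain Java collections (NiFi does this over Jackson JsonNodes and wraps nested objects in MapRecord; the structure here is hypothetical and the type-hint threading is omitted):

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RawValueSketch {

    // Recursively convert a parsed JSON-like structure into "raw" values:
    // Lists become Object[], Maps become nested Maps (NiFi wraps these in a MapRecord
    // whose schema is the declared child schema, or an empty schema if none is known),
    // and scalars pass through untouched.
    @SuppressWarnings("unchecked")
    static Object toRawValue(final Object node) {
        if (node == null) {
            return null;
        }
        if (node instanceof List) {
            final List<?> list = (List<?>) node;
            final Object[] elements = new Object[list.size()];
            for (int i = 0; i < list.size(); i++) {
                elements[i] = toRawValue(list.get(i));
            }
            return elements;
        }
        if (node instanceof Map) {
            final Map<String, Object> childValues = new HashMap<>();
            for (final Map.Entry<String, ?> entry : ((Map<String, ?>) node).entrySet()) {
                childValues.put(entry.getKey(), toRawValue(entry.getValue()));
            }
            return childValues;
        }
        return node; // scalar: String, Number, Boolean
    }

    public static void main(final String[] args) {
        final Map<String, Object> address = new HashMap<>();
        address.put("city", "Springfield");

        final List<Object> tags = new ArrayList<>();
        tags.add("a");
        tags.add("b");

        final Map<String, Object> json = new HashMap<>();
        json.put("id", 1);
        json.put("tags", tags);
        json.put("address", address);

        final Object raw = toRawValue(json);
        System.out.println(raw instanceof Map ? ((Map<?, ?>) raw).keySet() : raw);
    }
}

The reader itself threads the array's element type down when descending into arrays, and wraps nested objects in a MapRecord built from the declared child schema, falling back to an empty SimpleRecordSchema when no child schema is available.
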
http://git-wip-us.apache.org/repos/asf/nifi/blob/451f9cf1/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
index 2110bbb..90ab799 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
@@ -20,6 +20,7 @@ package org.apache.nifi.json;
 import java.io.IOException;
 import java.io.InputStream;
 import java.text.DateFormat;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -29,6 +30,7 @@ import java.util.function.Supplier;
 
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.SimpleRecordSchema;
 import org.apache.nifi.serialization.record.DataType;
 import org.apache.nifi.serialization.record.MapRecord;
 import org.apache.nifi.serialization.record.Record;
@@ -89,7 +91,7 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
     }
 
     @Override
-    protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema) throws IOException {
+    protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final boolean coerceTypes, final boolean dropUnknownFields) throws IOException {
         if (jsonNode == null) {
             return null;
         }
@@ -100,7 +102,8 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
         for (final Map.Entry<String, JsonPath> entry : jsonPaths.entrySet()) {
             final String fieldName = entry.getKey();
             final DataType desiredType = schema.getDataType(fieldName).orElse(null);
-            if (desiredType == null) {
+
+            if (desiredType == null && dropUnknownFields) {
                 continue;
             }
 
@@ -117,7 +120,13 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
             final Optional<RecordField> field = schema.getField(fieldName);
             final Object defaultValue = field.isPresent() ? field.get().getDefaultValue() : null;
 
-            value = convert(value, desiredType, fieldName, defaultValue);
+            if (coerceTypes && desiredType != null) {
+                value = convert(value, desiredType, fieldName, defaultValue);
+            } else {
+                final DataType dataType = field.isPresent() ? field.get().getDataType() : null;
+                value = convert(value, dataType);
+            }
+
             values.put(fieldName, value);
         }
 
@@ -126,6 +135,70 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
 
 
     @SuppressWarnings("unchecked")
+    protected Object convert(final Object value, final DataType dataType) {
+        if (value == null) {
+            return null;
+        }
+
+        if (value instanceof List) {
+            final List<?> list = (List<?>) value;
+            final Object[] array = new Object[list.size()];
+
+            final DataType elementDataType;
+            if (dataType != null && dataType.getFieldType() == RecordFieldType.ARRAY) {
+                elementDataType = ((ArrayDataType) dataType).getElementType();
+            } else {
+                elementDataType = null;
+            }
+
+            int i = 0;
+            for (final Object val : list) {
+                array[i++] = convert(val, elementDataType);
+            }
+
+            return array;
+        }
+
+        if (value instanceof Map) {
+            final Map<String, ?> map = (Map<String, ?>) value;
+
+            boolean record = false;
+            for (final Object obj : map.values()) {
+                if (obj instanceof JsonNode) {
+                    record = true;
+                }
+            }
+
+            if (!record) {
+                return value;
+            }
+
+            RecordSchema childSchema = null;
+            if (dataType != null && dataType.getFieldType() == RecordFieldType.RECORD) {
+                childSchema = ((RecordDataType) dataType).getChildSchema();
+            }
+            if (childSchema == null) {
+                childSchema = new SimpleRecordSchema(Collections.emptyList());
+            }
+
+            final Map<String, Object> values = new HashMap<>();
+            for (final Map.Entry<String, ?> entry : map.entrySet()) {
+                final String key = entry.getKey();
+                final Object childValue = entry.getValue();
+
+                final RecordField recordField = childSchema.getField(key).orElse(null);
+                final DataType childDataType = recordField == null ? null : recordField.getDataType();
+
+                values.put(key, convert(childValue, childDataType));
+            }
+
+            return new MapRecord(childSchema, values);
+        }
+
+        return value;
+    }
+
+    @SuppressWarnings("unchecked")
     protected Object convert(final Object value, final DataType dataType, final String fieldName, final Object defaultValue) {
         if (value == null) {
             return defaultValue;