You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2017/05/03 17:14:01 UTC

nifi git commit: NIFI-3787: Addressed NPE and ensured that if validation fails due to a RuntimeException, the failure gets logged. Also clarified documentation for Json Reader services

Repository: nifi
Updated Branches:
  refs/heads/master b7c15c360 -> 9b177fbcb


NIFI-3787: Addressed NPE and ensured that if validation fails due to a RuntimeException, the failure gets logged. Also clarified documentation for Json Reader services

This closes #1742.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/9b177fbc
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/9b177fbc
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/9b177fbc

Branch: refs/heads/master
Commit: 9b177fbcbabe67b9ebb1d0a5d3fafe7b67aebda1
Parents: b7c15c3
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed May 3 12:33:30 2017 -0400
Committer: Bryan Bende <bb...@apache.org>
Committed: Wed May 3 13:13:43 2017 -0400

----------------------------------------------------------------------
 .../nifi/serialization/record/MapRecord.java    |  2 +-
 .../record/util/DataTypeUtils.java              | 23 +++++++++
 .../nifi/serialization/DateTimeUtils.java       | 13 +++--
 .../controller/AbstractConfiguredComponent.java |  4 ++
 .../java/org/apache/nifi/avro/AvroReader.java   | 22 +++++++++
 .../apache/nifi/avro/AvroRecordSetWriter.java   |  2 -
 .../avro/EmbeddedAvroSchemaAccessStrategy.java  | 51 ++++++++++++++++++++
 .../nifi/csv/CSVHeaderSchemaStrategy.java       |  9 ++++
 .../java/org/apache/nifi/csv/CSVReader.java     | 12 ++++-
 .../org/apache/nifi/csv/CSVRecordSetWriter.java |  3 +-
 .../java/org/apache/nifi/grok/GrokReader.java   | 37 +++++++++-----
 .../nifi/json/JsonPathRowRecordReader.java      |  6 +--
 .../nifi/json/JsonTreeRowRecordReader.java      |  6 +--
 .../additionalDetails.html                      |  3 +-
 .../additionalDetails.html                      |  6 ++-
 15 files changed, 169 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
index f9a22fc..8d98c33 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/MapRecord.java
@@ -192,7 +192,7 @@ public class MapRecord implements Record {
 
     @Override
     public Date getAsDate(final String fieldName, final String format) {
-        return DataTypeUtils.toDate(getValue(fieldName), DataTypeUtils.getDateFormat(format), fieldName);
+        return DataTypeUtils.toDate(getValue(fieldName), format == null ? null : DataTypeUtils.getDateFormat(format), fieldName); // getDateFormat is skipped when format is null; toDate then receives a null DateFormat
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
----------------------------------------------------------------------
diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index f59ac0d..9f1e463 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -32,6 +32,7 @@ import java.sql.Timestamp;
 import java.text.DateFormat;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -343,6 +344,10 @@ public class DataTypeUtils {
             return getDateFormat(format).format((java.util.Date) value);
         }
 
+        if (value instanceof Object[]) {
+            return Arrays.toString((Object[]) value);
+        }
+
         return value.toString();
     }
 
@@ -396,6 +401,10 @@ public class DataTypeUtils {
         }
 
         if (value instanceof String) {
+            if (format == null) {
+                return isInteger((String) value);
+            }
+
             try {
                 getDateFormat(format).parse((String) value);
                 return true;
@@ -407,6 +416,20 @@ public class DataTypeUtils {
         return false;
     }
 
+    private static boolean isInteger(final String value) { // true only for a non-empty string in which every char passes Character.isDigit
+        if (value == null || value.isEmpty()) { // null and "" are reported as not-an-integer rather than throwing
+            return false;
+        }
+
+        for (int i = 0; i < value.length(); i++) {
+            if (!Character.isDigit(value.charAt(i))) { // a sign, decimal point or whitespace ends up here, so "-1" and " 1" return false
+                return false;
+            }
+        }
+
+        return true; // no length or range check: an arbitrarily long run of digits is still accepted
+    }
+
     public static Time toTime(final Object value, final DateFormat format, final String fieldName) {
         if (value == null) {
             return null;

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java
index efc3e4e..6336943 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-standard-record-utils/src/main/java/org/apache/nifi/serialization/DateTimeUtils.java
@@ -23,7 +23,9 @@ public class DateTimeUtils {
     public static final PropertyDescriptor DATE_FORMAT = new PropertyDescriptor.Builder()
         .name("Date Format")
         .description("Specifies the format to use when reading/writing Date fields. "
-            + "If not specified, Date fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT).")
+            + "If not specified, Date fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT). "
+            + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy for a two-digit month, followed by "
+            + "a two-digit day, followed by a four-digit year, all separated by '/' characters, as in 01/01/2017).")
         .expressionLanguageSupported(false)
         .addValidator(new SimpleDateFormatValidator())
         .required(false)
@@ -32,7 +34,9 @@ public class DateTimeUtils {
     public static final PropertyDescriptor TIME_FORMAT = new PropertyDescriptor.Builder()
         .name("Time Format")
         .description("Specifies the format to use when reading/writing Time fields. "
-            + "If not specified, Time fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT).")
+            + "If not specified, Time fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT). "
+            + "If specified, the value must match the Java Simple Date Format (for example, HH:mm:ss for a two-digit hour in 24-hour format, followed by "
+            + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 18:04:15).")
         .expressionLanguageSupported(false)
         .addValidator(new SimpleDateFormatValidator())
         .required(false)
@@ -41,7 +45,10 @@ public class DateTimeUtils {
     public static final PropertyDescriptor TIMESTAMP_FORMAT = new PropertyDescriptor.Builder()
         .name("Timestamp Format")
         .description("Specifies the format to use when reading/writing Timestamp fields. "
-            + "If not specified, Timestamp fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT).")
+            + "If not specified, Timestamp fields will be assumed to be number of milliseconds since epoch (Midnight, Jan 1, 1970 GMT). "
+            + "If specified, the value must match the Java Simple Date Format (for example, MM/dd/yyyy HH:mm:ss for a two-digit month, followed by "
+            + "a two-digit day, followed by a four-digit year, all separated by '/' characters; and then followed by a two-digit hour in 24-hour format, followed by "
+            + "a two-digit minute, followed by a two-digit second, all separated by ':' characters, as in 01/01/2017 18:04:15).")
         .expressionLanguageSupported(false)
         .addValidator(new SimpleDateFormatValidator())
         .required(false)

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
index 1156987..50a71e7 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core-api/src/main/java/org/apache/nifi/controller/AbstractConfiguredComponent.java
@@ -30,6 +30,8 @@ import org.apache.nifi.nar.ExtensionManager;
 import org.apache.nifi.nar.NarCloseable;
 import org.apache.nifi.registry.VariableRegistry;
 import org.apache.nifi.util.file.classloader.ClassLoaderUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.net.URL;
 import java.util.ArrayList;
@@ -49,6 +51,7 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 public abstract class AbstractConfiguredComponent implements ConfigurableComponent, ConfiguredComponent {
+    private static final Logger logger = LoggerFactory.getLogger(AbstractConfiguredComponent.class);
 
     private final String id;
     private final ValidationContextFactory validationContextFactory;
@@ -463,6 +466,7 @@ public abstract class AbstractConfiguredComponent implements ConfigurableCompone
                 }
             }
         } catch (final Throwable t) {
+            logger.error("Failed to perform validation of " + this, t);
             results.add(new ValidationResult.Builder().explanation("Failed to run validation due to " + t.toString()).valid(false).build());
         } finally {
             lock.unlock();

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
index 205fec6..8451f5a 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
@@ -29,10 +29,14 @@ import org.apache.avro.Schema;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schemaregistry.services.SchemaRegistry;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
@@ -63,6 +67,24 @@ public class AvroReader extends SchemaRegistryService implements RecordReaderFac
     }
 
     @Override
+    protected SchemaAccessStrategy getSchemaAccessStrategy(String allowableValue, SchemaRegistry schemaRegistry, ConfigurationContext context) {
+        if (EMBEDDED_AVRO_SCHEMA.getValue().equals(allowableValue)) { // equals() is called on the constant's value, so a null allowableValue takes the else branch
+            return new EmbeddedAvroSchemaAccessStrategy(); // neither schemaRegistry nor context is passed to this strategy
+        } else {
+            return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); // every other value is resolved by the superclass
+        }
+    }
+
+    @Override
+    protected SchemaAccessStrategy getSchemaAccessStrategy(String allowableValue, SchemaRegistry schemaRegistry, ValidationContext context) { // overload taking a ValidationContext instead of a ConfigurationContext
+        if (EMBEDDED_AVRO_SCHEMA.getValue().equals(allowableValue)) { // equals() is called on the constant's value, so a null allowableValue takes the else branch
+            return new EmbeddedAvroSchemaAccessStrategy(); // neither schemaRegistry nor context is passed to this strategy
+        } else {
+            return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); // every other value is resolved by the superclass
+        }
+    }
+
+    @Override
     public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException, SchemaNotFoundException {
         final String schemaAccessStrategy = getConfigurationContext().getProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY).getValue();
         if (EMBEDDED_AVRO_SCHEMA.getValue().equals(schemaAccessStrategy)) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
index 91ca6cf..121d1ec 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
@@ -65,8 +65,6 @@ public class AvroRecordSetWriter extends SchemaRegistryRecordSetWriter implement
         try {
             final RecordSchema recordSchema = getSchema(flowFile, in);
 
-
-
             final Schema avroSchema;
             try {
                 if (recordSchema.getSchemaFormat().isPresent() & recordSchema.getSchemaFormat().get().equals(AvroTypeUtil.AVRO_SCHEMA_FORMAT)) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java
new file mode 100644
index 0000000..eba9429
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/EmbeddedAvroSchemaAccessStrategy.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.EnumSet;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.schema.access.SchemaAccessStrategy;
+import org.apache.nifi.schema.access.SchemaField;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class EmbeddedAvroSchemaAccessStrategy implements SchemaAccessStrategy { // supplies the schema that DataFileStream reports for the content stream
+    private final Set<SchemaField> schemaFields = EnumSet.of(SchemaField.SCHEMA_TEXT, SchemaField.SCHEMA_TEXT_FORMAT); // this same set instance is handed out by getSuppliedSchemaFields()
+
+    @Override
+    public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException, IOException { // flowFile is not read; only contentStream is used
+        final DataFileStream<GenericRecord> dataFileStream = new DataFileStream<>(contentStream, new GenericDatumReader<GenericRecord>()); // NOTE(review): never closed here; presumably so contentStream stays usable by the caller - confirm
+        final Schema avroSchema = dataFileStream.getSchema();
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema); // converts the Avro Schema into the RecordSchema that is returned
+        return recordSchema;
+    }
+
+    @Override
+    public Set<SchemaField> getSuppliedSchemaFields() {
+        return schemaFields;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
index a57f10b..f2b1cbb 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVHeaderSchemaStrategy.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import org.apache.commons.csv.CSVFormat;
 import org.apache.commons.csv.CSVParser;
 import org.apache.commons.io.input.BOMInputStream;
+import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.schema.access.SchemaAccessStrategy;
@@ -47,8 +48,16 @@ public class CSVHeaderSchemaStrategy implements SchemaAccessStrategy {
         this.context = context;
     }
 
+    public CSVHeaderSchemaStrategy(final ValidationContext context) { // the ValidationContext argument is accepted but not stored
+        this.context = null; // a null context marks a validation-only instance; getSchema() throws SchemaNotFoundException for it
+    }
+
     @Override
     public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
+        if (this.context == null) {
+            throw new SchemaNotFoundException("Schema Access Strategy intended only for validation purposes and cannot obtain schema");
+        }
+
         try {
             final CSVFormat csvFormat = CSVUtils.createCSVFormat(context).withFirstRecordAsHeader();
             try (final Reader reader = new InputStreamReader(new BOMInputStream(contentStream));

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
index 9fe4136..dbea3dc 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
@@ -29,6 +29,7 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
@@ -52,8 +53,6 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
         "The first non-comment line of the CSV file is a header line that contains the names of the columns. The schema will be derived by using the "
             + "column names in the header and assuming that all columns are of type String.");
 
-    private volatile SchemaAccessStrategy headerDerivedSchemaStrategy;
-
     private volatile CSVFormat csvFormat;
     private volatile String dateFormat;
     private volatile String timeFormat;
@@ -106,6 +105,15 @@ public class CSVReader extends SchemaRegistryService implements RecordReaderFact
     }
 
     @Override
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) {
+        if (allowableValue.equalsIgnoreCase(headerDerivedAllowableValue.getValue())) { // case-insensitive match; allowableValue is dereferenced, so it must be non-null
+            return new CSVHeaderSchemaStrategy(context); // uses the ValidationContext constructor of CSVHeaderSchemaStrategy
+        }
+
+        return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); // every other value is resolved by the superclass
+    }
+
+    @Override
     protected List<AllowableValue> getSchemaAccessStrategyValues() {
         final List<AllowableValue> allowableValues = new ArrayList<>(super.getSchemaAccessStrategyValues());
         allowableValues.add(headerDerivedAllowableValue);

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
index 15f1e1f..0b29f09 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
@@ -38,7 +38,8 @@ import org.apache.nifi.serialization.record.RecordSchema;
 
 @Tags({"csv", "result", "set", "recordset", "record", "writer", "serializer", "row", "tsv", "tab", "separated", "delimited"})
 @CapabilityDescription("Writes the contents of a RecordSet as CSV data. The first line written "
-    + "will be the column names. All subsequent lines will be the values corresponding to those columns.")
+    + "will be the column names (unless the 'Include Header Line' property is false). All subsequent lines will be the values "
+    + "corresponding to the record fields.")
 public class CSVRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory {
 
     private volatile CSVFormat csvFormat;

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
index 6c8deab..a874632 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -33,6 +33,7 @@ import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
@@ -180,24 +181,36 @@ public class GrokReader extends SchemaRegistryService implements RecordReaderFac
     @Override
     protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ConfigurationContext context) {
         if (allowableValue.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) {
-            return new SchemaAccessStrategy() {
-                private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class);
-
-                @Override
-                public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
-                    return recordSchema;
-                }
+            return createAccessStrategy(); // replaces the inline anonymous class; the same helper serves the ValidationContext overload
+        } else {
+            return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context); // every other value is resolved by the superclass
+        }
+    }
 
-                @Override
-                public Set<SchemaField> getSuppliedSchemaFields() {
-                    return schemaFields;
-                }
-            };
+    @Override
+    protected SchemaAccessStrategy getSchemaAccessStrategy(final String allowableValue, final SchemaRegistry schemaRegistry, final ValidationContext context) { // overload taking a ValidationContext instead of a ConfigurationContext
+        if (allowableValue.equalsIgnoreCase(STRING_FIELDS_FROM_GROK_EXPRESSION.getValue())) { // case-insensitive match; allowableValue is dereferenced, so it must be non-null
+            return createAccessStrategy(); // same strategy the ConfigurationContext overload returns
         } else {
             return super.getSchemaAccessStrategy(allowableValue, schemaRegistry, context);
         }
     }
 
+    private SchemaAccessStrategy createAccessStrategy() { // builds the strategy returned by both getSchemaAccessStrategy overloads
+        return new SchemaAccessStrategy() {
+            private final Set<SchemaField> schemaFields = EnumSet.noneOf(SchemaField.class); // empty set: this strategy reports no supplied schema fields
+
+            @Override
+            public RecordSchema getSchema(final FlowFile flowFile, final InputStream contentStream) throws SchemaNotFoundException {
+                return recordSchema; // recordSchema is resolved from the enclosing scope; neither argument is read
+            }
+
+            @Override
+            public Set<SchemaField> getSuppliedSchemaFields() {
+                return schemaFields;
+            }
+        };
+    }
 
     @Override
     public RecordReader createRecordReader(final FlowFile flowFile, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
index 2eebfe2..7d2a2c1 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
@@ -62,9 +62,9 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
         throws MalformedRecordException, IOException {
         super(in, logger);
 
-        this.dateFormat = DataTypeUtils.getDateFormat(dateFormat);
-        this.timeFormat = DataTypeUtils.getDateFormat(timeFormat);
-        this.timestampFormat = DataTypeUtils.getDateFormat(timestampFormat);
+        this.dateFormat = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
+        this.timeFormat = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
+        this.timestampFormat = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
 
         this.schema = schema;
         this.jsonPaths = jsonPaths;

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index f15e04e..ee5bff9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -56,9 +56,9 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
         super(in, logger);
         this.schema = schema;
 
-        this.dateFormat = DataTypeUtils.getDateFormat(dateFormat);
-        this.timeFormat = DataTypeUtils.getDateFormat(timeFormat);
-        this.timestampFormat = DataTypeUtils.getDateFormat(timestampFormat);
+        this.dateFormat = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
+        this.timeFormat = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
+        this.timestampFormat = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
     }
 
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html
index aceb54d..14d40f6 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonPathReader/additionalDetails.html
@@ -81,7 +81,8 @@
 			<li>Any "date/time" type (Date, Time, Timestamp) can be coerced into any other "date/time" type.</li>
 			<li>Any "date/time" type can be coerced into a Long type, representing the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
 			<li>Any "date/time" type can be coerced into a String. The format of the String is whatever DateFormat is configured for the corresponding
-				property (Date Format, Time Format, Timestamp Format property).</li>
+				property (Date Format, Time Format, Timestamp Format property). If no value is specified, then the value will be converted into a String
+				representation of the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
 		</ul>
 		
 		<p>

http://git-wip-us.apache.org/repos/asf/nifi/blob/9b177fbc/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
index 90980d1..c08e720 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
@@ -25,7 +25,8 @@
         	The JsonTreeReader Controller Service reads a JSON Object and creates a Record object for the entire
         	JSON Object tree. The Controller Service must be configured with a Schema that describes the structure
         	of the JSON data. If any field exists in the JSON that is not in the schema, that field will be skipped.
-        	If the schema contains a field for which no JSON field exists, a null value will be used in the Record.
+        	If the schema contains a field for which no JSON field exists, a null value will be used in the Record
+        	(or the default value defined in the schema, if applicable).
         </p>
         
         <p>
@@ -66,7 +67,8 @@
 			<li>Any "date/time" type (Date, Time, Timestamp) can be coerced into any other "date/time" type.</li>
 			<li>Any "date/time" type can be coerced into a Long type, representing the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
 			<li>Any "date/time" type can be coerced into a String. The format of the String is whatever DateFormat is configured for the corresponding
-				property (Date Format, Time Format, Timestamp Format property).</li>
+				property (Date Format, Time Format, Timestamp Format property). If no value is specified, then the value will be converted into a String
+				representation of the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
 		</ul>
 		
 		<p>