You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2019/09/20 03:25:16 UTC

[nifi] branch master updated: NIFI-6369: This closes #3560. Updated JSON Readers to convert String values to Date/Time/Timestamp objects when appropriate according to the schema and the configured pattern NIFI-6939: Fix to WriteJsonRecord to deal with date/time/timestamp when no format is explicitly set

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new e18d9ce  NIFI-6369: This closes #3560. Updated JSON Readers to convert String values to Date/Time/Timestamp objects when appropriate according to the schema and the configured pattern NIFI-6939: Fix to WriteJsonRecord to deal with date/time/timestamp when no format is explicitly set
e18d9ce is described below

commit e18d9ce1e8d576a21630dcd67e9260ff5c3f9bf0
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Mon Jul 1 09:50:14 2019 -0400

    NIFI-6369: This closes #3560. Updated JSON Readers to convert String values to Date/Time/Timestamp objects when appropriate according to the schema and the configured pattern
    NIFI-6939: Fix to WriteJsonRecord to deal with date/time/timestamp when no format is explicitly set
---
 .../serialization/record/util/DataTypeUtils.java   | 24 ++++++--
 .../nifi-standard-processors/pom.xml               |  2 +
 .../processors/standard/TestValidateRecord.java    | 64 ++++++++++++++++++++++
 .../resources/TestValidateRecord/timestamp.avsc    | 11 ++++
 .../resources/TestValidateRecord/timestamp.json    |  1 +
 .../nifi-record-serialization-services/pom.xml     |  1 +
 .../nifi/json/AbstractJsonRowRecordReader.java     | 55 ++++++++++++++++---
 .../apache/nifi/json/JsonPathRowRecordReader.java  | 37 ++++++-------
 .../apache/nifi/json/JsonTreeRowRecordReader.java  | 41 +++++++-------
 .../java/org/apache/nifi/json/WriteJsonResult.java | 37 +++++++++++++
 .../org/apache/nifi/csv/TestCSVRecordReader.java   | 20 ++++---
 .../nifi/json/TestJsonPathRowRecordReader.java     | 51 +++++++++++------
 .../nifi/json/TestJsonTreeRowRecordReader.java     | 16 ++++++
 .../src/test/resources/json/timestamp.json         |  1 +
 14 files changed, 285 insertions(+), 76 deletions(-)

diff --git a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
index 308cafa..b1daa91 100644
--- a/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
+++ b/nifi-commons/nifi-record/src/main/java/org/apache/nifi/serialization/record/util/DataTypeUtils.java
@@ -1116,12 +1116,12 @@ public class DataTypeUtils {
         }
 
         if (value instanceof String) {
-            try {
-                final String string = ((String) value).trim();
-                if (string.isEmpty()) {
-                    return null;
-                }
+            final String string = ((String) value).trim();
+            if (string.isEmpty()) {
+                return null;
+            }
 
+            try {
                 if (format == null) {
                     return new Timestamp(Long.parseLong(string));
                 }
@@ -1130,11 +1130,23 @@ public class DataTypeUtils {
                 if (dateFormat == null) {
                     return new Timestamp(Long.parseLong(string));
                 }
+
                 final java.util.Date utilDate = dateFormat.parse(string);
                 return new Timestamp(utilDate.getTime());
             } catch (final ParseException e) {
+                final DateFormat dateFormat = format.get();
+                final String formatDescription;
+                if (dateFormat == null) {
+                    formatDescription = "Numeric";
+                } else if (dateFormat instanceof SimpleDateFormat) {
+                    formatDescription = ((SimpleDateFormat) dateFormat).toPattern();
+                } else {
+                    formatDescription = dateFormat.toString();
+                }
+
                 throw new IllegalTypeConversionException("Could not convert value [" + value
-                    + "] of type java.lang.String to Timestamp for field " + fieldName + " because the value is not in the expected date format: " + format);
+                    + "] of type java.lang.String to Timestamp for field " + fieldName + " because the value is not in the expected date format: "
+                    + formatDescription);
             }
         }
 
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 5a3cee4..340aa46 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -572,6 +572,8 @@
                         <exclude>src/test/resources/TestConvertRecord/input/person.json</exclude>
                         <exclude>src/test/resources/TestValidateRecord/nested-map-input.json</exclude>
                         <exclude>src/test/resources/TestValidateRecord/nested-map-schema.avsc</exclude>
+                        <exclude>src/test/resources/TestValidateRecord/timestamp.avsc</exclude>
+                        <exclude>src/test/resources/TestValidateRecord/timestamp.json</exclude>
                     </excludes>
                 </configuration>
             </plugin>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
index 932f6d1..bbcc9a9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestValidateRecord.java
@@ -27,6 +27,8 @@ import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.schema.inference.SchemaInferenceUtil;
+import org.apache.nifi.serialization.DateTimeUtils;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.record.MockRecordWriter;
@@ -41,6 +43,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.io.ByteArrayInputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.charset.StandardCharsets;
@@ -409,4 +412,65 @@ public class TestValidateRecord {
         runner.assertTransferCount(ValidateRecord.REL_FAILURE, 0);
     }
 
+
+    @Test
+    public void testValidateJsonTimestamp() throws IOException, InitializationException {
+        final String validateSchema = new String(Files.readAllBytes(Paths.get("src/test/resources/TestValidateRecord/timestamp.avsc")), StandardCharsets.UTF_8);
+
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("reader", jsonReader);
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, "schema-text-property");
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, validateSchema);
+        runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss");
+        runner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter validWriter = new JsonRecordSetWriter();
+        runner.addControllerService("writer", validWriter);
+        runner.setProperty(validWriter, "Schema Write Strategy", "full-schema-attribute");
+        runner.setProperty(validWriter, DateTimeUtils.TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss");
+        runner.enableControllerService(validWriter);
+
+        runner.setProperty(ValidateRecord.RECORD_READER, "reader");
+        runner.setProperty(ValidateRecord.RECORD_WRITER, "writer");
+        runner.setProperty(ValidateRecord.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(ValidateRecord.SCHEMA_TEXT, validateSchema);
+        runner.setProperty(ValidateRecord.INVALID_RECORD_WRITER, "writer");
+        runner.setProperty(ValidateRecord.ALLOW_EXTRA_FIELDS, "false");
+
+        runner.setProperty(ValidateRecord.STRICT_TYPE_CHECKING, "true");
+        runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/timestamp.json"));
+        runner.run();
+
+        runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
+        final MockFlowFile validFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
+        validFlowFile.assertContentEquals(new File("src/test/resources/TestValidateRecord/timestamp.json"));
+
+        // Test with a timestamp that has an invalid format.
+        runner.clearTransferState();
+
+        runner.disableControllerService(jsonReader);
+        runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss");
+        runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/timestamp.json"));
+        runner.enableControllerService(jsonReader);
+
+        runner.run();
+
+        runner.assertTransferCount(ValidateRecord.REL_INVALID, 1);
+        final MockFlowFile invalidFlowFile = runner.getFlowFilesForRelationship(ValidateRecord.REL_INVALID).get(0);
+        invalidFlowFile.assertContentEquals(new File("src/test/resources/TestValidateRecord/timestamp.json"));
+
+        // Test with an Inferred Schema.
+        runner.disableControllerService(jsonReader);
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA.getValue());
+        runner.setProperty(jsonReader, DateTimeUtils.TIMESTAMP_FORMAT, "yyyy/MM/dd HH:mm:ss");
+        runner.enableControllerService(jsonReader);
+
+        runner.clearTransferState();
+        runner.enqueue(Paths.get("src/test/resources/TestValidateRecord/timestamp.json"));
+        runner.run();
+
+        runner.assertTransferCount(ValidateRecord.REL_VALID, 1);
+        final MockFlowFile validFlowFileInferredSchema = runner.getFlowFilesForRelationship(ValidateRecord.REL_VALID).get(0);
+        validFlowFileInferredSchema.assertContentEquals(new File("src/test/resources/TestValidateRecord/timestamp.json"));
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/timestamp.avsc b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/timestamp.avsc
new file mode 100644
index 0000000..3c480ac
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/timestamp.avsc
@@ -0,0 +1,11 @@
+{
+  "name": "ts",
+  "namespace": "nifi",
+  "type": "record",
+  "fields": [{
+      "name": "timestamp",
+      "type": {
+          "type": "long", "logicalType": "timestamp-millis"
+       }
+   }]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/timestamp.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/timestamp.json
new file mode 100644
index 0000000..b6a68d3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestValidateRecord/timestamp.json
@@ -0,0 +1 @@
+[{"timestamp":"2019/06/27 12:45:28"}]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index e32ec28..6b984a3 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -154,6 +154,7 @@
                         <exclude>src/test/resources/json/bank-account-multiline.json</exclude>
                         <exclude>src/test/resources/json/bank-account-oneline.json</exclude>
                         <exclude>src/test/resources/json/data-types.json</exclude>
+                        <exclude>src/test/resources/json/timestamp.json</exclude>
                         <exclude>src/test/resources/json/json-with-unicode.json</exclude>
                         <exclude>src/test/resources/json/primitive-type-array.json</exclude>
                         <exclude>src/test/resources/json/single-bank-account.json</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
index 9874a02..69b7fab 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
@@ -40,15 +40,20 @@ import org.codehaus.jackson.node.ArrayNode;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.text.DateFormat;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.function.Supplier;
 
 public abstract class AbstractJsonRowRecordReader implements RecordReader {
     private final ComponentLog logger;
     private final JsonParser jsonParser;
     private final JsonNode firstJsonNode;
+    private final Supplier<DateFormat> LAZY_DATE_FORMAT;
+    private final Supplier<DateFormat> LAZY_TIME_FORMAT;
+    private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
 
     private boolean firstObjectConsumed = false;
 
@@ -61,6 +66,14 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
 
         this.logger = logger;
 
+        final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
+        final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
+        final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
+
+        LAZY_DATE_FORMAT = () -> df;
+        LAZY_TIME_FORMAT = () -> tf;
+        LAZY_TIMESTAMP_FORMAT = () -> tsf;
+
         try {
             jsonParser = jsonFactory.createJsonParser(in);
             jsonParser.setCodec(codec);
@@ -80,6 +93,18 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
         }
     }
 
+    protected Supplier<DateFormat> getLazyDateFormat() {
+        return LAZY_DATE_FORMAT;
+    }
+
+    protected Supplier<DateFormat> getLazyTimeFormat() {
+        return LAZY_TIME_FORMAT;
+    }
+
+    protected Supplier<DateFormat> getLazyTimestampFormat() {
+        return LAZY_TIMESTAMP_FORMAT;
+    }
+
 
     @Override
     public Record nextRecord(final boolean coerceTypes, final boolean dropUnknownFields) throws IOException, MalformedRecordException {
@@ -99,11 +124,11 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
         }
     }
 
-    protected Object getRawNodeValue(final JsonNode fieldNode) throws IOException {
-        return getRawNodeValue(fieldNode, null);
+    protected Object getRawNodeValue(final JsonNode fieldNode, final String fieldName) throws IOException {
+        return getRawNodeValue(fieldNode, null, fieldName);
     }
 
-    protected Object getRawNodeValue(final JsonNode fieldNode, final DataType dataType) throws IOException {
+    protected Object getRawNodeValue(final JsonNode fieldNode, final DataType dataType, final String fieldName) throws IOException {
         if (fieldNode == null || fieldNode.isNull()) {
             return null;
         }
@@ -121,7 +146,23 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
         }
 
         if (fieldNode.isTextual()) {
-            return fieldNode.getTextValue();
+            final String textValue = fieldNode.getTextValue();
+            if (dataType == null) {
+                return textValue;
+            }
+
+            switch (dataType.getFieldType()) {
+                case DATE:
+                case TIME:
+                case TIMESTAMP:
+                    try {
+                        return DataTypeUtils.convertType(textValue, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
+                    } catch (final Exception e) {
+                        return textValue;
+                    }
+            }
+
+            return textValue;
         }
 
         if (fieldNode.isArray()) {
@@ -139,7 +180,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
             }
 
             for (final JsonNode node : arrayNode) {
-                final Object value = getRawNodeValue(node, elementDataType);
+                final Object value = getRawNodeValue(node, elementDataType, fieldName);
                 arrayElements[count++] = value;
             }
 
@@ -166,7 +207,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
                     while (fieldNames.hasNext()) {
                         final String childFieldName = fieldNames.next();
 
-                        final Object childValue = getRawNodeValue(fieldNode.get(childFieldName), possibleSchema.getDataType(childFieldName).orElse(null));
+                        final Object childValue = getRawNodeValue(fieldNode.get(childFieldName), possibleSchema.getDataType(childFieldName).orElse(null), childFieldName);
                         childValues.put(childFieldName, childValue);
                     }
 
@@ -187,7 +228,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
                 final String childFieldName = fieldNames.next();
 
                 final DataType childDataType = childSchema.getDataType(childFieldName).orElse(null);
-                final Object childValue = getRawNodeValue(fieldNode.get(childFieldName), childDataType);
+                final Object childValue = getRawNodeValue(fieldNode.get(childFieldName), childDataType, childFieldName);
                 childValues.put(childFieldName, childValue);
             }
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
index a664b88..c977cfd 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
@@ -39,14 +39,12 @@ import org.codehaus.jackson.JsonNode;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.text.DateFormat;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.function.Supplier;
 
 public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
     private static final Configuration STRICT_PROVIDER_CONFIGURATION = Configuration.builder().jsonProvider(new JacksonJsonProvider()).build();
@@ -56,23 +54,11 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
     private final InputStream in;
     private RecordSchema schema;
 
-    private final Supplier<DateFormat> LAZY_DATE_FORMAT;
-    private final Supplier<DateFormat> LAZY_TIME_FORMAT;
-    private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
-
     public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger,
                 final String dateFormat, final String timeFormat, final String timestampFormat)
                 throws MalformedRecordException, IOException {
         super(in, logger, dateFormat, timeFormat, timestampFormat);
 
-        final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
-        final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
-        final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
-
-        LAZY_DATE_FORMAT = () -> df;
-        LAZY_TIME_FORMAT = () -> tf;
-        LAZY_TIMESTAMP_FORMAT = () -> tsf;
-
         this.schema = schema;
         this.jsonPaths = jsonPaths;
         this.in = in;
@@ -123,7 +109,7 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
                 value = convert(value, desiredType, fieldName, defaultValue);
             } else {
                 final DataType dataType = field.map(RecordField::getDataType).orElse(null);
-                value = convert(value, dataType);
+                value = convert(value, dataType, fieldName);
             }
 
             values.put(fieldName, value);
@@ -134,7 +120,7 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
 
 
     @SuppressWarnings("unchecked")
-    protected Object convert(final Object value, final DataType dataType) {
+    protected Object convert(final Object value, final DataType dataType, final String fieldName) {
         if (value == null) {
             return null;
         }
@@ -152,7 +138,7 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
 
             int i = 0;
             for (final Object val : list) {
-                array[i++] = convert(val, elementDataType);
+                array[i++] = convert(val, elementDataType, fieldName);
             }
 
             return array;
@@ -188,12 +174,25 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
                 final RecordField recordField = childSchema.getField(key).orElse(null);
                 final DataType childDataType = recordField == null ? null : recordField.getDataType();
 
-                values.put(key, convert(childValue, childDataType));
+                values.put(key, convert(childValue, childDataType, fieldName));
             }
 
             return new MapRecord(childSchema, values);
         }
 
+        if (value instanceof String) {
+            switch (dataType.getFieldType()) {
+                case DATE:
+                case TIME:
+                case TIMESTAMP:
+                    try {
+                        return DataTypeUtils.convertType(value, dataType, getLazyDateFormat(), getLazyTimeFormat(), getLazyTimestampFormat(), fieldName);
+                    } catch (final Exception e) {
+                        return value;
+                    }
+            }
+        }
+
         return value;
     }
 
@@ -240,7 +239,7 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
 
             return new MapRecord(childSchema, coercedValues);
         } else {
-            return DataTypeUtils.convertType(value, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
+            return DataTypeUtils.convertType(value, dataType, getLazyDateFormat(), getLazyTimeFormat(), getLazyTimestampFormat(), fieldName);
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index 058d9ee..d0172e9 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -36,7 +36,6 @@ import org.codehaus.jackson.node.ArrayNode;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.text.DateFormat;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -47,23 +46,11 @@ import java.util.function.Supplier;
 public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
     private final RecordSchema schema;
 
-    private final Supplier<DateFormat> LAZY_DATE_FORMAT;
-    private final Supplier<DateFormat> LAZY_TIME_FORMAT;
-    private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
-
 
     public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
         final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException, MalformedRecordException {
         super(in, logger, dateFormat, timeFormat, timestampFormat);
         this.schema = schema;
-
-        final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
-        final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
-        final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
-
-        LAZY_DATE_FORMAT = () -> df;
-        LAZY_TIME_FORMAT = () -> tf;
-        LAZY_TIMESTAMP_FORMAT = () -> tsf;
     }
 
 
@@ -97,6 +84,22 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
         return null;
     }
 
+    private boolean isDateTimeTimestampType(final RecordField field) {
+        if (field == null) {
+            return false;
+        }
+
+        final RecordFieldType fieldType = field.getDataType().getFieldType();
+        switch (fieldType) {
+            case DATE:
+            case TIME:
+            case TIMESTAMP:
+                return true;
+            default:
+                return false;
+        }
+    }
+
     private Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final String fieldNamePrefix,
             final boolean coerceTypes, final boolean dropUnknown) throws IOException, MalformedRecordException {
 
@@ -111,13 +114,13 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
 
                 final String fieldName = recordField.getFieldName();
 
-                final Object value;
+                Object value;
                 if (coerceTypes) {
                     final DataType desiredType = recordField.getDataType();
                     final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
                     value = convertField(childNode, fullFieldName, desiredType, dropUnknown);
                 } else {
-                    value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType());
+                    value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType(), fieldName);
                 }
 
                 values.put(fieldName, value);
@@ -136,7 +139,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
                     final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
                     value = convertField(childNode, fullFieldName, desiredType, dropUnknown);
                 } else {
-                    value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType());
+                    value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType(), fieldName);
                 }
 
                 values.put(fieldName, value);
@@ -166,8 +169,8 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
             case DATE:
             case TIME:
             case TIMESTAMP: {
-                final Object rawValue = getRawNodeValue(fieldNode);
-                final Object converted = DataTypeUtils.convertType(rawValue, desiredType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
+                final Object rawValue = getRawNodeValue(fieldNode, fieldName);
+                final Object converted = DataTypeUtils.convertType(rawValue, desiredType, getLazyDateFormat(), getLazyTimeFormat(), getLazyTimestampFormat(), fieldName);
                 return converted;
             }
             case MAP: {
@@ -222,7 +225,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
                 }
             }
             case CHOICE: {
-                return DataTypeUtils.convertType(getRawNodeValue(fieldNode, desiredType), desiredType, fieldName);
+                return DataTypeUtils.convertType(getRawNodeValue(fieldNode, desiredType, fieldName), desiredType, fieldName);
             }
         }
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index 2b65eea..dd7eaec 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -37,6 +37,7 @@ import org.apache.nifi.serialization.record.type.RecordDataType;
 import org.apache.nifi.serialization.record.util.DataTypeUtils;
 import org.codehaus.jackson.JsonFactory;
 import org.codehaus.jackson.JsonGenerator;
+import org.codehaus.jackson.map.ObjectMapper;
 import org.codehaus.jackson.util.MinimalPrettyPrinter;
 
 import java.io.IOException;
@@ -60,6 +61,8 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
     private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
     private String mimeType = "application/json";
 
+    private static final ObjectMapper objectMapper = new ObjectMapper();
+
     public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
             final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
         this(logger, recordSchema, schemaAccess, out, prettyPrint, nullSuppression, outputGrouping, dateFormat, timeFormat, timestampFormat, "application/json");
@@ -86,6 +89,8 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
         LAZY_TIMESTAMP_FORMAT = () -> tsf;
 
         final JsonFactory factory = new JsonFactory();
+        factory.setCodec(objectMapper);
+
         this.generator = factory.createJsonGenerator(out);
         if (prettyPrint) {
             generator.useDefaultPrettyPrinter();
@@ -270,9 +275,41 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
             return;
         }
 
+        if (value instanceof java.sql.Time) {
+            final Object formatted = format((java.sql.Time) value, LAZY_TIME_FORMAT);
+            generator.writeObject(formatted);
+            return;
+        }
+        if (value instanceof java.sql.Date) {
+            final Object formatted = format((java.sql.Date) value, LAZY_DATE_FORMAT);
+            generator.writeObject(formatted);
+            return;
+        }
+        if (value instanceof java.util.Date) {
+            final Object formatted = format((java.util.Date) value, LAZY_TIMESTAMP_FORMAT);
+            generator.writeObject(formatted);
+            return;
+        }
+
         generator.writeObject(value);
     }
 
+    // Renders a date-like value as text using the supplied format; when no format is
+    // available the epoch-millisecond value is returned instead. A null value yields null.
+    private Object format(final java.util.Date value, final Supplier<DateFormat> formatSupplier) {
+        if (value == null) {
+            return null;
+        }
+
+        // Only consult the supplier when one was actually provided.
+        final DateFormat dateFormat = (formatSupplier == null) ? null : formatSupplier.get();
+        if (dateFormat != null) {
+            return dateFormat.format(value);
+        }
+
+        return value.getTime();
+    }
+
     @SuppressWarnings("unchecked")
     private void writeValue(final JsonGenerator generator, final Object value, final String fieldName, final DataType dataType) throws IOException {
         if (value == null) {
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
index dd25a3b..230a0e3 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/csv/TestCSVRecordReader.java
@@ -102,18 +102,20 @@ public class TestCSVRecordReader {
         fields.add(new RecordField("date", RecordFieldType.DATE.getDataType()));
         final RecordSchema schema = new SimpleRecordSchema(fields);
 
-        try (final InputStream bais = new ByteArrayInputStream(text.getBytes());
-             final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
+        for (final boolean coerceTypes : new boolean[] {true, false}) {
+            try (final InputStream bais = new ByteArrayInputStream(text.getBytes());
+                 final CSVRecordReader reader = new CSVRecordReader(bais, Mockito.mock(ComponentLog.class), schema, format, true, false,
                      "MM/dd/yyyy", RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat(), "UTF-8")) {
 
-            final Record record = reader.nextRecord();
-            final java.sql.Date date = (Date) record.getValue("date");
-            final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("gmt"));
-            calendar.setTimeInMillis(date.getTime());
+                final Record record = reader.nextRecord(coerceTypes, false);
+                final java.sql.Date date = (Date) record.getValue("date");
+                final Calendar calendar = Calendar.getInstance(TimeZone.getTimeZone("gmt"));
+                calendar.setTimeInMillis(date.getTime());
 
-            assertEquals(1983, calendar.get(Calendar.YEAR));
-            assertEquals(10, calendar.get(Calendar.MONTH));
-            assertEquals(30, calendar.get(Calendar.DAY_OF_MONTH));
+                assertEquals(1983, calendar.get(Calendar.YEAR));
+                assertEquals(10, calendar.get(Calendar.MONTH));
+                assertEquals(30, calendar.get(Calendar.DAY_OF_MONTH));
+            }
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
index 5e0da91..d97017c 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
@@ -17,21 +17,7 @@
 
 package org.apache.nifi.json;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.stream.Collectors;
-
+import com.jayway.jsonpath.JsonPath;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -45,7 +31,21 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import com.jayway.jsonpath.JsonPath;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 public class TestJsonPathRowRecordReader {
     private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
@@ -167,6 +167,25 @@ public class TestJsonPathRowRecordReader {
         }
     }
 
+    @Test
+    public void testTimestampCoercedFromString() throws IOException, MalformedRecordException {
+        // Schema with a single TIMESTAMP field; the JSON resource supplies it as a String.
+        final RecordField timestampField = new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType());
+        final RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(timestampField));
+        final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>();
+        jsonPaths.put("timestamp", JsonPath.compile("$.timestamp"));
+
+        // The reader must yield a Timestamp whether or not type coercion is requested.
+        for (final boolean coerceTypes : new boolean[] {true, false}) {
+            try (final InputStream in = new FileInputStream(new File("src/test/resources/json/timestamp.json"));
+                 final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, "yyyy/MM/dd HH:mm:ss")) {
+                final Object timestampValue = reader.nextRecord(coerceTypes, false).getValue("timestamp");
+                final String failureMessage = "With coerceTypes set to " + coerceTypes + ", value is not a Timestamp";
+                assertTrue(failureMessage, timestampValue instanceof java.sql.Timestamp);
+            }
+        }
+    }
+
 
 
     @Test
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index adbdc02..759a5ff 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -407,6 +407,22 @@ public class TestJsonTreeRowRecordReader {
 
 
     @Test
+    public void testTimestampCoercedFromString() throws IOException, MalformedRecordException {
+        // Single TIMESTAMP field; the JSON resource carries the value as a formatted String.
+        final RecordField timestampField = new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType());
+        final RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(timestampField));
+
+        for (final boolean coerceTypes : new boolean[] {true, false}) {
+            try (final InputStream in = new FileInputStream(new File("src/test/resources/json/timestamp.json"));
+                 final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, Mockito.mock(ComponentLog.class), schema, dateFormat, timeFormat, "yyyy/MM/dd HH:mm:ss")) {
+                final Object timestampValue = reader.nextRecord(coerceTypes, false).getValue("timestamp");
+                final String failureMessage = "With coerceTypes set to " + coerceTypes + ", value is not a Timestamp";
+                assertTrue(failureMessage, timestampValue instanceof java.sql.Timestamp);
+            }
+        }
+    }
+
+    @Test
     public void testSingleJsonElement() throws IOException, MalformedRecordException {
         final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
 
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/timestamp.json b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/timestamp.json
new file mode 100644
index 0000000..ee5d90f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/timestamp.json
@@ -0,0 +1 @@
+{"timestamp": "2019/06/27 13:04:04"}
\ No newline at end of file