Posted to commits@nifi.apache.org by ma...@apache.org on 2017/04/11 23:33:27 UTC

[04/19] nifi git commit: NIFI-1280: Refactoring to make more generic so that other data types can be supported; created InputStreams to content on-demand so that multiple passes can be made over FlowFile content if required. Created new Controller Servic

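The central abstraction of the patch is the pairing of RowRecordReaderFactory, which creates a RecordReader over an InputStream, with RecordSetWriterFactory; both appear in the files below. As a rough sketch (not part of the patch; openContent(), readerFactory, logger and process() stand in for what NiFi supplies), a caller that can re-open FlowFile content on demand reads records like this:

    final InputStream in = openContent();  // on-demand stream over the FlowFile content
    final RecordReader reader = readerFactory.createRecordReader(in, logger);
    try {
        Record record;
        while ((record = reader.nextRecord()) != null) {
            process(record);  // a second pass simply re-opens the content stream
        }
    } finally {
        reader.close();  // the readers below also close the wrapped stream
    }
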
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE
new file mode 100644
index 0000000..1848020
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services-nar/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,77 @@
+nifi-record-serialization-services-nar
+Copyright 2014-2017 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+===========================================
+Apache Software License v2
+===========================================
+
+The following binary components are provided under the Apache Software License v2
+
+  (ASLv2) Apache Commons Lang
+    The following NOTICE information applies:
+      Apache Commons Lang
+      Copyright 2001-2015 The Apache Software Foundation
+
+      This product includes software from the Spring Framework,
+      under the Apache License 2.0 (see: StringUtils.containsWhitespace())
+
+  (ASLv2) Grok
+    The following NOTICE information applies:
+      Grok
+      Copyright 2014 Anthony Corbacho, and contributors.
+
+  (ASLv2) Groovy (org.codehaus.groovy:groovy:jar:2.4.5 - http://www.groovy-lang.org)
+    The following NOTICE information applies:
+      Groovy Language
+         Copyright 2003-2015 The respective authors and developers
+         Developers and Contributors are listed in the project POM file
+         and Gradle build file
+
+         This product includes software developed by
+         The Groovy community (http://groovy.codehaus.org/).
+
+  (ASLv2) Google GSON
+    The following NOTICE information applies:
+      Copyright 2008 Google Inc.
+
+  (ASLv2) Jackson JSON processor
+    The following NOTICE information applies:
+      # Jackson JSON processor
+
+      Jackson is a high-performance, Free/Open Source JSON processing library.
+      It was originally written by Tatu Saloranta (tatu.saloranta@iki.fi), and has
+      been in development since 2007.
+      It is currently developed by a community of developers, as well as supported
+      commercially by FasterXML.com.
+
+      ## Licensing
+
+      Jackson core and extension components may be licensed under different licenses.
+      To find the details that apply to this artifact see the accompanying LICENSE file.
+      For more information, including possible other licensing options, contact
+      FasterXML.com (http://fasterxml.com).
+
+      ## Credits
+
+      A list of contributors may be found from CREDITS file, which is included
+      in some artifacts (usually source distributions); but is always available
+      from the source code management (SCM) system the project uses.
+
+  (ASLv2) JSON-SMART
+    The following NOTICE information applies:
+      Copyright 2011 JSON-SMART authors
+
+  (ASLv2) JsonPath
+    The following NOTICE information applies:
+      Copyright 2011 JsonPath authors
+
+  (ASLv2) opencsv (net.sf.opencsv:opencsv:2.3)
+
+  (ASLv2) Apache Avro
+    The following NOTICE information applies:
+      Apache Avro
+      Copyright 2009-2013 The Apache Software Foundation
+  
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore
new file mode 100644
index 0000000..ae3c172
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/.gitignore
@@ -0,0 +1 @@
+/bin/

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
new file mode 100644
index 0000000..9b2a56c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -0,0 +1,94 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <!-- Licensed to the Apache Software Foundation (ASF) under one or more 
+        contributor license agreements. See the NOTICE file distributed with this 
+        work for additional information regarding copyright ownership. The ASF licenses 
+        this file to You under the Apache License, Version 2.0 (the "License"); you 
+        may not use this file except in compliance with the License. You may obtain 
+        a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless 
+        required by applicable law or agreed to in writing, software distributed 
+        under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES 
+        OR CONDITIONS OF ANY KIND, either express or implied. See the License for 
+        the specific language governing permissions and limitations under the License. -->
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-record-serialization-services-bundle</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>nifi-record-serialization-services</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-utils</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record-serialization-service-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.jayway.jsonpath</groupId>
+            <artifactId>json-path</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-mapper-asl</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>net.sf.opencsv</groupId>
+            <artifactId>opencsv</artifactId>
+            <version>2.3</version>
+        </dependency>
+        <dependency>
+            <groupId>io.thekraken</groupId>
+            <artifactId>grok</artifactId>
+            <version>0.1.5</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.avro</groupId>
+            <artifactId>avro</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.rat</groupId>
+                <artifactId>apache-rat-plugin</artifactId>
+                <configuration>
+                    <excludes combine.children="append">
+                        <exclude>src/test/resources/csv/extra-white-space.csv</exclude>
+                        <exclude>src/test/resources/csv/multi-bank-account.csv</exclude>
+                        <exclude>src/test/resources/csv/single-bank-account.csv</exclude>
+                        <exclude>src/test/resources/grok/error-with-stack-trace.log</exclude>
+                        <exclude>src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log</exclude>
+                        <exclude>src/test/resources/grok/nifi-log-sample.log</exclude>
+                        <exclude>src/test/resources/grok/single-line-log-messages.txt</exclude>
+                        <exclude>src/test/resources/json/bank-account-array-different-schemas.json</exclude>
+                        <exclude>src/test/resources/json/bank-account-array.json</exclude>
+                        <exclude>src/test/resources/json/json-with-unicode.json</exclude>
+                        <exclude>src/test/resources/json/primitive-type-array.json</exclude>
+                        <exclude>src/test/resources/json/single-bank-account.json</exclude>
+                        <exclude>src/test/resources/json/single-element-nested-array.json</exclude>
+                        <exclude>src/test/resources/json/single-element-nested.json</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
new file mode 100644
index 0000000..fc0c598
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroReader.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+
+@Tags({"avro", "parse", "record", "row", "reader", "delimited", "comma", "separated", "values"})
+@CapabilityDescription("Parses Avro data and returns each Avro record as an separate record.")
+public class AvroReader extends AbstractControllerService implements RowRecordReaderFactory {
+
+    @Override
+    public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws MalformedRecordException, IOException {
+        return new AvroRecordReader(in);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
new file mode 100644
index 0000000..e98a5ad
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordReader.java
@@ -0,0 +1,254 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.Schema.Type;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericData.Array;
+import org.apache.avro.generic.GenericData.StringType;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericFixed;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+public class AvroRecordReader implements RecordReader {
+    private final InputStream in;
+    private final Schema avroSchema;
+    private final DataFileStream<GenericRecord> dataFileStream;
+    private RecordSchema recordSchema;
+
+    public AvroRecordReader(final InputStream in) throws IOException, MalformedRecordException {
+        this.in = in;
+
+        dataFileStream = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>());
+        this.avroSchema = dataFileStream.getSchema();
+        GenericData.setStringType(this.avroSchema, StringType.String);
+    }
+
+    @Override
+    public void close() throws IOException {
+        dataFileStream.close();
+        in.close();
+    }
+
+    @Override
+    public Record nextRecord() throws IOException, MalformedRecordException {
+        if (!dataFileStream.hasNext()) {
+            return null;
+        }
+
+        GenericRecord record = null;
+        while (record == null && dataFileStream.hasNext()) {
+            record = dataFileStream.next();
+        }
+
+        final RecordSchema schema = getSchema();
+        final Map<String, Object> values = convertRecordToMap(record, schema);
+        return new MapRecord(schema, values);
+    }
+
+
+    private Map<String, Object> convertRecordToMap(final GenericRecord record, final RecordSchema schema) {
+        final Map<String, Object> values = new HashMap<>(schema.getFieldCount());
+
+        for (final String fieldName : schema.getFieldNames()) {
+            final Object value = record.get(fieldName);
+
+            final Field avroField = record.getSchema().getField(fieldName);
+            if (avroField == null) {
+                values.put(fieldName, null);
+                continue;
+            }
+
+            final Schema fieldSchema = avroField.schema();
+            final DataType dataType = schema.getDataType(fieldName).orElse(null);
+            final Object converted = convertValue(value, fieldSchema, avroField.name(), dataType);
+            values.put(fieldName, converted);
+        }
+
+        return values;
+    }
+
+
+    @Override
+    public RecordSchema getSchema() throws MalformedRecordException {
+        if (recordSchema != null) {
+            return recordSchema;
+        }
+
+        recordSchema = createSchema(avroSchema);
+        return recordSchema;
+    }
+
+    private RecordSchema createSchema(final Schema avroSchema) {
+        final List<RecordField> recordFields = new ArrayList<>(avroSchema.getFields().size());
+        for (final Field field : avroSchema.getFields()) {
+            final String fieldName = field.name();
+            final DataType dataType = determineDataType(field.schema());
+            recordFields.add(new RecordField(fieldName, dataType));
+        }
+
+        final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+        return recordSchema;
+    }
+
+    private Object convertValue(final Object value, final Schema avroSchema, final String fieldName, final DataType desiredType) {
+        if (value == null) {
+            return null;
+        }
+
+        switch (avroSchema.getType()) {
+            case UNION:
+                if (value instanceof GenericData.Record) {
+                    final GenericData.Record record = (GenericData.Record) value;
+                    return convertValue(value, record.getSchema(), fieldName, desiredType);
+                }
+                break;
+            case RECORD:
+                final GenericData.Record record = (GenericData.Record) value;
+                final Schema recordSchema = record.getSchema();
+                final List<Field> recordFields = recordSchema.getFields();
+                final Map<String, Object> values = new HashMap<>(recordFields.size());
+                for (final Field field : recordFields) {
+                    final DataType desiredFieldType = determineDataType(field.schema());
+                    final Object avroFieldValue = record.get(field.name());
+                    final Object fieldValue = convertValue(avroFieldValue, field.schema(), field.name(), desiredFieldType);
+                    values.put(field.name(), fieldValue);
+                }
+                final RecordSchema childSchema = createSchema(recordSchema);
+                return new MapRecord(childSchema, values);
+            case BYTES:
+                final ByteBuffer bb = (ByteBuffer) value;
+                return bb.array();
+            case FIXED:
+                final GenericFixed fixed = (GenericFixed) value;
+                return fixed.bytes();
+            case ENUM:
+                return value.toString();
+            case NULL:
+                return null;
+            case STRING:
+                return value.toString();
+            case ARRAY:
+                final Array<?> array = (Array<?>) value;
+                final Object[] valueArray = new Object[array.size()];
+                for (int i = 0; i < array.size(); i++) {
+                    final Schema elementSchema = avroSchema.getElementType();
+                    valueArray[i] = convertValue(array.get(i), elementSchema, fieldName, determineDataType(elementSchema));
+                }
+                return valueArray;
+            case MAP:
+                final Map<?, ?> avroMap = (Map<?, ?>) value;
+                final Map<String, Object> map = new HashMap<>(avroMap.size());
+                for (final Map.Entry<?, ?> entry : avroMap.entrySet()) {
+                    Object obj = entry.getValue();
+                    if (obj instanceof Utf8 || obj instanceof CharSequence) {
+                        obj = obj.toString();
+                    }
+
+                    map.put(entry.getKey().toString(), obj);
+                }
+                return map;
+        }
+
+        return value;
+    }
+
+
+    private DataType determineDataType(final Schema avroSchema) {
+        final Type avroType = avroSchema.getType();
+
+        switch (avroType) {
+            case ARRAY:
+            case BYTES:
+            case FIXED:
+                return RecordFieldType.ARRAY.getDataType();
+            case BOOLEAN:
+                return RecordFieldType.BOOLEAN.getDataType();
+            case DOUBLE:
+                return RecordFieldType.DOUBLE.getDataType();
+            case ENUM:
+            case STRING:
+                return RecordFieldType.STRING.getDataType();
+            case FLOAT:
+                return RecordFieldType.FLOAT.getDataType();
+            case INT:
+                return RecordFieldType.INT.getDataType();
+            case LONG:
+                return RecordFieldType.LONG.getDataType();
+            case RECORD: {
+                final List<Field> avroFields = avroSchema.getFields();
+                final List<RecordField> recordFields = new ArrayList<>(avroFields.size());
+
+                for (final Field field : avroFields) {
+                    final String fieldName = field.name();
+                    final Schema fieldSchema = field.schema();
+                    final DataType fieldType = determineDataType(fieldSchema);
+                    recordFields.add(new RecordField(fieldName, fieldType));
+                }
+
+                final RecordSchema recordSchema = new SimpleRecordSchema(recordFields);
+                return RecordFieldType.RECORD.getDataType(recordSchema);
+            }
+            case NULL:
+            case MAP:
+                return RecordFieldType.RECORD.getDataType();
+            case UNION: {
+                final List<Schema> nonNullSubSchemas = avroSchema.getTypes().stream()
+                    .filter(s -> s.getType() != Type.NULL)
+                    .collect(Collectors.toList());
+
+                if (nonNullSubSchemas.size() == 1) {
+                    return determineDataType(nonNullSubSchemas.get(0));
+                }
+
+                final List<DataType> possibleChildTypes = new ArrayList<>(nonNullSubSchemas.size());
+                for (final Schema subSchema : nonNullSubSchemas) {
+                    final DataType childDataType = determineDataType(subSchema);
+                    possibleChildTypes.add(childDataType);
+                }
+
+                return RecordFieldType.CHOICE.getDataType(possibleChildTypes);
+            }
+        }
+
+        return null;
+    }
+
+}

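determineDataType() above collapses a union with a single non-null branch to that branch's type and maps multi-branch unions to a CHOICE type. Two illustrative schemas (not part of the patch) and the mapping they would receive:

    // a nullable string: one non-null branch, so it maps straight to a STRING data type
    final Schema nullableString = new Schema.Parser().parse("[\"null\", \"string\"]");

    // two non-null branches: maps to RecordFieldType.CHOICE.getDataType(...) whose
    // possible child types are the INT and STRING data types
    final Schema intOrString = new Schema.Parser().parse("[\"null\", \"int\", \"string\"]");
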
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
new file mode 100644
index 0000000..d56c716
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroRecordSetWriter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.AbstractRecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+@Tags({"avro", "result", "set", "writer", "serializer", "record", "row"})
+@CapabilityDescription("Writes the contents of a Database ResultSet in Binary Avro format. The data types in the Result Set must match those "
+    + "specified by the Avro Schema. No type coercion will occur, with the exception of Date, Time, and Timestamps fields because Avro does not provide "
+    + "support for these types specifically. As a result, they will be converted to String fields using the configured formats. In addition, the label"
+    + "of the column must be a valid Avro field name.")
+public class AvroRecordSetWriter extends AbstractRecordSetWriter implements RecordSetWriterFactory {
+    static final PropertyDescriptor SCHEMA = new PropertyDescriptor.Builder()
+        .name("Avro Schema")
+        .description("The Avro Schema to use when writing out the Result Set")
+        .addValidator(new AvroSchemaValidator())
+        .expressionLanguageSupported(false)
+        .required(true)
+        .build();
+
+    private volatile Schema schema;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+        properties.add(SCHEMA);
+        return properties;
+    }
+
+    @OnEnabled
+    public void storePropertyValues(final ConfigurationContext context) {
+        schema = new Schema.Parser().parse(context.getProperty(SCHEMA).getValue());
+    }
+
+    @Override
+    public RecordSetWriter createWriter(final ComponentLog logger) {
+        return new WriteAvroResult(schema, getDateFormat(), getTimeFormat(), getTimestampFormat());
+    }
+
+}

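createWriter() above hands the parsed schema and the three configured format strings to WriteAvroResult (shown further below). A hypothetical standalone equivalent, with the schema JSON and the format strings as assumptions of the sketch:

    final Schema avroSchema = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"account\",\"fields\":"
        + "[{\"name\":\"id\",\"type\":\"long\"},{\"name\":\"name\",\"type\":\"string\"}]}");
    final RecordSetWriter writer = new WriteAvroResult(avroSchema,
        "yyyy-MM-dd", "HH:mm:ss", "yyyy-MM-dd HH:mm:ss");
    // writer.write(recordSet, out) appends one Avro record per Record and returns
    // WriteResult.of(rowCount, ...) as implemented below
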
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java
new file mode 100644
index 0000000..7151348
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/AvroSchemaValidator.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import org.apache.avro.Schema;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+public class AvroSchemaValidator implements Validator {
+
+    @Override
+    public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+        try {
+            new Schema.Parser().parse(input);
+
+            return new ValidationResult.Builder()
+                .valid(true)
+                .build();
+        } catch (final Exception e) {
+            return new ValidationResult.Builder()
+                .input(input)
+                .subject(subject)
+                .valid(false)
+                .explanation("Not a valid Avro Schema: " + e.getMessage())
+                .build();
+        }
+    }
+
+}

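A hypothetical call against the validator above, where context stands in for the ValidationContext the framework supplies (this implementation never touches it):

    final ValidationResult result = new AvroSchemaValidator()
        .validate("Avro Schema", "{\"type\":\"string\"}", context);
    // a parseable schema yields a result built with valid(true); a parse failure
    // instead carries the explanation "Not a valid Avro Schema: ..." built above
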
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
new file mode 100644
index 0000000..d75d86d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/avro/WriteAvroResult.java
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.avro;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.sql.Blob;
+import java.sql.Clob;
+import java.sql.SQLException;
+import java.sql.Time;
+import java.sql.Timestamp;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.DatumWriter;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+
+public class WriteAvroResult implements RecordSetWriter {
+    private final Schema schema;
+    private final DateFormat dateFormat;
+    private final DateFormat timeFormat;
+    private final DateFormat timestampFormat;
+
+    public WriteAvroResult(final Schema schema, final String dateFormat, final String timeFormat, final String timestampFormat) {
+        this.schema = schema;
+        this.dateFormat = new SimpleDateFormat(dateFormat);
+        this.timeFormat = new SimpleDateFormat(timeFormat);
+        this.timestampFormat = new SimpleDateFormat(timestampFormat);
+    }
+
+    @Override
+    public WriteResult write(final RecordSet rs, final OutputStream outStream) throws IOException {
+        Record record = rs.next();
+        if (record == null) {
+            return WriteResult.of(0, Collections.emptyMap());
+        }
+
+        final GenericRecord rec = new GenericData.Record(schema);
+
+        int nrOfRows = 0;
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+            dataFileWriter.create(schema, outStream);
+
+            final RecordSchema recordSchema = rs.getSchema();
+
+            do {
+                for (final String fieldName : recordSchema.getFieldNames()) {
+                    final Object value = record.getValue(fieldName);
+
+                    final Field field = schema.getField(fieldName);
+                    if (field == null) {
+                        continue;
+                    }
+
+                    final Object converted;
+                    try {
+                        converted = convert(value, field.schema(), fieldName);
+                    } catch (final SQLException e) {
+                        throw new IOException("Failed to write records to stream", e);
+                    }
+
+                    rec.put(fieldName, converted);
+                }
+
+                dataFileWriter.append(rec);
+                nrOfRows++;
+            } while ((record = rs.next()) != null);
+        }
+
+        return WriteResult.of(nrOfRows, Collections.emptyMap());
+    }
+
+    @Override
+    public WriteResult write(final Record record, final OutputStream out) throws IOException {
+        final GenericRecord rec = new GenericData.Record(schema);
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        try (final DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(datumWriter)) {
+            dataFileWriter.create(schema, out);
+            final RecordSchema recordSchema = record.getSchema();
+
+            for (final String fieldName : recordSchema.getFieldNames()) {
+                final Object value = record.getValue(fieldName);
+
+                final Field field = schema.getField(fieldName);
+                if (field == null) {
+                    continue;
+                }
+
+                final Object converted;
+                try {
+                    converted = convert(value, field.schema(), fieldName);
+                } catch (final SQLException e) {
+                    throw new IOException("Failed to write records to stream", e);
+                }
+
+                rec.put(fieldName, converted);
+            }
+
+            dataFileWriter.append(rec);
+        }
+
+        return WriteResult.of(1, Collections.emptyMap());
+    }
+
+
+    private Object convert(final Object value, final Schema schema, final String fieldName) throws SQLException, IOException {
+        if (value == null) {
+            return null;
+        }
+
+        // CLOB and BLOB values must be handled before getObject() is called, per the JDBC
+        // "maximum portability" guidance for reading stream-backed columns
+        if (value instanceof Clob) {
+            final Clob clob = (Clob) value;
+
+            long numChars = clob.length();
+            char[] buffer = new char[(int) numChars];
+            InputStream is = clob.getAsciiStream();
+            int index = 0;
+            int c = is.read();
+            // read() returns -1 at end of stream; c > 0 would also stop on a NUL character
+            while (c >= 0) {
+                buffer[index++] = (char) c;
+                c = is.read();
+            }
+
+            clob.free();
+            return new String(buffer);
+        }
+
+        if (value instanceof Blob) {
+            final Blob blob = (Blob) value;
+
+            final long numChars = blob.length();
+            final byte[] buffer = new byte[(int) numChars];
+            final InputStream is = blob.getBinaryStream();
+            int index = 0;
+            int c = is.read();
+            // read() returns -1 at end of stream; c > 0 would truncate at the first zero byte
+            while (c >= 0) {
+                buffer[index++] = (byte) c;
+                c = is.read();
+            }
+
+            final ByteBuffer bb = ByteBuffer.wrap(buffer);
+            blob.free();
+            return bb;
+        }
+
+        if (value instanceof byte[]) {
+            // byte arrays require slightly different handling
+            return ByteBuffer.wrap((byte[]) value);
+        } else if (value instanceof Byte) {
+            // tinyint(1) type is returned by JDBC driver as java.sql.Types.TINYINT
+            // But value is returned by JDBC as java.lang.Byte
+            // (at least H2 JDBC works this way)
+            // direct put to avro record results:
+            // org.apache.avro.AvroRuntimeException: Unknown datum type java.lang.Byte
+            return ((Byte) value).intValue();
+        } else if (value instanceof Short) {
+            //MS SQL returns TINYINT as a Java Short, which Avro doesn't understand.
+            return ((Short) value).intValue();
+        } else if (value instanceof BigDecimal) {
+            // Avro can't handle BigDecimal as a number - it will throw an AvroRuntimeException such as: "Unknown datum type: java.math.BigDecimal: 38"
+            return value.toString();
+        } else if (value instanceof BigInteger) {
+            // Check the precision of the BIGINT. Some databases allow arbitrary precision (> 19), but Avro won't handle that.
+            // If the SQL type is BIGINT and the precision is between 0 and 19 (inclusive), the BigInteger is likely a
+            // long (and the schema says it will be), so try to get its value as a long.
+            // Otherwise, Avro can't handle BigInteger as a number - it will throw an AvroRuntimeException
+            // such as: "Unknown datum type: java.math.BigInteger: 38". In this case the schema is expecting a string.
+            final BigInteger bigInt = (BigInteger) value;
+            if (bigInt.compareTo(BigInteger.valueOf(Long.MAX_VALUE)) > 0) {
+                return value.toString();
+            } else {
+                return bigInt.longValue();
+            }
+        } else if (value instanceof Boolean) {
+            return value;
+        } else if (value instanceof Map) {
+            // TODO: Revisit how we handle a lot of these cases....
+            switch (schema.getType()) {
+                case MAP:
+                    return value;
+                case RECORD:
+                    final GenericData.Record avroRecord = new GenericData.Record(schema);
+
+                    final Record record = (Record) value;
+                    for (final String recordFieldName : record.getSchema().getFieldNames()) {
+                        final Object recordFieldValue = record.getValue(recordFieldName);
+
+                        final Field field = schema.getField(recordFieldName);
+                        if (field == null) {
+                            continue;
+                        }
+
+                        final Object converted = convert(recordFieldValue, field.schema(), recordFieldName);
+                        avroRecord.put(recordFieldName, converted);
+                    }
+                    return avroRecord;
+            }
+
+            return value.toString();
+
+        } else if (value instanceof List) {
+            return value;
+        } else if (value instanceof Object[]) {
+            final List<Object> list = new ArrayList<>();
+            for (final Object o : ((Object[]) value)) {
+                final Object converted = convert(o, schema.getElementType(), fieldName);
+                list.add(converted);
+            }
+            return list;
+        } else if (value instanceof Number) {
+            return value;
+        } else if (value instanceof Timestamp) {
+            // java.sql.Timestamp, Time, and Date all extend java.util.Date, so the subclasses
+            // must be checked first; otherwise their branches would be unreachable
+            final Timestamp timestamp = (Timestamp) value;
+            return timestampFormat.format(new java.util.Date(timestamp.getTime()));
+        } else if (value instanceof Time) {
+            final Time time = (Time) value;
+            return timeFormat.format(new java.util.Date(time.getTime()));
+        } else if (value instanceof java.sql.Date) {
+            final java.sql.Date sqlDate = (java.sql.Date) value;
+            return dateFormat.format(new java.util.Date(sqlDate.getTime()));
+        } else if (value instanceof java.util.Date) {
+            return dateFormat.format((java.util.Date) value);
+        }
+
+        // The different types that we support are numbers (int, long, double, float),
+        // as well as boolean values and Strings. Since Avro doesn't provide
+        // timestamp types, we want to convert those to Strings. So we will cast anything other
+        // than numbers or booleans to strings by using the toString() method.
+        return value.toString();
+    }
+
+
+    @Override
+    public String getMimeType() {
+        return "application/avro-binary";
+    }
+
+
+    public static String normalizeNameForAvro(String inputName) {
+        String normalizedName = inputName.replaceAll("[^A-Za-z0-9_]", "_");
+        // guard against an empty name before inspecting the first character
+        if (!normalizedName.isEmpty() && Character.isDigit(normalizedName.charAt(0))) {
+            normalizedName = "_" + normalizedName;
+        }
+        return normalizedName;
+    }
+}

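normalizeNameForAvro() maps arbitrary column labels onto valid Avro field names: any character outside [A-Za-z0-9_] becomes an underscore, and a leading digit gains an underscore prefix. For example (illustrative, not part of the patch's tests):

    WriteAvroResult.normalizeNameForAvro("account id");  // -> "account_id"
    WriteAvroResult.normalizeNameForAvro("1st_column");  // -> "_1st_column"
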
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
new file mode 100644
index 0000000..eccad7d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVReader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.UserTypeOverrideRowReader;
+
+@Tags({"csv", "parse", "record", "row", "reader", "delimited", "comma", "separated", "values"})
+@CapabilityDescription("Parses CSV-formatted data, returning each row in the CSV file as a separate record. "
+    + "This reader assumes that the first line in the content is the column names and all subsequent lines are "
+    + "the values. By default, the reader will assume that all columns are of 'String' type, but this can be "
+    + "overridden by adding a user-defined Property where the key is the name of a column and the value is the "
+    + "type of the column. For example, if a Property has the name \"balance\" with a value of float, it the "
+    + "reader will attempt to coerce all values in the \"balance\" column into a floating-point number. See "
+    + "Controller Service's Usage for further documentation.")
+@DynamicProperty(name = "<name of column in CSV>", value = "<type of column values in CSV>",
+    description = "User-defined properties are used to indicate that the values of a specific column should be interpreted as a "
+    + "user-defined data type (e.g., int, double, float, date, etc.)", supportsExpressionLanguage = false)
+public class CSVReader extends UserTypeOverrideRowReader implements RowRecordReaderFactory {
+
+    @Override
+    public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws IOException {
+        return new CSVRecordReader(in, logger, getFieldTypeOverrides());
+    }
+
+}

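The "balance"/float example from the description above, driven directly against the reader it creates (a sketch: logger is an assumed ComponentLog, and in NiFi the override map is built from the service's user-defined properties):

    final Map<String, DataType> overrides = new HashMap<>();
    overrides.put("balance", RecordFieldType.FLOAT.getDataType());

    final InputStream in = new ByteArrayInputStream(
        "name, balance\njohn, 100.50\n".getBytes(StandardCharsets.UTF_8));
    final RecordReader reader = new CSVRecordReader(in, logger, overrides);
    final Record record = reader.nextRecord();
    // record.getValue("balance") is now a Float (100.5f) rather than the String "100.50"
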
http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
new file mode 100644
index 0000000..c2e8963
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordReader.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import au.com.bytecode.opencsv.CSVReader;
+
+public class CSVRecordReader implements RecordReader {
+    private final ComponentLog logger;
+    private final CSVReader reader;
+    private final String[] firstLine;
+    private final Map<String, DataType> fieldTypeOverrides;
+    private RecordSchema schema;
+
+    public CSVRecordReader(final InputStream in, final ComponentLog logger, final Map<String, DataType> fieldTypeOverrides) throws IOException {
+        this.logger = logger;
+        reader = new CSVReader(new InputStreamReader(new BufferedInputStream(in)));
+        firstLine = reader.readNext();
+        this.fieldTypeOverrides = fieldTypeOverrides;
+    }
+
+    @Override
+    public Record nextRecord() throws IOException, MalformedRecordException {
+        final RecordSchema schema = getSchema();
+
+        while (true) {
+            final String[] line = reader.readNext();
+            if (line == null) {
+                return null;
+            }
+
+            final List<DataType> fieldTypes = schema.getDataTypes();
+            if (fieldTypes.size() != line.length) {
+                logger.warn("Found record with incorrect number of fields. Expected {} but found {}; skipping record", new Object[] {fieldTypes.size(), line.length});
+                continue;
+            }
+
+            try {
+                final Map<String, Object> rowValues = new HashMap<>(schema.getFieldCount());
+
+                int i = 0;
+                for (final String fieldName : schema.getFieldNames()) {
+                    if (i >= line.length) {
+                        rowValues.put(fieldName, null);
+                        continue;
+                    }
+
+                    final String rawValue = line[i++].trim();
+                    final Object converted = convert(schema.getDataType(fieldName).orElse(null), rawValue);
+                    rowValues.put(fieldName, converted);
+                }
+
+                return new MapRecord(schema, rowValues);
+            } catch (final Exception e) {
+                throw new MalformedRecordException("Found invalid CSV record", e);
+            }
+        }
+    }
+
+    @Override
+    public RecordSchema getSchema() {
+        if (schema != null) {
+            return schema;
+        }
+
+        final List<RecordField> recordFields = new ArrayList<>();
+        for (final String element : firstLine) {
+
+            final String name = element.trim();
+            final DataType dataType;
+
+            final DataType overriddenDataType = fieldTypeOverrides.get(name);
+            if (overriddenDataType != null) {
+                dataType = overriddenDataType;
+            } else {
+                dataType = RecordFieldType.STRING.getDataType();
+            }
+
+            final RecordField field = new RecordField(name, dataType);
+            recordFields.add(field);
+        }
+
+        if (recordFields.isEmpty()) {
+            recordFields.add(new RecordField("line", RecordFieldType.STRING.getDataType()));
+        }
+
+        schema = new SimpleRecordSchema(recordFields);
+        return schema;
+    }
+
+    protected Object convert(final DataType dataType, final String value) {
+        if (dataType == null) {
+            return value;
+        }
+
+        switch (dataType.getFieldType()) {
+            case BOOLEAN:
+                if (value.length() == 0) {
+                    return null;
+                }
+                return Boolean.parseBoolean(value);
+            case BYTE:
+                if (value.length() == 0) {
+                    return null;
+                }
+                return Byte.parseByte(value);
+            case SHORT:
+                if (value.length() == 0) {
+                    return null;
+                }
+                return Short.parseShort(value);
+            case INT:
+                if (value.length() == 0) {
+                    return null;
+                }
+                return Integer.parseInt(value);
+            case LONG:
+            case BIGINT:
+                if (value.length() == 0) {
+                    return null;
+                }
+                return Long.parseLong(value);
+            case FLOAT:
+                if (value.length() == 0) {
+                    return null;
+                }
+                return Float.parseFloat(value);
+            case DOUBLE:
+                if (value.length() == 0) {
+                    return null;
+                }
+                return Double.parseDouble(value);
+            case DATE:
+                if (value.length() == 0) {
+                    return null;
+                }
+                try {
+                    final Date date = new SimpleDateFormat(dataType.getFormat()).parse(value);
+                    return new java.sql.Date(date.getTime());
+                } catch (final ParseException e) {
+                    logger.warn("Found invalid value for DATE field: " + value + " does not match expected format of "
+                        + dataType.getFormat() + "; will substitute a NULL value for this field");
+                    return null;
+                }
+            case TIME:
+                if (value.length() == 0) {
+                    return null;
+                }
+                try {
+                    final Date date = new SimpleDateFormat(dataType.getFormat()).parse(value);
+                    return new java.sql.Time(date.getTime());
+                } catch (final ParseException e) {
+                    logger.warn("Found invalid value for TIME field: " + value + " does not match expected format of "
+                        + dataType.getFormat() + "; will substitute a NULL value for this field");
+                    return null;
+                }
+            case TIMESTAMP:
+                if (value.length() == 0) {
+                    return null;
+                }
+                try {
+                    final Date date = new SimpleDateFormat(dataType.getFormat()).parse(value);
+                    return new java.sql.Timestamp(date.getTime());
+                } catch (final ParseException e) {
+                    logger.warn("Found invalid value for TIMESTAMP field: " + value + " does not match expected format of "
+                        + dataType.getFormat() + "; will substitute a NULL value for this field");
+                    return null;
+                }
+            case STRING:
+            default:
+                return value;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+}
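
For reference, a minimal self-contained sketch of the conversion contract implemented above, using JDK classes only (the NiFi DataType plumbing is omitted): empty strings map to null, and an unparseable DATE value is substituted with null rather than failing the record.

    import java.text.ParseException;
    import java.text.SimpleDateFormat;

    public class ConvertSketch {

        // Mirrors the rule above: an empty string becomes null rather than a parse error.
        static Object convertInt(final String value) {
            if (value.isEmpty()) {
                return null;
            }
            return Integer.parseInt(value);
        }

        // Mirrors the DATE branch: parse with the field's format, substitute null on failure.
        static Object convertDate(final String value, final String format) {
            if (value.isEmpty()) {
                return null;
            }
            try {
                return new java.sql.Date(new SimpleDateFormat(format).parse(value).getTime());
            } catch (final ParseException e) {
                return null;
            }
        }

        public static void main(final String[] args) {
            System.out.println(convertInt(""));                          // null
            System.out.println(convertInt("42"));                        // 42
            System.out.println(convertDate("2017-04-11", "yyyy-MM-dd")); // 2017-04-11
            System.out.println(convertDate("not a date", "yyyy-MM-dd")); // null
        }
    }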

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
new file mode 100644
index 0000000..906e9c4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/CSVRecordSetWriter.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.serialization.AbstractRecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+
+@Tags({"csv", "result", "set", "writer", "serializer", "record", "row"})
+@CapabilityDescription("Writes the contents of a Database ResultSet as CSV data. The first line written "
+    + "will be the column names. All subsequent lines will be the values corresponding to those columns.")
+public class CSVRecordSetWriter extends AbstractRecordSetWriter implements RecordSetWriterFactory {
+
+    @Override
+    public RecordSetWriter createWriter(final ComponentLog logger) {
+        return new WriteCSVResult(getDateFormat(), getTimeFormat(), getTimestampFormat());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
new file mode 100644
index 0000000..79c602d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/csv/WriteCSVResult.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.csv;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.Collections;
+import java.util.Optional;
+
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.stream.io.NonCloseableOutputStream;
+
+import au.com.bytecode.opencsv.CSVWriter;
+
+public class WriteCSVResult implements RecordSetWriter {
+    private final String dateFormat;
+    private final String timeFormat;
+    private final String timestampFormat;
+
+    public WriteCSVResult(final String dateFormat, final String timeFormat, final String timestampFormat) {
+        this.dateFormat = dateFormat;
+        this.timeFormat = timeFormat;
+        this.timestampFormat = timestampFormat;
+    }
+
+    private String getFormat(final Record record, final String fieldName) {
+        final Optional<DataType> dataTypeOption = record.getSchema().getDataType(fieldName);
+        if (!dataTypeOption.isPresent()) {
+            return null;
+        }
+
+        final DataType dataType = dataTypeOption.get();
+        switch (dataType.getFieldType()) {
+            case DATE:
+                return dateFormat == null ? dataType.getFormat() : dateFormat;
+            case TIME:
+                return timeFormat == null ? dataType.getFormat() : timeFormat;
+            case TIMESTAMP:
+                return timestampFormat == null ? dataType.getFormat() : timestampFormat;
+        }
+
+        return dataType.getFormat();
+    }
+
+    @Override
+    public WriteResult write(final RecordSet rs, final OutputStream rawOut) throws IOException {
+        int count = 0;
+        try (final OutputStream nonCloseable = new NonCloseableOutputStream(rawOut);
+            final OutputStreamWriter streamWriter = new OutputStreamWriter(nonCloseable);
+            final CSVWriter writer = new CSVWriter(streamWriter)) {
+
+            try {
+                final RecordSchema schema = rs.getSchema();
+                final String[] columnNames = schema.getFieldNames().toArray(new String[0]);
+                writer.writeNext(columnNames);
+
+                Record record;
+                while ((record = rs.next()) != null) {
+                    final String[] colVals = new String[schema.getFieldCount()];
+                    int i = 0;
+                    for (final String fieldName : schema.getFieldNames()) {
+                        colVals[i++] = record.getAsString(fieldName, getFormat(record, fieldName));
+                    }
+
+                    writer.writeNext(colVals);
+                    count++;
+                }
+            } catch (final Exception e) {
+                throw new IOException("Failed to serialize results", e);
+            }
+        }
+
+        return WriteResult.of(count, Collections.emptyMap());
+    }
+
+    @Override
+    public WriteResult write(final Record record, final OutputStream rawOut) throws IOException {
+        try (final OutputStream nonCloseable = new NonCloseableOutputStream(rawOut);
+            final OutputStreamWriter streamWriter = new OutputStreamWriter(nonCloseable);
+            final CSVWriter writer = new CSVWriter(streamWriter)) {
+
+            try {
+                final RecordSchema schema = record.getSchema();
+                final String[] columnNames = schema.getFieldNames().toArray(new String[0]);
+                writer.writeNext(columnNames);
+
+                final String[] colVals = new String[schema.getFieldCount()];
+                int i = 0;
+                for (final String fieldName : schema.getFieldNames()) {
+                    colVals[i++] = record.getAsString(fieldName, getFormat(record, fieldName));
+                }
+
+                writer.writeNext(colVals);
+            } catch (final Exception e) {
+                throw new IOException("Failed to serialize results", e);
+            }
+        }
+
+        return WriteResult.of(1, Collections.emptyMap());
+    }
+
+    @Override
+    public String getMimeType() {
+        return "text/csv";
+    }
+}
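
A quick sketch of the output shape produced by write(...) above, using the same opencsv dependency directly (the field names are illustrative): the header row is written first, then one line per record, with opencsv's default double-quoting applied.

    import java.io.StringWriter;

    import au.com.bytecode.opencsv.CSVWriter;

    public class CsvShapeSketch {
        public static void main(final String[] args) throws Exception {
            final StringWriter out = new StringWriter();
            try (final CSVWriter writer = new CSVWriter(out)) {
                writer.writeNext(new String[] {"name", "age"});  // header, as in write(...)
                writer.writeNext(new String[] {"alice", "34"});  // one line per Record
            }
            System.out.print(out);
            // "name","age"
            // "alice","34"
        }
    }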

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java
new file mode 100644
index 0000000..dd9c4e0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokExpressionValidator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.grok;
+
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+
+import io.thekraken.grok.api.Grok;
+
+public class GrokExpressionValidator implements Validator {
+
+    @Override
+    public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+        try {
+            new Grok().compile(input);
+        } catch (final Exception e) {
+            return new ValidationResult.Builder()
+                .input(input)
+                .subject(subject)
+                .valid(false)
+                .explanation("Invalid Grok pattern: " + e.getMessage())
+                .build();
+        }
+
+        return new ValidationResult.Builder()
+            .input(input)
+            .subject(subject)
+            .valid(true)
+            .build();
+    }
+
+}
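
The validator's approach is simply "try to compile it, and report any failure as an invalid pattern". A dependency-free sketch of the same validate-by-compiling idea against java.util.regex (the Grok library's exact failure modes are not assumed here):

    import java.util.regex.Pattern;
    import java.util.regex.PatternSyntaxException;

    public class CompileToValidateSketch {

        static boolean isValidRegex(final String expression) {
            try {
                Pattern.compile(expression); // compilation succeeds only for well-formed patterns
                return true;
            } catch (final PatternSyntaxException e) {
                return false;
            }
        }

        public static void main(final String[] args) {
            System.out.println(isValidRegex("\\d+")); // true
            System.out.println(isValidRegex("[a-z")); // false: unclosed character class
        }
    }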

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
new file mode 100644
index 0000000..f72d5d5
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokReader.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.grok;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RowRecordReaderFactory;
+import org.apache.nifi.serialization.UserTypeOverrideRowReader;
+
+import io.thekraken.grok.api.Grok;
+import io.thekraken.grok.api.exception.GrokException;
+
+@Tags({"grok", "logs", "logfiles", "parse", "unstructured", "text", "record", "reader", "regex", "pattern", "logstash"})
+@CapabilityDescription("Provides a mechanism for reading unstructured text data, such as log files, and structuring the data "
+    + "so that it can be processed. The service is configured using Grok patterns. "
+    + "The service reads from a stream of data and splits each message that it finds into a separate Record, each containing the fields that are configured. "
+    + "If a line in the input does not match the expected message pattern, the line of text is considered to be part of the previous "
+    + "message, with the exception of stack traces. A stack trace that is found at the end of a log message is considered to be part "
+    + "of the previous message but is added to the 'STACK_TRACE' field of the Record. If a record has no stack trace, it will have a NULL value "
+    + "for the STACK_TRACE field.")
+public class GrokReader extends UserTypeOverrideRowReader implements RowRecordReaderFactory {
+    private volatile Grok grok;
+
+    private static final String DEFAULT_PATTERN_NAME = "/default-grok-patterns.txt";
+
+    static final PropertyDescriptor PATTERN_FILE = new PropertyDescriptor.Builder()
+        .name("Grok Pattern File")
+        .description("Path to a file that contains Grok Patterns to use for parsing logs. If not specified, a built-in default Pattern file "
+            + "will be used. If specified, all patterns in the given pattern file will override the default patterns. See the Controller Service's "
+            + "Additional Details for a list of pre-defined patterns.")
+        .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+        .expressionLanguageSupported(true)
+        .required(false)
+        .build();
+    static final PropertyDescriptor GROK_EXPRESSION = new PropertyDescriptor.Builder()
+        .name("Grok Expression")
+        .description("Specifies the format of a log line in Grok format. This allows the Record Reader to understand how to parse each log line. "
+            + "If a line in the log file does not match this pattern, the line will be assumed to belong to the previous log message.")
+        .addValidator(new GrokExpressionValidator())
+        .required(true)
+        .build();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(PATTERN_FILE);
+        properties.add(GROK_EXPRESSION);
+        return properties;
+    }
+
+    @OnEnabled
+    public void preCompile(final ConfigurationContext context) throws GrokException, IOException {
+        grok = new Grok();
+
+        try (final InputStream in = getClass().getResourceAsStream(DEFAULT_PATTERN_NAME);
+            final Reader reader = new InputStreamReader(in)) {
+            grok.addPatternFromReader(reader);
+        }
+
+        if (context.getProperty(PATTERN_FILE).isSet()) {
+            grok.addPatternFromFile(context.getProperty(PATTERN_FILE).getValue());
+        }
+
+        grok.compile(context.getProperty(GROK_EXPRESSION).getValue());
+    }
+
+    @Override
+    public RecordReader createRecordReader(final InputStream in, final ComponentLog logger) throws IOException {
+        return new GrokRecordReader(in, grok, getFieldTypeOverrides());
+    }
+}
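
For illustration, a Grok Expression for a log4j-style line might look like the following (TIMESTAMP_ISO8601, LOGLEVEL, and GREEDYDATA are assumed to be among the bundled default patterns, which this diff does not show):

    %{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}

Each matching line yields a Record with 'timestamp', 'level', and 'message' fields; lines that do not match are folded into the previous record, as described above.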

http://git-wip-us.apache.org/repos/asf/nifi/blob/a88d3bfa/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
new file mode 100644
index 0000000..bdf12f9
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/grok/GrokRecordReader.java
@@ -0,0 +1,323 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.grok;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.text.ParseException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.commons.lang3.time.FastDateFormat;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordFieldType;
+import org.apache.nifi.serialization.record.RecordSchema;
+
+import io.thekraken.grok.api.Grok;
+import io.thekraken.grok.api.GrokUtils;
+import io.thekraken.grok.api.Match;
+
+public class GrokRecordReader implements RecordReader {
+    private final BufferedReader reader;
+    private final Grok grok;
+    private final Map<String, DataType> fieldTypeOverrides;
+
+    private String nextLine;
+    private RecordSchema schema;
+
+    static final String STACK_TRACE_COLUMN_NAME = "STACK_TRACE";
+    private static final Pattern STACK_TRACE_PATTERN = Pattern.compile(
+        "^\\s*(?:(?:    |\\t)+at )|"
+            + "(?:(?:    |\\t)+\\[CIRCULAR REFERENCE\\:)|"
+            + "(?:Caused by\\: )|"
+            + "(?:Suppressed\\: )|"
+            + "(?:\\s+... \\d+ (?:more|common frames? omitted)$)");
+
+    private static final FastDateFormat TIME_FORMAT_DATE;
+    private static final FastDateFormat TIME_FORMAT_TIME;
+    private static final FastDateFormat TIME_FORMAT_TIMESTAMP;
+
+    static {
+        final TimeZone gmt = TimeZone.getTimeZone("GMT");
+        TIME_FORMAT_DATE = FastDateFormat.getInstance("yyyy-MM-dd", gmt);
+        TIME_FORMAT_TIME = FastDateFormat.getInstance("HH:mm:ss", gmt);
+        TIME_FORMAT_TIMESTAMP = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss", gmt);
+    }
+
+    public GrokRecordReader(final InputStream in, final Grok grok, final Map<String, DataType> fieldTypeOverrides) {
+        this.reader = new BufferedReader(new InputStreamReader(in));
+        this.grok = grok;
+        this.fieldTypeOverrides = fieldTypeOverrides;
+    }
+
+    @Override
+    public void close() throws IOException {
+        reader.close();
+    }
+
+    @Override
+    public Record nextRecord() throws IOException, MalformedRecordException {
+        final String line = nextLine == null ? reader.readLine() : nextLine;
+        nextLine = null; // ensure that we don't process nextLine again
+        if (line == null) {
+            return null;
+        }
+
+        final RecordSchema schema = getSchema();
+
+        final Match match = grok.match(line);
+        match.captures();
+        final Map<String, Object> valueMap = match.toMap();
+        if (valueMap.isEmpty()) {   // We were unable to match the pattern, so return an empty Record.
+            return new MapRecord(schema, Collections.emptyMap());
+        }
+
+        // Read the next line to see if it matches the pattern (in which case we will simply leave it for
+        // the next call to nextRecord()) or we will attach it to the previously read record.
+        String stackTrace = null;
+        final StringBuilder toAppend = new StringBuilder();
+        while ((nextLine = reader.readLine()) != null) {
+            final Match nextLineMatch = grok.match(nextLine);
+            nextLineMatch.captures();
+            final Map<String, Object> nextValueMap = nextLineMatch.toMap();
+            if (nextValueMap.isEmpty()) {
+                // next line did not match. Check if it indicates a Stack Trace. If so, read until
+                // the stack trace ends. Otherwise, append the next line to the last field in the record.
+                if (isStartOfStackTrace(nextLine)) {
+                    stackTrace = readStackTrace(nextLine);
+                    break;
+                } else {
+                    toAppend.append("\n").append(nextLine);
+                }
+            } else {
+                // The next line matched our pattern.
+                break;
+            }
+        }
+
+        try {
+            final List<DataType> fieldTypes = schema.getDataTypes();
+            final Map<String, Object> values = new HashMap<>(fieldTypes.size());
+
+            for (final String fieldName : schema.getFieldNames()) {
+                final Object value = valueMap.get(fieldName);
+                if (value == null) {
+                    values.put(fieldName, null);
+                    continue;
+                }
+
+                final DataType fieldType = schema.getDataType(fieldName).orElse(null);
+                final Object converted = convert(fieldType, value.toString());
+                values.put(fieldName, converted);
+            }
+
+            // The STACK_TRACE field is always appended last by getSchema(), so the last
+            // 'real' field sits at index fieldCount - 2.
+            final String lastFieldBeforeStackTrace = schema.getFieldNames().get(schema.getFieldCount() - 2);
+            if (toAppend.length() > 0) {
+                final Object existingValue = values.get(lastFieldBeforeStackTrace);
+                final String updatedValue = existingValue == null ? toAppend.toString() : existingValue + toAppend.toString();
+                values.put(lastFieldBeforeStackTrace, updatedValue);
+            }
+
+            values.put(STACK_TRACE_COLUMN_NAME, stackTrace);
+
+            return new MapRecord(schema, values);
+        } catch (final Exception e) {
+            throw new MalformedRecordException("Found invalid log record and will skip it. Record: " + line, e);
+        }
+    }
+
+
+    private boolean isStartOfStackTrace(final String line) {
+        if (line == null) {
+            return false;
+        }
+
+        // Stack Traces are generally of the form:
+        // java.lang.IllegalArgumentException: My message
+        //   at org.apache.nifi.MyClass.myMethod(MyClass.java:48)
+        //   at java.lang.Thread.run(Thread.java:745) [na:1.8.0_60]
+        // Caused by: java.net.SocketTimeoutException: null
+        //   ... 13 common frames omitted
+
+        int index = line.indexOf("Exception: ");
+        if (index < 0) {
+            index = line.indexOf("Error: ");
+        }
+
+        if (index < 0) {
+            return false;
+        }
+
+        if (line.indexOf(" ") < index) {
+            return false;
+        }
+
+        return true;
+    }
+
+    private String readStackTrace(final String firstLine) throws IOException {
+        final StringBuilder sb = new StringBuilder(firstLine);
+
+        String line;
+        while ((line = reader.readLine()) != null) {
+            if (isLineInStackTrace(line)) {
+                sb.append("\n").append(line);
+            } else {
+                nextLine = line;
+                break;
+            }
+        }
+
+        return sb.toString();
+    }
+
+    private boolean isLineInStackTrace(final String line) {
+        return STACK_TRACE_PATTERN.matcher(line).find();
+    }
+
+
+    protected Object convert(final DataType fieldType, final String string) {
+        if (fieldType == null) {
+            return string;
+        }
+        switch (fieldType.getFieldType()) {
+            case BOOLEAN:
+                if (string.length() == 0) {
+                    return null;
+                }
+                return Boolean.parseBoolean(string);
+            case BYTE:
+                if (string.length() == 0) {
+                    return null;
+                }
+                return Byte.parseByte(string);
+            case SHORT:
+                if (string.length() == 0) {
+                    return null;
+                }
+                return Short.parseShort(string);
+            case INT:
+                if (string.length() == 0) {
+                    return null;
+                }
+                return Integer.parseInt(string);
+            case LONG:
+                if (string.length() == 0) {
+                    return null;
+                }
+                return Long.parseLong(string);
+            case FLOAT:
+                if (string.length() == 0) {
+                    return null;
+                }
+                return Float.parseFloat(string);
+            case DOUBLE:
+                if (string.length() == 0) {
+                    return null;
+                }
+                return Double.parseDouble(string);
+            case DATE:
+                if (string.length() == 0) {
+                    return null;
+                }
+                try {
+                    Date date = TIME_FORMAT_DATE.parse(string);
+                    return new java.sql.Date(date.getTime());
+                } catch (ParseException e) {
+                    return null;
+                }
+            case TIME:
+                if (string.length() == 0) {
+                    return null;
+                }
+                try {
+                    Date date = TIME_FORMAT_TIME.parse(string);
+                    return new java.sql.Time(date.getTime());
+                } catch (ParseException e) {
+                    return null;
+                }
+            case TIMESTAMP:
+                if (string.length() == 0) {
+                    return null;
+                }
+                try {
+                    Date date = TIME_FORMAT_TIMESTAMP.parse(string);
+                    return new java.sql.Timestamp(date.getTime());
+                } catch (ParseException e) {
+                    return null;
+                }
+            case STRING:
+            default:
+                return string;
+        }
+    }
+
+
+    @Override
+    public RecordSchema getSchema() {
+        if (schema != null) {
+            return schema;
+        }
+
+        final List<RecordField> fields = new ArrayList<>();
+
+        String grokExpression = grok.getOriginalGrokPattern();
+        while (grokExpression.length() > 0) {
+            final Matcher matcher = GrokUtils.GROK_PATTERN.matcher(grokExpression);
+            if (!matcher.find()) {
+                // No (more) named patterns in the remaining expression; stop rather than loop forever.
+                break;
+            }
+
+            final Map<String, String> namedGroups = GrokUtils.namedGroups(matcher, grokExpression);
+            final String fieldName = namedGroups.get("subname");
+
+            DataType dataType = fieldTypeOverrides.get(fieldName);
+            if (dataType == null) {
+                dataType = RecordFieldType.STRING.getDataType();
+            }
+
+            final RecordField recordField = new RecordField(fieldName, dataType);
+            fields.add(recordField);
+
+            if (grokExpression.length() > matcher.end() + 1) {
+                grokExpression = grokExpression.substring(matcher.end() + 1);
+            } else {
+                break;
+            }
+        }
+
+        fields.add(new RecordField(STACK_TRACE_COLUMN_NAME, RecordFieldType.STRING.getDataType()));
+
+        schema = new SimpleRecordSchema(fields);
+        return schema;
+    }
+
+}
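
A dependency-free sketch of what getSchema() derives from an expression. This is simplified: the real GrokUtils.GROK_PATTERN also handles unnamed and inline-defined patterns, which this stand-in ignores.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class GrokFieldNamesSketch {

        // Simplified stand-in for GrokUtils.GROK_PATTERN: matches %{NAME:subname} references.
        private static final Pattern GROK_REF = Pattern.compile("%\\{(\\w+):(\\w+)\\}");

        static List<String> fieldNames(final String grokExpression) {
            final List<String> names = new ArrayList<>();
            final Matcher matcher = GROK_REF.matcher(grokExpression);
            while (matcher.find()) {
                names.add(matcher.group(2)); // the ":subname" part becomes the field name
            }
            names.add("STACK_TRACE"); // always appended, as in getSchema()
            return names;
        }

        public static void main(final String[] args) {
            System.out.println(fieldNames("%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}"));
            // prints: [timestamp, level, message, STACK_TRACE]
        }
    }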