Posted to commits@nifi.apache.org by al...@apache.org on 2017/05/01 20:11:58 UTC

[05/17] nifi git commit: NIFI-3724 - Initial commit of Parquet bundle with PutParquet and FetchParquet - Creating nifi-records-utils to share utility code from record services - Refactoring Parquet tests to use MockRecordParser and MockRecordWriter - R

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
new file mode 100644
index 0000000..934eb59
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.parquet;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.ReadsAttribute;
+import org.apache.nifi.annotation.behavior.Restricted;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.hadoop.AbstractPutHDFSRecord;
+import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
+import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordWriter;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@Tags({"put", "parquet", "hadoop", "HDFS", "filesystem", "restricted"})
+@CapabilityDescription("Reads records from an incoming FlowFile using the provided Record Reader, and writes those records " +
+        "to a Parquet file. The schema for the Parquet file must be provided in the processor properties. This processor will " +
+        "first write a temporary dot file and upon successfully writing every record to the dot file, it will rename the " +
+        "dot file to it's final name. If the dot file cannot be renamed, the rename operation will be attempted up to 10 times, and " +
+        "if still not successful, the dot file will be deleted and the flow file will be routed to failure. " +
+        " If any error occurs while reading records from the input, or writing records to the output, " +
+        "the entire dot file will be removed and the flow file will be routed to failure or retry, depending on the error.")
+@ReadsAttribute(attribute = "filename", description = "The name of the file to write comes from the value of this attribute.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "The name of the file is stored in this attribute."),
+        @WritesAttribute(attribute = "absolute.hdfs.path", description = "The absolute path to the file is stored in this attribute."),
+        @WritesAttribute(attribute = "record.count", description = "The number of records written to the Parquet file")
+})
+@Restricted("Provides operator the ability to write to any file that NiFi has access to in HDFS or the local filesystem.")
+public class PutParquet extends AbstractPutHDFSRecord {
+
+    public static final PropertyDescriptor ROW_GROUP_SIZE = new PropertyDescriptor.Builder()
+            .name("row-group-size")
+            .displayName("Row Group Size")
+            .description("The row group size used by the Parquet writer. " +
+                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
+            .name("page-size")
+            .displayName("Page Size")
+            .description("The page size used by the Parquet writer. " +
+                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor DICTIONARY_PAGE_SIZE = new PropertyDescriptor.Builder()
+            .name("dictionary-page-size")
+            .displayName("Dictionary Page Size")
+            .description("The dictionary page size used by the Parquet writer. " +
+                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor MAX_PADDING_SIZE = new PropertyDescriptor.Builder()
+            .name("max-padding-size")
+            .displayName("Max Padding Size")
+            .description("The maximum amount of padding that will be used to align row groups with blocks in the " +
+                    "underlying filesystem. If the underlying filesystem is not a block filesystem like HDFS, this has no effect. " +
+                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
+    public static final PropertyDescriptor ENABLE_DICTIONARY_ENCODING = new PropertyDescriptor.Builder()
+            .name("enable-dictionary-encoding")
+            .displayName("Enable Dictionary Encoding")
+            .description("Specifies whether dictionary encoding should be enabled for the Parquet writer")
+            .allowableValues("true", "false")
+            .build();
+
+    public static final PropertyDescriptor ENABLE_VALIDATION = new PropertyDescriptor.Builder()
+            .name("enable-validation")
+            .displayName("Enable Validation")
+            .description("Specifies whether validation should be enabled for the Parquet writer")
+            .allowableValues("true", "false")
+            .build();
+
+    public static final PropertyDescriptor WRITER_VERSION = new PropertyDescriptor.Builder()
+            .name("writer-version")
+            .displayName("Writer Version")
+            .description("Specifies the version used by Parquet writer")
+            .allowableValues(ParquetProperties.WriterVersion.values())
+            .build();
+
+    public static final PropertyDescriptor REMOVE_CRC_FILES = new PropertyDescriptor.Builder()
+            .name("remove-crc-files")
+            .displayName("Remove CRC Files")
+            .description("Specifies whether the corresponding CRC file should be deleted upon successfully writing a Parquet file")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .build();
+
+    public static final List<AllowableValue> COMPRESSION_TYPES;
+    static {
+        final List<AllowableValue> compressionTypes = new ArrayList<>();
+        for (CompressionCodecName compressionCodecName : CompressionCodecName.values()) {
+            final String name = compressionCodecName.name();
+            compressionTypes.add(new AllowableValue(name, name));
+        }
+        COMPRESSION_TYPES = Collections.unmodifiableList(compressionTypes);
+    }
+
+    @Override
+    public List<AllowableValue> getCompressionTypes(final ProcessorInitializationContext context) {
+        return COMPRESSION_TYPES;
+    }
+
+    @Override
+    public String getDefaultCompressionType(final ProcessorInitializationContext context) {
+        return CompressionCodecName.UNCOMPRESSED.name();
+    }
+
+    @Override
+    public List<PropertyDescriptor> getAdditionalProperties() {
+        final List<PropertyDescriptor> props = new ArrayList<>();
+        props.add(ROW_GROUP_SIZE);
+        props.add(PAGE_SIZE);
+        props.add(DICTIONARY_PAGE_SIZE);
+        props.add(MAX_PADDING_SIZE);
+        props.add(ENABLE_DICTIONARY_ENCODING);
+        props.add(ENABLE_VALIDATION);
+        props.add(WRITER_VERSION);
+        props.add(REMOVE_CRC_FILES);
+        return Collections.unmodifiableList(props);
+    }
+
+    @Override
+    public HDFSRecordWriter createHDFSRecordWriter(final ProcessContext context, final FlowFile flowFile, final Configuration conf, final Path path, final RecordSchema schema)
+            throws IOException, SchemaNotFoundException {
+
+        final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);
+
+        final AvroParquetWriter.Builder<GenericRecord> parquetWriter = AvroParquetWriter
+                .<GenericRecord>builder(path)
+                .withSchema(avroSchema);
+
+        applyCommonConfig(parquetWriter, context, flowFile, conf);
+
+        return new AvroParquetHDFSRecordWriter(parquetWriter.build(), avroSchema);
+    }
+
+    private void applyCommonConfig(final ParquetWriter.Builder builder, final ProcessContext context, final FlowFile flowFile, final Configuration conf) {
+        builder.withConf(conf);
+
+        // Required properties
+
+        final boolean overwrite = context.getProperty(OVERWRITE).asBoolean();
+        final ParquetFileWriter.Mode mode = overwrite ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE;
+        builder.withWriteMode(mode);
+
+        final PropertyDescriptor compressionTypeDescriptor = getPropertyDescriptor(COMPRESSION_TYPE.getName());
+        final String compressionTypeValue = context.getProperty(compressionTypeDescriptor).getValue();
+
+        final CompressionCodecName codecName = CompressionCodecName.valueOf(compressionTypeValue);
+        builder.withCompressionCodec(codecName);
+
+        // Optional properties
+
+        if (context.getProperty(ROW_GROUP_SIZE).isSet()){
+            try {
+                final Double rowGroupSize = context.getProperty(ROW_GROUP_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
+                if (rowGroupSize != null) {
+                    builder.withRowGroupSize(rowGroupSize.intValue());
+                }
+            } catch (IllegalArgumentException e) {
+                throw new IllegalArgumentException("Invalid data size for " + ROW_GROUP_SIZE.getDisplayName(), e);
+            }
+        }
+
+        if (context.getProperty(PAGE_SIZE).isSet()) {
+            try {
+                final Double pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
+                if (pageSize != null) {
+                    builder.withPageSize(pageSize.intValue());
+                }
+            } catch (IllegalArgumentException e) {
+                throw new IllegalArgumentException("Invalid data size for " + PAGE_SIZE.getDisplayName(), e);
+            }
+        }
+
+        if (context.getProperty(DICTIONARY_PAGE_SIZE).isSet()) {
+            try {
+                final Double dictionaryPageSize = context.getProperty(DICTIONARY_PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
+                if (dictionaryPageSize != null) {
+                    builder.withDictionaryPageSize(dictionaryPageSize.intValue());
+                }
+            } catch (IllegalArgumentException e) {
+                throw new IllegalArgumentException("Invalid data size for " + DICTIONARY_PAGE_SIZE.getDisplayName(), e);
+            }
+        }
+
+        if (context.getProperty(MAX_PADDING_SIZE).isSet()) {
+            try {
+                final Double maxPaddingSize = context.getProperty(MAX_PADDING_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
+                if (maxPaddingSize != null) {
+                    builder.withMaxPaddingSize(maxPaddingSize.intValue());
+                }
+            } catch (IllegalArgumentException e) {
+                throw new IllegalArgumentException("Invalid data size for " + MAX_PADDING_SIZE.getDisplayName(), e);
+            }
+        }
+
+        if (context.getProperty(ENABLE_DICTIONARY_ENCODING).isSet()) {
+            final boolean enableDictionaryEncoding = context.getProperty(ENABLE_DICTIONARY_ENCODING).asBoolean();
+            builder.withDictionaryEncoding(enableDictionaryEncoding);
+        }
+
+        if (context.getProperty(ENABLE_VALIDATION).isSet()) {
+            final boolean enableValidation = context.getProperty(ENABLE_VALIDATION).asBoolean();
+            builder.withValidation(enableValidation);
+        }
+
+        if (context.getProperty(WRITER_VERSION).isSet()) {
+            final String writerVersionValue = context.getProperty(WRITER_VERSION).getValue();
+            builder.withWriterVersion(ParquetProperties.WriterVersion.valueOf(writerVersionValue));
+        }
+    }
+
+    @Override
+    protected FlowFile postProcess(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path destFile) {
+        final boolean removeCRCFiles = context.getProperty(REMOVE_CRC_FILES).asBoolean();
+        if (removeCRCFiles) {
+            final String filename = destFile.getName();
+            final String hdfsPath = destFile.getParent().toString();
+
+            final Path crcFile = new Path(hdfsPath, "." + filename + ".crc");
+            deleteQuietly(getFileSystem(), crcFile);
+        }
+
+        return flowFile;
+    }
+}
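
For reference, a minimal sketch (not part of this commit) of the writer configuration that applyCommonConfig() assembles from the properties above. The class name, codec, and size values are illustrative assumptions; the byte values are what the data-size properties would resolve to:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroParquetWriter;
    import org.apache.parquet.hadoop.ParquetFileWriter;
    import org.apache.parquet.hadoop.ParquetWriter;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    import java.io.IOException;

    public class ParquetWriterConfigSketch {

        // Roughly the builder PutParquet ends up with for one illustrative property set:
        // Overwrite=false, Compression Type=SNAPPY, Row Group Size=128 MB, Page Size=1 MB,
        // Enable Dictionary Encoding=true.
        public static ParquetWriter<GenericRecord> openWriter(final Path path, final Schema avroSchema,
                                                              final Configuration conf) throws IOException {
            return AvroParquetWriter
                    .<GenericRecord>builder(path)
                    .withSchema(avroSchema)                             // from AvroTypeUtil.extractAvroSchema(recordSchema)
                    .withConf(conf)
                    .withWriteMode(ParquetFileWriter.Mode.CREATE)       // OVERWRITE when the Overwrite property is true
                    .withCompressionCodec(CompressionCodecName.SNAPPY)
                    .withRowGroupSize(128 * 1024 * 1024)                // "128 MB" parsed to bytes
                    .withPageSize(1024 * 1024)                          // "1 MB" parsed to bytes
                    .withDictionaryEncoding(true)
                    .build();
        }
    }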

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordReader.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordReader.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordReader.java
new file mode 100644
index 0000000..8421e37
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordReader.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.parquet.record;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.parquet.hadoop.ParquetReader;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * HDFSRecordReader that reads Parquet files using Avro.
+ */
+public class AvroParquetHDFSRecordReader implements HDFSRecordReader {
+
+    private GenericRecord lastRecord;
+    private RecordSchema recordSchema;
+    private boolean initialized = false;
+
+    private final ParquetReader<GenericRecord> parquetReader;
+
+    public AvroParquetHDFSRecordReader(final ParquetReader<GenericRecord> parquetReader) {
+        this.parquetReader = parquetReader;
+    }
+
+    @Override
+    public Record nextRecord() throws IOException {
+        if (initialized && lastRecord == null) {
+            return null;
+        }
+
+        lastRecord = parquetReader.read();
+        initialized = true;
+
+        if (lastRecord == null) {
+            return null;
+        }
+
+        if (recordSchema == null) {
+            recordSchema = AvroTypeUtil.createSchema(lastRecord.getSchema());
+        }
+
+        final Map<String, Object> values = AvroTypeUtil.convertAvroRecordToMap(lastRecord, recordSchema);
+        return new MapRecord(recordSchema, values);
+    }
+
+
+    @Override
+    public void close() throws IOException {
+        parquetReader.close();
+    }
+
+}
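
A minimal sketch (not part of this commit) of driving the reader above: open a ParquetReader through AvroParquetReader, wrap it, and call nextRecord() until it returns null. The class and method names here are illustrative:

    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordReader;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.parquet.avro.AvroParquetReader;
    import org.apache.parquet.hadoop.ParquetReader;

    import java.io.IOException;

    public class RecordReaderSketch {

        // Count the records in a Parquet file by reading it through AvroParquetHDFSRecordReader.
        public static long countRecords(final Path path, final Configuration conf) throws IOException {
            final ParquetReader<GenericRecord> parquetReader =
                    AvroParquetReader.<GenericRecord>builder(path).withConf(conf).build();

            final AvroParquetHDFSRecordReader reader = new AvroParquetHDFSRecordReader(parquetReader);
            long count = 0;
            try {
                Record record;
                while ((record = reader.nextRecord()) != null) {
                    count++;
                }
            } finally {
                reader.close();
            }
            return count;
        }
    }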

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordWriter.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordWriter.java
new file mode 100644
index 0000000..7ef37b1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/record/AvroParquetHDFSRecordWriter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.parquet.record;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.parquet.hadoop.ParquetWriter;
+
+import java.io.IOException;
+
+/**
+ * HDFSRecordWriter that writes Parquet files using Avro as the schema representation.
+ */
+public class AvroParquetHDFSRecordWriter implements HDFSRecordWriter {
+
+    private final Schema avroSchema;
+    private final ParquetWriter<GenericRecord> parquetWriter;
+
+    public AvroParquetHDFSRecordWriter(final ParquetWriter<GenericRecord> parquetWriter, final Schema avroSchema) {
+        this.avroSchema = avroSchema;
+        this.parquetWriter = parquetWriter;
+    }
+
+    @Override
+    public void write(final Record record) throws IOException {
+        final GenericRecord genericRecord = AvroTypeUtil.createAvroRecord(record, avroSchema);
+        parquetWriter.write(genericRecord);
+    }
+
+    @Override
+    public void close() throws IOException {
+        parquetWriter.close();
+    }
+
+}
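
Similarly, a minimal sketch (not part of this commit) of the write path: convert the NiFi RecordSchema to Avro, build an AvroParquetWriter, wrap it in the writer above, and write a RecordSet record by record. Class and method names are illustrative:

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.nifi.avro.AvroTypeUtil;
    import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordWriter;
    import org.apache.nifi.serialization.record.Record;
    import org.apache.nifi.serialization.record.RecordSchema;
    import org.apache.nifi.serialization.record.RecordSet;
    import org.apache.parquet.avro.AvroParquetWriter;

    import java.io.IOException;

    public class RecordWriterSketch {

        // Write every record in the RecordSet to a Parquet file and return the count.
        public static long writeRecords(final RecordSet recordSet, final RecordSchema schema,
                                        final Path path, final Configuration conf) throws IOException {
            final Schema avroSchema = AvroTypeUtil.extractAvroSchema(schema);

            final AvroParquetHDFSRecordWriter writer = new AvroParquetHDFSRecordWriter(
                    AvroParquetWriter.<GenericRecord>builder(path).withSchema(avroSchema).withConf(conf).build(),
                    avroSchema);

            long count = 0;
            try {
                Record record;
                while ((record = recordSet.next()) != null) {
                    writer.write(record);
                    count++;
                }
            } finally {
                writer.close();
            }
            return count;
        }
    }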

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
new file mode 100644
index 0000000..36583e3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+org.apache.nifi.processors.parquet.PutParquet
+org.apache.nifi.processors.parquet.FetchParquet
\ No newline at end of file
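
This services file is what lets the framework find the two new processors: NiFi extension discovery is built on the standard java.util.ServiceLoader mechanism, which reads META-INF/services entries like the ones above. A minimal sketch of that lookup (not part of this commit, class name illustrative):

    import org.apache.nifi.processor.Processor;

    import java.util.ServiceLoader;

    public class ProcessorDiscoverySketch {

        public static void main(final String[] args) {
            // Lists every Processor registered via META-INF/services on the classpath,
            // which would include PutParquet and FetchParquet once this NAR is loaded.
            for (final Processor processor : ServiceLoader.load(Processor.class)) {
                System.out.println(processor.getClass().getName());
            }
        }
    }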

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
new file mode 100644
index 0000000..a4f1513
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/FetchParquetTest.java
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.parquet;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.when;
+
+public class FetchParquetTest {
+
+    static final String DIRECTORY = "target";
+    static final String TEST_CONF_PATH = "src/test/resources/core-site.xml";
+    static final String RECORD_HEADER = "name,favorite_number,favorite_color";
+
+    private Schema schema;
+    private Configuration testConf;
+    private FetchParquet proc;
+    private TestRunner testRunner;
+
+    @Before
+    public void setup() throws IOException, InitializationException {
+        final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/avro/user.avsc"), StandardCharsets.UTF_8);
+        schema = new Schema.Parser().parse(avroSchema);
+
+        testConf = new Configuration();
+        testConf.addResource(new Path(TEST_CONF_PATH));
+
+        proc = new FetchParquet();
+    }
+
+    private void configure(final FetchParquet fetchParquet) throws InitializationException {
+        testRunner = TestRunners.newTestRunner(fetchParquet);
+        testRunner.setProperty(FetchParquet.HADOOP_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+
+        final RecordSetWriterFactory writerFactory = new MockRecordWriter(RECORD_HEADER, false);
+        testRunner.addControllerService("mock-writer-factory", writerFactory);
+        testRunner.enableControllerService(writerFactory);
+        testRunner.setProperty(FetchParquet.RECORD_WRITER, "mock-writer-factory");
+    }
+
+    @Test
+    public void testFetchParquetToCSV() throws IOException, InitializationException {
+        configure(proc);
+
+        final File parquetDir = new File(DIRECTORY);
+        final File parquetFile = new File(parquetDir,"testFetchParquetToCSV.parquet");
+        final int numUsers = 10;
+        writeParquetUsers(parquetFile, numUsers);
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
+        attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
+
+        testRunner.enqueue("TRIGGER", attributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_SUCCESS, 1);
+
+        final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_SUCCESS).get(0);
+        flowFile.assertAttributeEquals(FetchParquet.RECORD_COUNT_ATTR, String.valueOf(numUsers));
+        flowFile.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "text/plain");
+
+        // the mock record writer writes the header before each record, so strip the headers to leave just the records
+        String flowFileContent = new String(flowFile.toByteArray(), StandardCharsets.UTF_8);
+        flowFileContent = flowFileContent.replaceAll(RECORD_HEADER + "\n", "");
+
+        verifyCSVRecords(numUsers, flowFileContent);
+    }
+
+    @Test
+    public void testFetchWhenELEvaluatesToEmptyShouldRouteFailure() throws InitializationException {
+        configure(proc);
+        testRunner.setProperty(FetchParquet.FILENAME, "${missing.attr}");
+
+        testRunner.enqueue("TRIGGER");
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_FAILURE, 1);
+
+        final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_FAILURE).get(0);
+        flowFile.assertAttributeEquals(FetchParquet.FETCH_FAILURE_REASON_ATTR, "Can not create a Path from an empty string");
+        flowFile.assertContentEquals("TRIGGER");
+    }
+
+    @Test
+    public void testFetchWhenDoesntExistShouldRouteToFailure() throws InitializationException {
+        configure(proc);
+
+        final String filename = "/tmp/does-not-exist-" + System.currentTimeMillis();
+        testRunner.setProperty(FetchParquet.FILENAME, filename);
+
+        testRunner.enqueue("TRIGGER");
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_FAILURE, 1);
+
+        final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_FAILURE).get(0);
+        flowFile.assertAttributeEquals(FetchParquet.FETCH_FAILURE_REASON_ATTR, "File " + filename + " does not exist");
+        flowFile.assertContentEquals("TRIGGER");
+    }
+
+    @Test
+    public void testIOExceptionCreatingReaderShouldRouteToRetry() throws InitializationException, IOException {
+        final FetchParquet proc = new FetchParquet() {
+            @Override
+            public HDFSRecordReader createHDFSRecordReader(ProcessContext context, FlowFile flowFile, Configuration conf, Path path)
+                    throws IOException {
+                throw new IOException("IOException");
+            }
+        };
+
+        configure(proc);
+
+        final File parquetDir = new File(DIRECTORY);
+        final File parquetFile = new File(parquetDir,"testFetchParquetToCSV.parquet");
+        final int numUsers = 10;
+        writeParquetUsers(parquetFile, numUsers);
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
+        attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
+
+        testRunner.enqueue("TRIGGER", attributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_RETRY, 1);
+
+        final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_RETRY).get(0);
+        flowFile.assertContentEquals("TRIGGER");
+    }
+
+    @Test
+    public void testIOExceptionWhileReadingShouldRouteToRetry() throws IOException, InitializationException {
+        final FetchParquet proc = new FetchParquet() {
+            @Override
+            public HDFSRecordReader createHDFSRecordReader(ProcessContext context, FlowFile flowFile, Configuration conf, Path path)
+                    throws IOException {
+                return new HDFSRecordReader() {
+                    @Override
+                    public Record nextRecord() throws IOException {
+                        throw new IOException("IOException");
+                    }
+                    @Override
+                    public void close() throws IOException {
+                    }
+                };
+            }
+        };
+
+        configure(proc);
+
+        final File parquetDir = new File(DIRECTORY);
+        final File parquetFile = new File(parquetDir,"testFetchParquetToCSV.parquet");
+        final int numUsers = 10;
+        writeParquetUsers(parquetFile, numUsers);
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
+        attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
+
+        testRunner.enqueue("TRIGGER", attributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_RETRY, 1);
+
+        final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_RETRY).get(0);
+        flowFile.assertContentEquals("TRIGGER");
+    }
+
+    @Test
+    public void testIOExceptionWhileWritingShouldRouteToRetry() throws InitializationException, IOException, SchemaNotFoundException {
+        configure(proc);
+
+        final RecordSetWriter recordSetWriter = Mockito.mock(RecordSetWriter.class);
+        when(recordSetWriter.write(any(RecordSet.class), any(OutputStream.class))).thenThrow(new IOException("IOException"));
+
+        final RecordSetWriterFactory recordSetWriterFactory = Mockito.mock(RecordSetWriterFactory.class);
+        when(recordSetWriterFactory.getIdentifier()).thenReturn("mock-writer-factory");
+        when(recordSetWriterFactory.createWriter(any(ComponentLog.class), any(FlowFile.class), any(InputStream.class))).thenReturn(recordSetWriter);
+
+        testRunner.addControllerService("mock-writer-factory", recordSetWriterFactory);
+        testRunner.enableControllerService(recordSetWriterFactory);
+        testRunner.setProperty(FetchParquet.RECORD_WRITER, "mock-writer-factory");
+
+        final File parquetDir = new File(DIRECTORY);
+        final File parquetFile = new File(parquetDir,"testFetchParquetToCSV.parquet");
+        final int numUsers = 10;
+        writeParquetUsers(parquetFile, numUsers);
+
+        final Map<String,String> attributes = new HashMap<>();
+        attributes.put(CoreAttributes.PATH.key(), parquetDir.getAbsolutePath());
+        attributes.put(CoreAttributes.FILENAME.key(), parquetFile.getName());
+
+        testRunner.enqueue("TRIGGER", attributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(FetchParquet.REL_RETRY, 1);
+
+        final MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(FetchParquet.REL_RETRY).get(0);
+        flowFile.assertContentEquals("TRIGGER");
+    }
+
+    protected void verifyCSVRecords(int numUsers, String csvContent) {
+        final String[] splits = csvContent.split("[\\n]");
+        Assert.assertEquals(numUsers, splits.length);
+
+        for (int i=0; i < numUsers; i++) {
+            final String line = splits[i];
+            Assert.assertEquals("Bob" + i + "," + i + ",blue" + i, line);
+        }
+    }
+
+    private void writeParquetUsers(final File parquetFile, int numUsers) throws IOException {
+        if (parquetFile.exists()) {
+            Assert.assertTrue(parquetFile.delete());
+        }
+
+        final Path parquetPath = new Path(parquetFile.getPath());
+
+        final AvroParquetWriter.Builder<GenericRecord> writerBuilder = AvroParquetWriter
+                .<GenericRecord>builder(parquetPath)
+                .withSchema(schema)
+                .withConf(testConf);
+
+        try (final ParquetWriter<GenericRecord> writer = writerBuilder.build()) {
+            for (int i=0; i < numUsers; i++) {
+                final GenericRecord user = new GenericData.Record(schema);
+                user.put("name", "Bob" + i);
+                user.put("favorite_number", i);
+                user.put("favorite_color", "blue" + i);
+
+                writer.write(user);
+            }
+        }
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
new file mode 100644
index 0000000..3a07dce
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
@@ -0,0 +1,669 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.parquet;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.hadoop.exception.FailureException;
+import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
+import org.apache.nifi.provenance.ProvenanceEventRecord;
+import org.apache.nifi.provenance.ProvenanceEventType;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.MalformedRecordException;
+import org.apache.nifi.serialization.RecordReader;
+import org.apache.nifi.serialization.RecordReaderFactory;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.RecordSet;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.when;
+
+
+public class PutParquetTest {
+
+    static final String DIRECTORY = "target";
+    static final String TEST_CONF_PATH = "src/test/resources/core-site.xml";
+
+    private Schema schema;
+    private Configuration testConf;
+    private PutParquet proc;
+    private MockRecordParser readerFactory;
+    private TestRunner testRunner;
+
+
+    @Before
+    public void setup() throws IOException, InitializationException {
+        final String avroSchema = IOUtils.toString(new FileInputStream("src/test/resources/avro/user.avsc"), StandardCharsets.UTF_8);
+        schema = new Schema.Parser().parse(avroSchema);
+
+        testConf = new Configuration();
+        testConf.addResource(new Path(TEST_CONF_PATH));
+
+        proc = new PutParquet();
+    }
+
+    private void configure(final PutParquet putParquet, final int numUsers) throws InitializationException {
+        testRunner = TestRunners.newTestRunner(putParquet);
+        testRunner.setProperty(PutParquet.HADOOP_CONFIGURATION_RESOURCES, TEST_CONF_PATH);
+        testRunner.setProperty(PutParquet.DIRECTORY, DIRECTORY);
+
+        readerFactory = new MockRecordParser();
+
+        final RecordSchema recordSchema = AvroTypeUtil.createSchema(schema);
+        for (final RecordField recordField : recordSchema.getFields()) {
+            readerFactory.addSchemaField(recordField.getFieldName(), recordField.getDataType().getFieldType());
+        }
+
+        for (int i=0; i < numUsers; i++) {
+            readerFactory.addRecord("name" + i, i, "blue" + i);
+        }
+
+        testRunner.addControllerService("mock-reader-factory", readerFactory);
+        testRunner.enableControllerService(readerFactory);
+
+        testRunner.setProperty(PutParquet.RECORD_READER, "mock-reader-factory");
+        testRunner.setProperty(SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
+        testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, schema.toString());
+    }
+
+    @Test
+    public void testWriteAvroParquetWithDefaults() throws IOException, InitializationException {
+        configure(proc, 100);
+
+        final String filename = "testWriteAvroWithDefaults-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1);
+
+        final Path avroParquetFile = new Path(DIRECTORY + "/" + filename);
+
+        // verify the successful flow file has the expected attributes
+        final MockFlowFile mockFlowFile = testRunner.getFlowFilesForRelationship(PutParquet.REL_SUCCESS).get(0);
+        mockFlowFile.assertAttributeEquals(PutParquet.ABSOLUTE_HDFS_PATH_ATTRIBUTE, avroParquetFile.getParent().toString());
+        mockFlowFile.assertAttributeEquals(CoreAttributes.FILENAME.key(), filename);
+        mockFlowFile.assertAttributeEquals(PutParquet.RECORD_COUNT_ATTR, "100");
+
+        // verify we generated a provenance event
+        final List<ProvenanceEventRecord> provEvents = testRunner.getProvenanceEvents();
+        Assert.assertEquals(1, provEvents.size());
+
+        // verify it was a SEND event with the correct URI
+        final ProvenanceEventRecord provEvent = provEvents.get(0);
+        Assert.assertEquals(ProvenanceEventType.SEND, provEvent.getEventType());
+        Assert.assertEquals("hdfs://" + avroParquetFile.toString(), provEvent.getTransitUri());
+
+        // verify the content of the parquet file by reading it back in
+        verifyAvroParquetUsers(avroParquetFile, 100);
+
+        // verify we don't have the temp dot file after success
+        final File tempAvroParquetFile = new File(DIRECTORY + "/." + filename);
+        Assert.assertFalse(tempAvroParquetFile.exists());
+
+        // verify we DO have the CRC file after success
+        final File crcAvroParquetFile = new File(DIRECTORY + "/." + filename + ".crc");
+        Assert.assertTrue(crcAvroParquetFile.exists());
+    }
+
+    @Test
+    public void testWriteAvroAndRemoveCRCFiles() throws IOException, InitializationException {
+        configure(proc,100);
+        testRunner.setProperty(PutParquet.REMOVE_CRC_FILES, "true");
+
+        final String filename = "testWriteAvroAndRemoveCRCFiles-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1);
+
+        // verify we don't have the temp dot file after success
+        final File tempAvroParquetFile = new File(DIRECTORY + "/." + filename);
+        Assert.assertFalse(tempAvroParquetFile.exists());
+
+        // verify we don't have the CRC file after success because we set remove to true
+        final File crcAvroParquetFile = new File(DIRECTORY + "/." + filename + ".crc");
+        Assert.assertFalse(crcAvroParquetFile.exists());
+    }
+
+    @Test
+    public void testWriteAvroWithGZIPCompression() throws IOException, InitializationException {
+        configure(proc, 100);
+        testRunner.setProperty(PutParquet.COMPRESSION_TYPE, CompressionCodecName.GZIP.name());
+
+        final String filename = "testWriteAvroWithGZIPCompression-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1);
+
+        // verify the content of the parquet file by reading it back in
+        final Path avroParquetFile = new Path(DIRECTORY + "/" + filename);
+        verifyAvroParquetUsers(avroParquetFile, 100);
+    }
+
+    @Test
+    public void testInvalidAvroShouldRouteToFailure() throws InitializationException, SchemaNotFoundException, MalformedRecordException, IOException {
+        configure(proc, 0);
+
+        // simulate throwing an IOException when the factory creates a reader, which is what would happen when
+        // invalid Avro is passed to the Avro reader factory
+        final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class);
+        when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory");
+        when(readerFactory.createRecordReader(any(FlowFile.class), any(InputStream.class), any(ComponentLog.class))).thenThrow(new IOException("NOT AVRO"));
+
+        testRunner.addControllerService("mock-reader-factory", readerFactory);
+        testRunner.enableControllerService(readerFactory);
+        testRunner.setProperty(PutParquet.RECORD_READER, "mock-reader-factory");
+
+        final String filename = "testInvalidAvroShouldRouteToFailure-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testCreateDirectoryIOExceptionShouldRouteToRetry() throws InitializationException, IOException {
+        final PutParquet proc = new PutParquet() {
+            @Override
+            protected void createDirectory(FileSystem fileSystem, Path directory, String remoteOwner, String remoteGroup)
+                    throws IOException, FailureException {
+                throw new IOException("IOException creating directory");
+            }
+        };
+
+        configure(proc, 10);
+
+        final String filename = "testCreateDirectoryIOExceptionShouldRouteToRetry-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_RETRY, 1);
+    }
+
+    @Test
+    public void testCreateDirectoryFailureExceptionShouldRouteToFailure() throws InitializationException, IOException {
+        final PutParquet proc = new PutParquet() {
+            @Override
+            protected void createDirectory(FileSystem fileSystem, Path directory, String remoteOwner, String remoteGroup)
+                    throws IOException, FailureException {
+                throw new FailureException("FailureException creating directory");
+            }
+        };
+
+        configure(proc, 10);
+
+        final String filename = "testCreateDirectoryFailureExceptionShouldRouteToFailure-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testDestinationExistsWithoutOverwriteShouldRouteFailure() throws IOException, InitializationException {
+        configure(proc, 10);
+        testRunner.setProperty(PutParquet.OVERWRITE, "false");
+
+        final String filename = "testDestinationExistsWithoutOverwriteShouldRouteFailure-" + System.currentTimeMillis();
+
+        // create a file in the directory with the same name
+        final File avroParquetFile = new File(DIRECTORY + "/" + filename);
+        Assert.assertTrue(avroParquetFile.createNewFile());
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testTempDestinationExistsWithoutOverwriteShouldRouteFailure() throws IOException, InitializationException {
+        configure(proc, 10);
+        testRunner.setProperty(PutParquet.OVERWRITE, "false");
+
+        // use the dot filename
+        final String filename = ".testTempDestinationExistsWithoutOverwriteShouldRouteFailure-" + System.currentTimeMillis();
+
+        // create a file in the directory with the same name
+        final File avroParquetFile = new File(DIRECTORY + "/" + filename);
+        Assert.assertTrue(avroParquetFile.createNewFile());
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testDestinationExistsWithOverwriteShouldBeSuccessful() throws InitializationException, IOException {
+        configure(proc, 10);
+        testRunner.setProperty(PutParquet.OVERWRITE, "true");
+
+        final String filename = "testDestinationExistsWithOverwriteShouldBeSuccessful-" + System.currentTimeMillis();
+
+        // create a file in the directory with the same name
+        final File avroParquetFile = new File(DIRECTORY + "/" + filename);
+        Assert.assertTrue(avroParquetFile.createNewFile());
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testValidSchemaWithELShouldBeSuccessful() throws InitializationException, IOException {
+        configure(proc, 10);
+        testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, "${my.schema}");
+
+        final String filename = "testValidSchemaWithELShouldBeSuccessful-" + System.currentTimeMillis();
+
+        // provide my.schema as a flow file attribute so the expression language can resolve the schema text
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+        flowFileAttributes.put("my.schema", schema.toString());
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testSchemaWithELMissingShouldRouteToFailure() throws InitializationException, IOException {
+        configure(proc, 10);
+        testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, "${my.schema}");
+
+        final String filename = "testSchemaWithELMissingShouldRouteToFailure-" + System.currentTimeMillis();
+
+        // don't provide my.schema as an attribute
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testInvalidSchemaShouldRouteToFailure() throws InitializationException, IOException {
+        configure(proc, 10);
+        testRunner.setProperty(SchemaAccessUtils.SCHEMA_TEXT, "${my.schema}");
+
+        final String filename = "testInvalidSchemaShouldRouteToFailure-" + System.currentTimeMillis();
+
+        // provide an invalid schema in the my.schema attribute
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+        flowFileAttributes.put("my.schema", "NOT A SCHEMA");
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testMalformedRecordExceptionFromReaderShouldRouteToFailure() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException {
+        configure(proc, 10);
+
+        final RecordReader recordReader = Mockito.mock(RecordReader.class);
+        when(recordReader.nextRecord()).thenThrow(new MalformedRecordException("ERROR"));
+
+        final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class);
+        when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory");
+        when(readerFactory.createRecordReader(any(FlowFile.class), any(InputStream.class), any(ComponentLog.class))).thenReturn(recordReader);
+
+        testRunner.addControllerService("mock-reader-factory", readerFactory);
+        testRunner.enableControllerService(readerFactory);
+        testRunner.setProperty(PutParquet.RECORD_READER, "mock-reader-factory");
+
+        final String filename = "testMalformedRecordExceptionFromReaderShouldRouteToFailure-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testIOExceptionCreatingWriterShouldRouteToRetry() throws InitializationException, IOException, MalformedRecordException {
+        final PutParquet proc = new PutParquet() {
+            @Override
+            public HDFSRecordWriter createHDFSRecordWriter(ProcessContext context, FlowFile flowFile, Configuration conf, Path path, RecordSchema schema)
+                    throws IOException, SchemaNotFoundException {
+                throw new IOException("IOException");
+            }
+        };
+        configure(proc, 0);
+
+        final String filename = "testIOExceptionCreatingWriterShouldRouteToRetry-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_RETRY, 1);
+    }
+
+    @Test
+    public void testIOExceptionFromReaderShouldRouteToRetry() throws InitializationException, IOException, MalformedRecordException, SchemaNotFoundException {
+        configure(proc, 10);
+
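+        // mock a record set whose next() throws an IOException, which should cause the flow file to be routed to retry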
+        final RecordSet recordSet = Mockito.mock(RecordSet.class);
+        when(recordSet.next()).thenThrow(new IOException("ERROR"));
+
+        final RecordReader recordReader = Mockito.mock(RecordReader.class);
+        when(recordReader.createRecordSet()).thenReturn(recordSet);
+
+        final RecordReaderFactory readerFactory = Mockito.mock(RecordReaderFactory.class);
+        when(readerFactory.getIdentifier()).thenReturn("mock-reader-factory");
+        when(readerFactory.createRecordReader(any(FlowFile.class), any(InputStream.class), any(ComponentLog.class))).thenReturn(recordReader);
+
+        testRunner.addControllerService("mock-reader-factory", readerFactory);
+        testRunner.enableControllerService(readerFactory);
+        testRunner.setProperty(PutParquet.RECORD_READER, "mock-reader-factory");
+
+        final String filename = "testMalformedRecordExceptionShouldRouteToFailure-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_RETRY, 1);
+    }
+
+    @Test
+    public void testIOExceptionRenamingShouldRouteToRetry() throws InitializationException, IOException {
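+        // stub rename so moving the temp dot file to its final name throws an IOException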
+        final PutParquet proc = new PutParquet() {
+            @Override
+            protected void rename(FileSystem fileSystem, Path srcFile, Path destFile)
+                    throws IOException, InterruptedException, FailureException {
+                throw new IOException("IOException renaming");
+            }
+        };
+
+        configure(proc, 10);
+
+        final String filename = "testIOExceptionRenamingShouldRouteToRetry-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_RETRY, 1);
+
+        // verify the temp dot file was cleaned up after the failed rename
+        final File tempAvroParquetFile = new File(DIRECTORY + "/." + filename);
+        Assert.assertFalse(tempAvroParquetFile.exists());
+    }
+
+    @Test
+    public void testFailureExceptionRenamingShouldRouteToFailure() throws InitializationException, IOException {
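+        // stub rename so moving the temp dot file to its final name throws a FailureException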
+        final PutParquet proc = new PutParquet() {
+            @Override
+            protected void rename(FileSystem fileSystem, Path srcFile, Path destFile)
+                    throws IOException, InterruptedException, FailureException {
+                throw new FailureException("FailureException renaming");
+            }
+        };
+
+        configure(proc, 10);
+
+        final String filename = "testFailureExceptionRenamingShouldRouteToFailure-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
+
+        // verify the temp dot file was cleaned up after the failed rename
+        final File tempAvroParquetFile = new File(DIRECTORY + "/." + filename);
+        Assert.assertFalse(tempAvroParquetFile.exists());
+    }
+
+    @Test
+    public void testRowGroupSize() throws IOException, InitializationException {
+        configure(proc, 10);
+        testRunner.setProperty(PutParquet.ROW_GROUP_SIZE, "1024 B");
+
+        final String filename = "testRowGroupSize-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testInvalidRowGroupSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
+        configure(proc, 10);
+        testRunner.setProperty(PutParquet.ROW_GROUP_SIZE, "${row.group.size}");
+
+        final String filename = "testInvalidRowGroupSizeFromELShouldRouteToFailure" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+        flowFileAttributes.put("row.group.size", "NOT A DATA SIZE");
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testPageSize() throws IOException, InitializationException {
+        configure(proc, 10);
+        testRunner.setProperty(PutParquet.PAGE_SIZE, "1024 B");
+
+        final String filename = "testPageGroupSize-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testInvalidPageSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
+        configure(proc, 10);
+        testRunner.setProperty(PutParquet.PAGE_SIZE, "${page.size}");
+
+        final String filename = "testInvalidPageSizeFromELShouldRouteToFailure" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+        flowFileAttributes.put("page.size", "NOT A DATA SIZE");
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testDictionaryPageSize() throws IOException, InitializationException {
+        configure(proc, 10);
+        testRunner.setProperty(PutParquet.DICTIONARY_PAGE_SIZE, "1024 B");
+
+        final String filename = "testDictionaryPageGroupSize-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testInvalidDictionaryPageSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
+        configure(proc, 10);
+        testRunner.setProperty(PutParquet.DICTIONARY_PAGE_SIZE, "${dictionary.page.size}");
+
+        final String filename = "testInvalidDictionaryPageSizeFromELShouldRouteToFailure" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+        flowFileAttributes.put("dictionary.page.size", "NOT A DATA SIZE");
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testMaxPaddingSize() throws IOException, InitializationException {
+        configure(proc, 10);
+        testRunner.setProperty(PutParquet.MAX_PADDING_SIZE, "1024 B");
+
+        final String filename = "testMaxPaddingSize-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1);
+    }
+
+    @Test
+    public void testInvalidMaxPaddingSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
+        configure(proc, 10);
+        testRunner.setProperty(PutParquet.MAX_PADDING_SIZE, "${max.padding.size}");
+
+        final String filename = "testInvalidMaxPaddingSizeFromELShouldRouteToFailure" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+        flowFileAttributes.put("max.padding.size", "NOT A DATA SIZE");
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_FAILURE, 1);
+    }
+
+    @Test
+    public void testReadAsStringAndWriteAsInt() throws InitializationException, IOException {
+        configure(proc, 0);
+
+        // provide the favorite number as a string; it should be written as an int per the schema
+        readerFactory.addRecord("name0", "0", "blue0");
+
+        final String filename = "testReadAsStringAndWriteAsInt-" + System.currentTimeMillis();
+
+        final Map<String,String> flowFileAttributes = new HashMap<>();
+        flowFileAttributes.put(CoreAttributes.FILENAME.key(), filename);
+
+        testRunner.enqueue("trigger", flowFileAttributes);
+        testRunner.run();
+        testRunner.assertAllFlowFilesTransferred(PutParquet.REL_SUCCESS, 1);
+
+        final Path avroParquetFile = new Path(DIRECTORY + "/" + filename);
+
+        // verify the content of the parquet file by reading it back in
+        verifyAvroParquetUsers(avroParquetFile, 1);
+    }
+
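+    // reads the given Parquet file back with AvroParquetReader and verifies the expected number of user records and their field values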
+    private void verifyAvroParquetUsers(final Path avroParquetUsers, final int numExpectedUsers) throws IOException {
+        final ParquetReader.Builder<GenericRecord> readerBuilder = AvroParquetReader
+                .<GenericRecord>builder(avroParquetUsers)
+                .withConf(testConf);
+
+        int currUser = 0;
+
+        try (final ParquetReader<GenericRecord> reader = readerBuilder.build()) {
+            GenericRecord nextRecord;
+            while ((nextRecord = reader.read()) != null) {
+                Assert.assertNotNull(nextRecord);
+                Assert.assertEquals("name" + currUser, nextRecord.get("name").toString());
+                Assert.assertEquals(currUser, nextRecord.get("favorite_number"));
+                Assert.assertEquals("blue" + currUser, nextRecord.get("favorite_color").toString());
+                currUser++;
+            }
+        }
+
+        Assert.assertEquals(numExpectedUsers, currUser);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user.avsc b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user.avsc
new file mode 100644
index 0000000..470827a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/user.avsc
@@ -0,0 +1,9 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]}
+ ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/core-site.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/core-site.xml b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/core-site.xml
new file mode 100644
index 0000000..234ecf2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/core-site.xml
@@ -0,0 +1,25 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- Put site-specific property overrides in this file. -->
+
+<configuration>
+    <property>
+        <name>fs.defaultFS</name>
+        <value>file:///</value>
+    </property>
+</configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-parquet-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/pom.xml b/nifi-nar-bundles/nifi-parquet-bundle/pom.xml
new file mode 100644
index 0000000..2b8248c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/pom.xml
@@ -0,0 +1,35 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements. See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License. You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.nifi</groupId>
+        <artifactId>nifi-nar-bundles</artifactId>
+        <version>1.2.0-SNAPSHOT</version>
+    </parent>
+
+    <groupId>org.apache.nifi</groupId>
+    <artifactId>nifi-parquet-bundle</artifactId>
+    <version>1.2.0-SNAPSHOT</version>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>nifi-parquet-processors</module>
+        <module>nifi-parquet-nar</module>
+    </modules>
+
+</project>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
index 3cfae30..a3a1c14 100644
--- a/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
+++ b/nifi-nar-bundles/nifi-registry-bundle/nifi-registry-service/pom.xml
@@ -32,6 +32,10 @@
 			<artifactId>nifi-record-serialization-service-api</artifactId>
 		</dependency>
 		<dependency>
+			<groupId>org.apache.nifi</groupId>
+			<artifactId>nifi-record</artifactId>
+		</dependency>
+		<dependency>
 			<groupId>org.apache.avro</groupId>
 			<artifactId>avro</artifactId>
 		</dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml
index fc01a18..af0f8c8 100644
--- a/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-scripting-bundle/nifi-scripting-processors/pom.xml
@@ -43,6 +43,10 @@
             <artifactId>nifi-record-serialization-service-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
             <groupId>org.codehaus.groovy</groupId>
             <artifactId>groovy-all</artifactId>
         </dependency>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 295ae96..d0a1f1c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -54,6 +54,10 @@
             <artifactId>nifi-record-serialization-service-api</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-record</artifactId>
+        </dependency>
+        <dependency>
             <groupId>commons-io</groupId>
             <artifactId>commons-io</artifactId>
         </dependency>
@@ -225,6 +229,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-mock-record-utils</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.apache.derby</groupId>
             <artifactId>derby</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
index 355f192..c31cdd4 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/TestPutDatabaseRecord.groovy
@@ -18,8 +18,8 @@ package org.apache.nifi.processors.standard
 
 import org.apache.nifi.processor.exception.ProcessException
 import org.apache.nifi.processor.util.pattern.RollbackOnFailure
-import org.apache.nifi.processors.standard.util.record.MockRecordParser
 import org.apache.nifi.reporting.InitializationException
+import org.apache.nifi.serialization.record.MockRecordParser
 import org.apache.nifi.serialization.record.RecordField
 import org.apache.nifi.serialization.record.RecordFieldType
 import org.apache.nifi.serialization.record.RecordSchema

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
index 0dcaeec..de3a428 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
@@ -17,17 +17,17 @@
 
 package org.apache.nifi.processors.standard;
 
-import static org.junit.Assert.assertTrue;
-
-import org.apache.nifi.processors.standard.util.record.MockRecordParser;
-import org.apache.nifi.processors.standard.util.record.MockRecordWriter;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
 
+import static org.junit.Assert.assertTrue;
+
 public class TestConvertRecord {
 
     @Test

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
index 32c3635..70d3e87 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestQueryRecord.java
@@ -16,23 +16,15 @@
  */
 package org.apache.nifi.processors.standard;
 
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.nifi.controller.AbstractControllerService;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.processors.standard.util.record.MockRecordParser;
-import org.apache.nifi.processors.standard.util.record.MockRecordWriter;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSet;
@@ -42,6 +34,14 @@ import org.apache.nifi.util.TestRunners;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
 public class TestQueryRecord {
 
     static {

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java
index 4c3bff4..e2f5005 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java
@@ -17,20 +17,20 @@
 
 package org.apache.nifi.processors.standard;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.util.List;
-
-import org.apache.nifi.processors.standard.util.record.MockRecordParser;
-import org.apache.nifi.processors.standard.util.record.MockRecordWriter;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.serialization.record.MockRecordParser;
+import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
 
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 public class TestSplitRecord {
 
     @Test

http://git-wip-us.apache.org/repos/asf/nifi/blob/60d88b5a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
deleted file mode 100644
index fcf0d10..0000000
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/util/record/MockRecordParser.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.nifi.processors.standard.util.record;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.serialization.MalformedRecordException;
-import org.apache.nifi.serialization.RecordReader;
-import org.apache.nifi.serialization.RecordReaderFactory;
-import org.apache.nifi.serialization.SimpleRecordSchema;
-import org.apache.nifi.serialization.record.MapRecord;
-import org.apache.nifi.serialization.record.Record;
-import org.apache.nifi.serialization.record.RecordField;
-import org.apache.nifi.serialization.record.RecordFieldType;
-import org.apache.nifi.serialization.record.RecordSchema;
-
-public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
-    private final List<Object[]> records = new ArrayList<>();
-    private final List<RecordField> fields = new ArrayList<>();
-    private final int failAfterN;
-
-    public MockRecordParser() {
-        this(-1);
-    }
-
-    public MockRecordParser(final int failAfterN) {
-        this.failAfterN = failAfterN;
-    }
-
-
-    public void addSchemaField(final String fieldName, final RecordFieldType type) {
-        fields.add(new RecordField(fieldName, type.getDataType()));
-    }
-
-    public void addRecord(Object... values) {
-        records.add(values);
-    }
-
-    @Override
-    public RecordReader createRecordReader(FlowFile flowFile, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
-        final Iterator<Object[]> itr = records.iterator();
-
-        return new RecordReader() {
-            private int recordCount = 0;
-
-            @Override
-            public void close() throws IOException {
-            }
-
-            @Override
-            public Record nextRecord() throws IOException, MalformedRecordException {
-                if (failAfterN >= recordCount) {
-                    throw new MalformedRecordException("Intentional Unit Test Exception because " + recordCount + " records have been read");
-                }
-                recordCount++;
-
-                if (!itr.hasNext()) {
-                    return null;
-                }
-
-                final Object[] values = itr.next();
-                final Map<String, Object> valueMap = new HashMap<>();
-                int i = 0;
-                for (final RecordField field : fields) {
-                    final String fieldName = field.getFieldName();
-                    valueMap.put(fieldName, values[i++]);
-                }
-
-                return new MapRecord(new SimpleRecordSchema(fields), valueMap);
-            }
-
-            @Override
-            public RecordSchema getSchema() {
-                return new SimpleRecordSchema(fields);
-            }
-        };
-    }
-}
\ No newline at end of file