Posted to commits@nifi.apache.org by bb...@apache.org on 2018/10/23 18:08:34 UTC

nifi git commit: NIFI-5706 Added ConvertAvroToParquet processor - Refactored code: ParquetBuilderProperties merged into ParquetUtils

Repository: nifi
Updated Branches:
  refs/heads/master 02261311b -> e45584d0f


NIFI-5706 Added ConvertAvroToParquet processor
- Refactored code: ParquetBuilderProperties merged into ParquetUtils

This closes #3079.

Signed-off-by: Bryan Bende <bb...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e45584d0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e45584d0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e45584d0

Branch: refs/heads/master
Commit: e45584d0fb8e19f6913fb25a8db3ce8303b73cd0
Parents: 0226131
Author: Mohit Garg <mo...@adobe.com>
Authored: Tue Oct 16 11:40:14 2018 +0530
Committer: Bryan Bende <bb...@apache.org>
Committed: Tue Oct 23 14:08:07 2018 -0400

----------------------------------------------------------------------
 .../nifi-parquet-processors/pom.xml             |   1 +
 .../parquet/ConvertAvroToParquet.java           | 204 ++++++++++++++
 .../nifi/processors/parquet/FetchParquet.java   |   1 -
 .../nifi/processors/parquet/PutParquet.java     | 159 +----------
 .../parquet/stream/NifiOutputStream.java        |  66 +++++
 .../parquet/stream/NifiParquetOutputFile.java   |  54 ++++
 .../processors/parquet/utils/ParquetUtils.java  | 202 ++++++++++++++
 .../org.apache.nifi.processor.Processor         |   3 +-
 .../nifi/processors/parquet/PutParquetTest.java |  19 +-
 .../parquet/TestConvertAvroToParquet.java       | 263 +++++++++++++++++++
 .../src/test/resources/avro/all-minus-enum.avsc |  60 +++++
 11 files changed, 871 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
index c8aa0ed..d458d27 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
@@ -92,6 +92,7 @@
                         <exclude>src/test/resources/avro/user.avsc</exclude>
                         <exclude>src/test/resources/avro/user-with-array.avsc</exclude>
                         <exclude>src/test/resources/avro/user-with-nullable-array.avsc</exclude>
+                        <exclude>src/test/resources/avro/all-minus-enum.avsc</exclude>
                     </excludes>
                 </configuration>
             </plugin>

http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java
new file mode 100644
index 0000000..0bcf606
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.parquet;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.parquet.stream.NifiParquetOutputFile;
+import org.apache.nifi.processors.parquet.utils.ParquetUtils;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.hadoop.ParquetWriter;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"avro", "parquet", "convert"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Converts Avro records into Parquet file format. The incoming FlowFile should be a valid Avro file. If an incoming FlowFile does "
+        + "not contain any records, an empty Parquet file is the output. NOTE: Many Avro datatypes (collections, primitives, and unions of primitives, for example) "
+        + "can be converted to Parquet, but unions of collections and other complex datatypes may not be convertible.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "filename", description = "Sets the filename to the existing filename with the extension replaced by / added to by .parquet"),
+        @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the parquet file.")
+})
+public class ConvertAvroToParquet extends AbstractProcessor {
+
+    // Attributes
+    public static final String RECORD_COUNT_ATTRIBUTE = "record.count";
+
+    private volatile List<PropertyDescriptor> parquetProps;
+
+    // Relationships
+    static final Relationship SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Parquet file that was converted successfully from Avro")
+            .build();
+
+    static final Relationship FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("Avro content that could not be processed")
+            .build();
+
+    static final Set<Relationship> RELATIONSHIPS
+            = ImmutableSet.<Relationship>builder()
+            .add(SUCCESS)
+            .add(FAILURE)
+            .build();
+
+    @Override
+    protected final void init(final ProcessorInitializationContext context) {
+
+        final List<PropertyDescriptor> props = new ArrayList<>();
+
+        props.add(ParquetUtils.COMPRESSION_TYPE);
+        props.add(ParquetUtils.ROW_GROUP_SIZE);
+        props.add(ParquetUtils.PAGE_SIZE);
+        props.add(ParquetUtils.DICTIONARY_PAGE_SIZE);
+        props.add(ParquetUtils.MAX_PADDING_SIZE);
+        props.add(ParquetUtils.ENABLE_DICTIONARY_ENCODING);
+        props.add(ParquetUtils.ENABLE_VALIDATION);
+        props.add(ParquetUtils.WRITER_VERSION);
+
+        this.parquetProps = Collections.unmodifiableList(props);
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return parquetProps;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+        final FlowFile flowFile = session.get();
+        if (flowFile == null) {
+            return;
+        }
+
+        try {
+
+            long startTime = System.currentTimeMillis();
+            final AtomicInteger totalRecordCount = new AtomicInteger(0);
+
+            final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+
+            FlowFile putFlowFile = session.write(flowFile, (rawIn, rawOut) -> {
+                try (final InputStream in = new BufferedInputStream(rawIn);
+                     final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, new GenericDatumReader<>())) {
+
+                    Schema avroSchema = dataFileReader.getSchema();
+                    getLogger().debug(avroSchema.toString(true));
+                    ParquetWriter<GenericRecord> writer = createParquetWriter(context, flowFile, rawOut, avroSchema );
+
+                    try {
+                        int recordCount = 0;
+                        GenericRecord record = null;
+                        while (dataFileReader.hasNext()) {
+                            record = dataFileReader.next();
+                            writer.write(record);
+                            recordCount++;
+                        }
+                        totalRecordCount.set(recordCount);
+                    } finally {
+                        writer.close();
+                    }
+                }
+            });
+
+            // Add attributes and transfer to success
+            StringBuilder newFilename = new StringBuilder();
+            int extensionIndex = fileName.lastIndexOf(".");
+            if (extensionIndex != -1) {
+                newFilename.append(fileName.substring(0, extensionIndex));
+            } else {
+                newFilename.append(fileName);
+            }
+            newFilename.append(".parquet");
+
+            Map<String,String> outAttributes = new HashMap<>();
+            outAttributes.put(CoreAttributes.FILENAME.key(), newFilename.toString());
+            outAttributes.put(RECORD_COUNT_ATTRIBUTE, Integer.toString(totalRecordCount.get()));
+
+            putFlowFile = session.putAllAttributes(putFlowFile, outAttributes);
+            session.transfer(putFlowFile, SUCCESS);
+            session.getProvenanceReporter().modifyContent(putFlowFile, "Converted " + totalRecordCount.get() + " records", System.currentTimeMillis() - startTime);
+
+        } catch (final ProcessException pe) {
+            getLogger().error("Failed to convert {} from Avro to Parquet due to {}; transferring to failure", new Object[]{flowFile, pe});
+            session.transfer(flowFile, FAILURE);
+        }
+
+    }
+
+    private ParquetWriter<GenericRecord> createParquetWriter(final ProcessContext context, final FlowFile flowFile, final OutputStream out, final Schema schema)
+            throws IOException {
+
+        NifiParquetOutputFile nifiParquetOutputFile = new NifiParquetOutputFile(out);
+
+        final AvroParquetWriter.Builder<GenericRecord> parquetWriter = AvroParquetWriter
+                .<GenericRecord>builder(nifiParquetOutputFile)
+                .withSchema(schema);
+
+        Configuration conf = new Configuration();
+        conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true);
+        conf.setBoolean("parquet.avro.add-list-element-records", false);
+        conf.setBoolean("parquet.avro.write-old-list-structure", false);
+
+        ParquetUtils.applyCommonConfig(parquetWriter, context, flowFile, conf, this);
+
+        return parquetWriter.build();
+    }
+
+}
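
For quick reference, a minimal sketch (not part of the commit) of driving the new processor through NiFi's mock framework. The input file name is hypothetical, and the relationship constants are package-private, so this assumes test code in the same package:

    import java.nio.file.Paths;
    import org.apache.nifi.processors.parquet.utils.ParquetUtils;
    import org.apache.nifi.util.TestRunner;
    import org.apache.nifi.util.TestRunners;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToParquet());
    // Any of the merged ParquetUtils descriptors can be set on the new processor.
    runner.setProperty(ParquetUtils.COMPRESSION_TYPE, CompressionCodecName.GZIP.name());
    runner.enqueue(Paths.get("users.avro"));   // hypothetical Avro data file
    runner.run();
    runner.assertAllFlowFilesTransferred(ConvertAvroToParquet.SUCCESS, 1);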

http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
index 9aa0d82..f4a6875 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
@@ -36,7 +36,6 @@ import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
 import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordReader;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.hadoop.ParquetReader;
-
 import java.io.IOException;
 
 @SupportsBatching

http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
index 8b8b814..c2794ac 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
@@ -32,24 +32,18 @@ import org.apache.nifi.avro.AvroTypeUtil;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.RequiredPermission;
-import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.DataUnit;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.hadoop.AbstractPutHDFSRecord;
 import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
 import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordWriter;
+import org.apache.nifi.processors.parquet.utils.ParquetUtils;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.record.RecordSchema;
 import org.apache.parquet.avro.AvroParquetWriter;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.ParquetWriter;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -77,64 +71,6 @@ import java.util.List;
 })
 public class PutParquet extends AbstractPutHDFSRecord {
 
-    public static final PropertyDescriptor ROW_GROUP_SIZE = new PropertyDescriptor.Builder()
-            .name("row-group-size")
-            .displayName("Row Group Size")
-            .description("The row group size used by the Parquet writer. " +
-                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .build();
-
-    public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
-            .name("page-size")
-            .displayName("Page Size")
-            .description("The page size used by the Parquet writer. " +
-                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .build();
-
-    public static final PropertyDescriptor DICTIONARY_PAGE_SIZE = new PropertyDescriptor.Builder()
-            .name("dictionary-page-size")
-            .displayName("Dictionary Page Size")
-            .description("The dictionary page size used by the Parquet writer. " +
-                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .build();
-
-    public static final PropertyDescriptor MAX_PADDING_SIZE = new PropertyDescriptor.Builder()
-            .name("max-padding-size")
-            .displayName("Max Padding Size")
-            .description("The maximum amount of padding that will be used to align row groups with blocks in the " +
-                    "underlying filesystem. If the underlying filesystem is not a block filesystem like HDFS, this has no effect. " +
-                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
-            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
-            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
-            .build();
-
-    public static final PropertyDescriptor ENABLE_DICTIONARY_ENCODING = new PropertyDescriptor.Builder()
-            .name("enable-dictionary-encoding")
-            .displayName("Enable Dictionary Encoding")
-            .description("Specifies whether dictionary encoding should be enabled for the Parquet writer")
-            .allowableValues("true", "false")
-            .build();
-
-    public static final PropertyDescriptor ENABLE_VALIDATION = new PropertyDescriptor.Builder()
-            .name("enable-validation")
-            .displayName("Enable Validation")
-            .description("Specifies whether validation should be enabled for the Parquet writer")
-            .allowableValues("true", "false")
-            .build();
-
-    public static final PropertyDescriptor WRITER_VERSION = new PropertyDescriptor.Builder()
-            .name("writer-version")
-            .displayName("Writer Version")
-            .description("Specifies the version used by Parquet writer")
-            .allowableValues(ParquetProperties.WriterVersion.values())
-            .build();
-
     public static final PropertyDescriptor REMOVE_CRC_FILES = new PropertyDescriptor.Builder()
             .name("remove-crc-files")
             .displayName("Remove CRC Files")
@@ -166,13 +102,13 @@ public class PutParquet extends AbstractPutHDFSRecord {
     @Override
     public List<PropertyDescriptor> getAdditionalProperties() {
         final List<PropertyDescriptor> props = new ArrayList<>();
-        props.add(ROW_GROUP_SIZE);
-        props.add(PAGE_SIZE);
-        props.add(DICTIONARY_PAGE_SIZE);
-        props.add(MAX_PADDING_SIZE);
-        props.add(ENABLE_DICTIONARY_ENCODING);
-        props.add(ENABLE_VALIDATION);
-        props.add(WRITER_VERSION);
+        props.add(ParquetUtils.ROW_GROUP_SIZE);
+        props.add(ParquetUtils.PAGE_SIZE);
+        props.add(ParquetUtils.DICTIONARY_PAGE_SIZE);
+        props.add(ParquetUtils.MAX_PADDING_SIZE);
+        props.add(ParquetUtils.ENABLE_DICTIONARY_ENCODING);
+        props.add(ParquetUtils.ENABLE_VALIDATION);
+        props.add(ParquetUtils.WRITER_VERSION);
         props.add(REMOVE_CRC_FILES);
         return Collections.unmodifiableList(props);
     }
@@ -187,88 +123,11 @@ public class PutParquet extends AbstractPutHDFSRecord {
                 .<GenericRecord>builder(path)
                 .withSchema(avroSchema);
 
-        applyCommonConfig(parquetWriter, context, flowFile, conf);
+        ParquetUtils.applyCommonConfig(parquetWriter, context, flowFile, conf, this);
 
         return new AvroParquetHDFSRecordWriter(parquetWriter.build(), avroSchema);
     }
 
-    private void applyCommonConfig(final ParquetWriter.Builder<?, ?> builder, final ProcessContext context, final FlowFile flowFile, final Configuration conf) {
-        builder.withConf(conf);
-
-        // Required properties
-
-        final boolean overwrite = context.getProperty(OVERWRITE).asBoolean();
-        final ParquetFileWriter.Mode mode = overwrite ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE;
-        builder.withWriteMode(mode);
-
-        final PropertyDescriptor compressionTypeDescriptor = getPropertyDescriptor(COMPRESSION_TYPE.getName());
-        final String compressionTypeValue = context.getProperty(compressionTypeDescriptor).getValue();
-
-        final CompressionCodecName codecName = CompressionCodecName.valueOf(compressionTypeValue);
-        builder.withCompressionCodec(codecName);
-
-        // Optional properties
-
-        if (context.getProperty(ROW_GROUP_SIZE).isSet()){
-            try {
-                final Double rowGroupSize = context.getProperty(ROW_GROUP_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
-                if (rowGroupSize != null) {
-                    builder.withRowGroupSize(rowGroupSize.intValue());
-                }
-            } catch (IllegalArgumentException e) {
-                throw new IllegalArgumentException("Invalid data size for " + ROW_GROUP_SIZE.getDisplayName(), e);
-            }
-        }
-
-        if (context.getProperty(PAGE_SIZE).isSet()) {
-            try {
-                final Double pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
-                if (pageSize != null) {
-                    builder.withPageSize(pageSize.intValue());
-                }
-            } catch (IllegalArgumentException e) {
-                throw new IllegalArgumentException("Invalid data size for " + PAGE_SIZE.getDisplayName(), e);
-            }
-        }
-
-        if (context.getProperty(DICTIONARY_PAGE_SIZE).isSet()) {
-            try {
-                final Double dictionaryPageSize = context.getProperty(DICTIONARY_PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
-                if (dictionaryPageSize != null) {
-                    builder.withDictionaryPageSize(dictionaryPageSize.intValue());
-                }
-            } catch (IllegalArgumentException e) {
-                throw new IllegalArgumentException("Invalid data size for " + DICTIONARY_PAGE_SIZE.getDisplayName(), e);
-            }
-        }
-
-        if (context.getProperty(MAX_PADDING_SIZE).isSet()) {
-            try {
-                final Double maxPaddingSize = context.getProperty(MAX_PADDING_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
-                if (maxPaddingSize != null) {
-                    builder.withMaxPaddingSize(maxPaddingSize.intValue());
-                }
-            } catch (IllegalArgumentException e) {
-                throw new IllegalArgumentException("Invalid data size for " + MAX_PADDING_SIZE.getDisplayName(), e);
-            }
-        }
-
-        if (context.getProperty(ENABLE_DICTIONARY_ENCODING).isSet()) {
-            final boolean enableDictionaryEncoding = context.getProperty(ENABLE_DICTIONARY_ENCODING).asBoolean();
-            builder.withDictionaryEncoding(enableDictionaryEncoding);
-        }
-
-        if (context.getProperty(ENABLE_VALIDATION).isSet()) {
-            final boolean enableValidation = context.getProperty(ENABLE_VALIDATION).asBoolean();
-            builder.withValidation(enableValidation);
-        }
-
-        if (context.getProperty(WRITER_VERSION).isSet()) {
-            final String writerVersionValue = context.getProperty(WRITER_VERSION).getValue();
-            builder.withWriterVersion(ParquetProperties.WriterVersion.valueOf(writerVersionValue));
-        }
-    }
-
     @Override
     protected FlowFile postProcess(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path destFile) {
         final boolean removeCRCFiles = context.getProperty(REMOVE_CRC_FILES).asBoolean();

http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiOutputStream.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiOutputStream.java
new file mode 100644
index 0000000..acb2dc4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiOutputStream.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.parquet.stream;
+
+import org.apache.parquet.io.PositionOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class NifiOutputStream extends PositionOutputStream {
+    private long position = 0;
+    private OutputStream outputStream;
+
+    public NifiOutputStream(OutputStream outputStream) {
+        this.outputStream = outputStream;
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return position;
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        position++;
+        outputStream.write(b);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        outputStream.write(b, off, len);
+        position += len;
+    }
+
+    @Override
+    public void flush() throws IOException {
+        outputStream.flush();
+    }
+
+    @Override
+    public void close() throws IOException {
+        outputStream.close();
+    }
+}
+
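A short sketch (not in the commit) of the contract this class provides: every write is forwarded to the wrapped stream while the byte count is tracked, so the Parquet writer can query its position on a stream that is not seekable:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;

    public class NifiOutputStreamDemo {
        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream target = new ByteArrayOutputStream();
            try (NifiOutputStream out = new NifiOutputStream(target)) {
                out.write(new byte[]{1, 2, 3});    // bulk write advances position by 3
                out.write(4);                      // single-byte write advances by 1
                System.out.println(out.getPos());  // prints 4
                System.out.println(target.size()); // prints 4 -- all bytes forwarded
            }
        }
    }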

http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiParquetOutputFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiParquetOutputFile.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiParquetOutputFile.java
new file mode 100644
index 0000000..d549b7b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiParquetOutputFile.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.parquet.stream;
+
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+
+import java.io.OutputStream;
+
+public class NifiParquetOutputFile implements OutputFile {
+
+    private OutputStream outputStream;
+
+    public NifiParquetOutputFile(OutputStream outputStream) {
+        this.outputStream = outputStream;
+    }
+
+    @Override
+    public PositionOutputStream create(long blockSizeHint) {
+        return new NifiOutputStream(outputStream);
+    }
+
+    @Override
+    public PositionOutputStream createOrOverwrite(long blockSizeHint) {
+        return new NifiOutputStream(outputStream);
+    }
+
+    @Override
+    public boolean supportsBlockSize() {
+        return false;
+    }
+
+    @Override
+    public long defaultBlockSize() {
+        return 0;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/utils/ParquetUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/utils/ParquetUtils.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/utils/ParquetUtils.java
new file mode 100644
index 0000000..7b116c2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/utils/ParquetUtils.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.parquet.PutParquet;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class ParquetUtils {
+
+    public static final PropertyDescriptor ROW_GROUP_SIZE = new PropertyDescriptor.Builder()
+            .name("row-group-size")
+            .displayName("Row Group Size")
+            .description("The row group size used by the Parquet writer. " +
+                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
+            .name("page-size")
+            .displayName("Page Size")
+            .description("The page size used by the Parquet writer. " +
+                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor DICTIONARY_PAGE_SIZE = new PropertyDescriptor.Builder()
+            .name("dictionary-page-size")
+            .displayName("Dictionary Page Size")
+            .description("The dictionary page size used by the Parquet writer. " +
+                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor MAX_PADDING_SIZE = new PropertyDescriptor.Builder()
+            .name("max-padding-size")
+            .displayName("Max Padding Size")
+            .description("The maximum amount of padding that will be used to align row groups with blocks in the " +
+                    "underlying filesystem. If the underlying filesystem is not a block filesystem like HDFS, this has no effect. " +
+                    "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+            .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+            .build();
+
+    public static final PropertyDescriptor ENABLE_DICTIONARY_ENCODING = new PropertyDescriptor.Builder()
+            .name("enable-dictionary-encoding")
+            .displayName("Enable Dictionary Encoding")
+            .description("Specifies whether dictionary encoding should be enabled for the Parquet writer")
+            .allowableValues("true", "false")
+            .build();
+
+    public static final PropertyDescriptor ENABLE_VALIDATION = new PropertyDescriptor.Builder()
+            .name("enable-validation")
+            .displayName("Enable Validation")
+            .description("Specifies whether validation should be enabled for the Parquet writer")
+            .allowableValues("true", "false")
+            .build();
+
+    public static final PropertyDescriptor WRITER_VERSION = new PropertyDescriptor.Builder()
+            .name("writer-version")
+            .displayName("Writer Version")
+            .description("Specifies the version used by Parquet writer")
+            .allowableValues(ParquetProperties.WriterVersion.values())
+            .build();
+
+    public static List<AllowableValue> COMPRESSION_TYPES = getCompressionTypes();
+
+    public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
+            .name("compression-type")
+            .displayName("Compression Type")
+            .description("The type of compression for the file being written.")
+            .allowableValues(COMPRESSION_TYPES.toArray(new AllowableValue[0]))
+            .defaultValue(COMPRESSION_TYPES.get(0).getValue())
+            .required(true)
+            .build();
+
+    public static List<AllowableValue> getCompressionTypes() {
+        final List<AllowableValue> compressionTypes = new ArrayList<>();
+        for (CompressionCodecName compressionCodecName : CompressionCodecName.values()) {
+            final String name = compressionCodecName.name();
+            compressionTypes.add(new AllowableValue(name, name));
+        }
+        return Collections.unmodifiableList(compressionTypes);
+    }
+
+    public static void applyCommonConfig(final ParquetWriter.Builder<?, ?> builder,
+                                         final ProcessContext context,
+                                         final FlowFile flowFile,
+                                         final Configuration conf,
+                                         final AbstractProcessor abstractProcessor) {
+        builder.withConf(conf);
+
+        // Required properties
+        boolean overwrite = true;
+        if (context.getProperty(PutParquet.OVERWRITE).isSet()) {
+            overwrite = context.getProperty(PutParquet.OVERWRITE).asBoolean();
+        }
+
+        final ParquetFileWriter.Mode mode = overwrite ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE;
+        builder.withWriteMode(mode);
+
+        final PropertyDescriptor compressionTypeDescriptor = abstractProcessor.getPropertyDescriptor(COMPRESSION_TYPE.getName());
+
+        final String compressionTypeValue = context.getProperty(compressionTypeDescriptor).getValue();
+
+        final CompressionCodecName codecName = CompressionCodecName.valueOf(compressionTypeValue);
+        builder.withCompressionCodec(codecName);
+
+        // Optional properties
+
+        if (context.getProperty(ROW_GROUP_SIZE).isSet()) {
+            try {
+                final Double rowGroupSize = context.getProperty(ROW_GROUP_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
+                if (rowGroupSize != null) {
+                    builder.withRowGroupSize(rowGroupSize.intValue());
+                }
+            } catch (IllegalArgumentException e) {
+                throw new IllegalArgumentException("Invalid data size for " + ROW_GROUP_SIZE.getDisplayName(), e);
+            }
+        }
+
+        if (context.getProperty(PAGE_SIZE).isSet()) {
+            try {
+                final Double pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
+                if (pageSize != null) {
+                    builder.withPageSize(pageSize.intValue());
+                }
+            } catch (IllegalArgumentException e) {
+                throw new IllegalArgumentException("Invalid data size for " + PAGE_SIZE.getDisplayName(), e);
+            }
+        }
+
+        if (context.getProperty(DICTIONARY_PAGE_SIZE).isSet()) {
+            try {
+                final Double dictionaryPageSize = context.getProperty(DICTIONARY_PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
+                if (dictionaryPageSize != null) {
+                    builder.withDictionaryPageSize(dictionaryPageSize.intValue());
+                }
+            } catch (IllegalArgumentException e) {
+                throw new IllegalArgumentException("Invalid data size for " + DICTIONARY_PAGE_SIZE.getDisplayName(), e);
+            }
+        }
+
+        if (context.getProperty(MAX_PADDING_SIZE).isSet()) {
+            try {
+                final Double maxPaddingSize = context.getProperty(MAX_PADDING_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
+                if (maxPaddingSize != null) {
+                    builder.withMaxPaddingSize(maxPaddingSize.intValue());
+                }
+            } catch (IllegalArgumentException e) {
+                throw new IllegalArgumentException("Invalid data size for " + MAX_PADDING_SIZE.getDisplayName(), e);
+            }
+        }
+
+        if (context.getProperty(ENABLE_DICTIONARY_ENCODING).isSet()) {
+            final boolean enableDictionaryEncoding = context.getProperty(ENABLE_DICTIONARY_ENCODING).asBoolean();
+            builder.withDictionaryEncoding(enableDictionaryEncoding);
+        }
+
+        if (context.getProperty(ENABLE_VALIDATION).isSet()) {
+            final boolean enableValidation = context.getProperty(ENABLE_VALIDATION).asBoolean();
+            builder.withValidation(enableValidation);
+        }
+
+        if (context.getProperty(WRITER_VERSION).isSet()) {
+            final String writerVersionValue = context.getProperty(WRITER_VERSION).getValue();
+            builder.withWriterVersion(ParquetProperties.WriterVersion.valueOf(writerVersionValue));
+        }
+    }
+}
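
The size-based descriptors all take values in the "<Data Size> <Data Unit>" form, which applyCommonConfig resolves to bytes via asDataSize(DataUnit.B) before configuring the builder. Illustrative settings on a test runner (the values below are examples, not defaults):

    runner.setProperty(ParquetUtils.ROW_GROUP_SIZE, "128 MB");
    runner.setProperty(ParquetUtils.PAGE_SIZE, "1 MB");
    runner.setProperty(ParquetUtils.DICTIONARY_PAGE_SIZE, "1 MB");
    runner.setProperty(ParquetUtils.MAX_PADDING_SIZE, "8 MB");
    runner.setProperty(ParquetUtils.ENABLE_DICTIONARY_ENCODING, "true");
    runner.setProperty(ParquetUtils.WRITER_VERSION, "PARQUET_2_0");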

http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 36583e3..6826d6e 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 org.apache.nifi.processors.parquet.PutParquet
-org.apache.nifi.processors.parquet.FetchParquet
\ No newline at end of file
+org.apache.nifi.processors.parquet.FetchParquet
+org.apache.nifi.processors.parquet.ConvertAvroToParquet
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
index 9e7943e..555dc60 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
@@ -42,6 +42,7 @@ import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processors.hadoop.exception.FailureException;
 import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
+import org.apache.nifi.processors.parquet.utils.ParquetUtils;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
 import org.apache.nifi.provenance.ProvenanceEventType;
 import org.apache.nifi.reporting.InitializationException;
@@ -184,7 +185,7 @@ public class PutParquetTest {
     @Test
     public void testWriteAvroWithGZIPCompression() throws IOException, InitializationException {
         configure(proc, 100);
-        testRunner.setProperty(PutParquet.COMPRESSION_TYPE, CompressionCodecName.GZIP.name());
+        testRunner.setProperty(ParquetUtils.COMPRESSION_TYPE, CompressionCodecName.GZIP.name());
 
         final String filename = "testWriteAvroWithGZIPCompression-" + System.currentTimeMillis();
 
@@ -472,7 +473,7 @@ public class PutParquetTest {
     @Test
     public void testRowGroupSize() throws IOException, InitializationException {
         configure(proc, 10);
-        testRunner.setProperty(PutParquet.ROW_GROUP_SIZE, "1024 B");
+        testRunner.setProperty(ParquetUtils.ROW_GROUP_SIZE, "1024 B");
 
         final String filename = "testRowGroupSize-" + System.currentTimeMillis();
 
@@ -487,7 +488,7 @@ public class PutParquetTest {
     @Test
     public void testInvalidRowGroupSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
         configure(proc, 10);
-        testRunner.setProperty(PutParquet.ROW_GROUP_SIZE, "${row.group.size}");
+        testRunner.setProperty(ParquetUtils.ROW_GROUP_SIZE, "${row.group.size}");
 
         final String filename = "testInvalidRowGroupSizeFromELShouldRouteToFailure" + System.currentTimeMillis();
 
@@ -503,7 +504,7 @@ public class PutParquetTest {
     @Test
     public void testPageSize() throws IOException, InitializationException {
         configure(proc, 10);
-        testRunner.setProperty(PutParquet.PAGE_SIZE, "1024 B");
+        testRunner.setProperty(ParquetUtils.PAGE_SIZE, "1024 B");
 
         final String filename = "testPageGroupSize-" + System.currentTimeMillis();
 
@@ -518,7 +519,7 @@ public class PutParquetTest {
     @Test
     public void testInvalidPageSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
         configure(proc, 10);
-        testRunner.setProperty(PutParquet.PAGE_SIZE, "${page.size}");
+        testRunner.setProperty(ParquetUtils.PAGE_SIZE, "${page.size}");
 
         final String filename = "testInvalidPageSizeFromELShouldRouteToFailure" + System.currentTimeMillis();
 
@@ -534,7 +535,7 @@ public class PutParquetTest {
     @Test
     public void testDictionaryPageSize() throws IOException, InitializationException {
         configure(proc, 10);
-        testRunner.setProperty(PutParquet.DICTIONARY_PAGE_SIZE, "1024 B");
+        testRunner.setProperty(ParquetUtils.DICTIONARY_PAGE_SIZE, "1024 B");
 
         final String filename = "testDictionaryPageGroupSize-" + System.currentTimeMillis();
 
@@ -549,7 +550,7 @@ public class PutParquetTest {
     @Test
     public void testInvalidDictionaryPageSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
         configure(proc, 10);
-        testRunner.setProperty(PutParquet.DICTIONARY_PAGE_SIZE, "${dictionary.page.size}");
+        testRunner.setProperty(ParquetUtils.DICTIONARY_PAGE_SIZE, "${dictionary.page.size}");
 
         final String filename = "testInvalidDictionaryPageSizeFromELShouldRouteToFailure" + System.currentTimeMillis();
 
@@ -565,7 +566,7 @@ public class PutParquetTest {
     @Test
     public void testMaxPaddingPageSize() throws IOException, InitializationException {
         configure(proc, 10);
-        testRunner.setProperty(PutParquet.MAX_PADDING_SIZE, "1024 B");
+        testRunner.setProperty(ParquetUtils.MAX_PADDING_SIZE, "1024 B");
 
         final String filename = "testMaxPaddingSize-" + System.currentTimeMillis();
 
@@ -580,7 +581,7 @@ public class PutParquetTest {
     @Test
     public void testInvalidMaxPaddingSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
         configure(proc, 10);
-        testRunner.setProperty(PutParquet.MAX_PADDING_SIZE, "${max.padding.size}");
+        testRunner.setProperty(ParquetUtils.MAX_PADDING_SIZE, "${max.padding.size}");
 
         final String filename = "testInvalidMaxPaddingSizeFromELShouldRouteToFailure" + System.currentTimeMillis();
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java
new file mode 100644
index 0000000..261de9b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.parquet;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Resources;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for ConvertAvroToParquet processor
+ */
+public class TestConvertAvroToParquet {
+
+    private ConvertAvroToParquet processor;
+    private TestRunner runner;
+
+    private List<GenericRecord> records = new ArrayList<>();
+    File tmpAvro = new File("target/test.avro");
+    File tmpParquet = new File("target/test.parquet");
+
+    @Before
+    public void setUp() throws Exception {
+        processor = new ConvertAvroToParquet();
+        runner = TestRunners.newTestRunner(processor);
+
+        Schema schema = new Schema.Parser().parse(Resources.getResource("avro/all-minus-enum.avsc").openStream());
+
+        DataFileWriter<Object> awriter = new DataFileWriter<Object>(new GenericDatumWriter<Object>());
+        GenericData.Record nestedRecord = new GenericRecordBuilder(
+                schema.getField("mynestedrecord").schema())
+                .set("mynestedint", 1).build();
+
+        GenericData.Record record = new GenericRecordBuilder(schema)
+                .set("mynull", null)
+                .set("myboolean", true)
+                .set("myint", 1)
+                .set("mylong", 2L)
+                .set("myfloat", 3.1f)
+                .set("mydouble", 4.1)
+                .set("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)))
+                .set("mystring", "hello")
+                .set("mynestedrecord", nestedRecord)
+                .set("myarray", new GenericData.Array<Integer>(Schema.createArray(Schema.create(Schema.Type.INT)), Arrays.asList(1, 2)))
+                .set("mymap", ImmutableMap.of("a", 1, "b", 2))
+                .set("myfixed", new GenericData.Fixed(Schema.createFixed("ignored", null, null, 1), new byte[] { (byte) 65 }))
+                .build();
+
+        awriter.create(schema, tmpAvro);
+        awriter.append(record);
+        awriter.flush();
+        awriter.close();
+
+        DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
+        DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(tmpAvro, datumReader);
+        GenericRecord record1 = null;
+        while (dataFileReader.hasNext()) {
+            record1 = dataFileReader.next(record1);
+            records.add(record1);
+        }
+
+    }
+
+    @Test
+    public void test_Processor() throws Exception {
+
+        FileInputStream fileInputStream = new FileInputStream(tmpAvro);
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        int bytesRead;
+        byte[] buf = new byte[1024];
+        while ((bytesRead = fileInputStream.read(buf)) > 0) {
+            out.write(buf, 0, bytesRead);
+        }
+        out.close();
+
+        Map<String, String> attributes = new HashMap<String, String>() {{
+            put(CoreAttributes.FILENAME.key(), "test.avro");
+        }};
+        runner.enqueue(out.toByteArray(), attributes);
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToParquet.SUCCESS, 1);
+
+        MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToParquet.SUCCESS).get(0);
+
+        // assert meta data
+        assertEquals("1", resultFlowFile.getAttribute(ConvertAvroToParquet.RECORD_COUNT_ATTRIBUTE));
+        assertEquals("test.parquet", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
+
+    }
+
+    @Test
+    public void test_Meta_Info() throws Exception {
+
+        FileInputStream fileInputStream = new FileInputStream(tmpAvro);
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        int bytesRead;
+        byte[] buf = new byte[1024];
+        while ((bytesRead = fileInputStream.read(buf)) > 0) {
+            out.write(buf, 0, bytesRead);
+        }
+        out.close();
+
+        Map<String, String> attributes = new HashMap<String, String>() {{
+            put(CoreAttributes.FILENAME.key(), "test.avro");
+        }};
+        runner.enqueue(out.toByteArray(), attributes);
+        runner.run();
+        MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToParquet.SUCCESS).get(0);
+
+        // Save the flowfile
+        byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
+        FileOutputStream fos = new FileOutputStream(tmpParquet);
+        fos.write(resultContents);
+        fos.flush();
+        fos.close();
+
+        Configuration conf = new Configuration();
+        FileSystem fs = FileSystem.getLocal(conf);
+        ParquetMetadata metaData = ParquetFileReader.readFooter(conf, new Path(tmpParquet.getAbsolutePath()), NO_FILTER);
+
+        // #number of records
+        long nParquetRecords = 0;
+        for (BlockMetaData meta : metaData.getBlocks()) {
+            nParquetRecords += meta.getRowCount();
+        }
+        long nAvroRecord = records.size();
+
+        assertEquals(nParquetRecords, nAvroRecord);
+    }
+
+    @Test
+    public void test_Data() throws Exception {
+
+        FileInputStream fileInputStream = new FileInputStream(tmpAvro);
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        int bytesRead;
+        byte[] buf = new byte[1024];
+        while ((bytesRead = fileInputStream.read(buf)) > 0) {
+            out.write(buf, 0, bytesRead);
+        }
+        out.close();
+
+        Map<String, String> attributes = new HashMap<String, String>() {{
+            put(CoreAttributes.FILENAME.key(), "test.avro");
+        }};
+        runner.enqueue(out.toByteArray(), attributes);
+        runner.run();
+        MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToParquet.SUCCESS).get(0);
+
+        // Save the flowfile
+        byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
+        FileOutputStream fos = new FileOutputStream(tmpParquet);
+        fos.write(resultContents);
+        fos.flush();
+        fos.close();
+
+        Configuration conf = new Configuration();
+        FileSystem fs = FileSystem.getLocal(conf);
+        ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(tmpParquet.getAbsolutePath()))
+                        .withConf(conf)
+                        .build();
+
+        List<Group> parquetRecords = new ArrayList<Group>();
+
+        Group current = reader.read();
+        while (current != null) {
+            assertTrue(current instanceof Group);
+            parquetRecords.add(current);
+            current = reader.read();
+        }
+
+        Group firstRecord = parquetRecords.get(0);
+
+        // Primitive
+        assertEquals(firstRecord.getInteger("myint", 0), 1);
+        assertEquals(firstRecord.getLong("mylong", 0), 2);
+        assertEquals(firstRecord.getBoolean("myboolean", 0), true);
+        assertEquals(firstRecord.getFloat("myfloat", 0), 3.1, 0.0001);
+        assertEquals(firstRecord.getDouble("mydouble", 0), 4.1, 0.001);
+        assertEquals(firstRecord.getString("mybytes", 0), "hello");
+        assertEquals(firstRecord.getString("mystring", 0), "hello");
+
+        // Nested
+        assertEquals(firstRecord.getGroup("mynestedrecord", 0).getInteger("mynestedint", 0), 1);
+
+        // Array
+        assertEquals(firstRecord.getGroup("myarray", 0).getGroup("list", 0).getInteger("element", 0), 1);
+        assertEquals(firstRecord.getGroup("myarray", 0).getGroup("list", 1).getInteger("element", 0), 2);
+
+        // Map
+        assertEquals(firstRecord.getGroup("mymap", 0).getGroup("map", 0).getInteger("value", 0), 1);
+        assertEquals(firstRecord.getGroup("mymap", 0).getGroup("map", 1).getInteger("value", 0), 2);
+
+        // Fixed
+        assertEquals(firstRecord.getString("myfixed", 0), "A");
+
+    }
+
+    @After
+    public void cleanup() {
+        tmpAvro.delete();
+        tmpParquet.delete();
+    }
+
+}
\ No newline at end of file
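
As an aside, the manual read loops in these tests can be collapsed with java.nio; the behavior is identical (shown only as an alternative, not as part of the commit):

    byte[] avroBytes = java.nio.file.Files.readAllBytes(tmpAvro.toPath());
    runner.enqueue(avroBytes, attributes);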

http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/all-minus-enum.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/all-minus-enum.avsc b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/all-minus-enum.avsc
new file mode 100644
index 0000000..3a383a0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/all-minus-enum.avsc
@@ -0,0 +1,60 @@
+{
+	"name": "myrecord",
+	"namespace": "parquet.avro",
+	"type": "record",
+	"fields": [{
+		"name": "mynull",
+		"type": "null"
+	}, {
+		"name": "myboolean",
+		"type": "boolean"
+	}, {
+		"name": "myint",
+		"type": "int"
+	}, {
+		"name": "mylong",
+		"type": "long"
+	}, {
+		"name": "myfloat",
+		"type": "float"
+	}, {
+		"name": "mydouble",
+		"type": "double"
+	}, {
+		"name": "mybytes",
+		"type": "bytes"
+	}, {
+		"name": "mystring",
+		"type": "string"
+	}, {
+		"name": "mynestedrecord",
+		"type": {
+			"type": "record",
+			"name": "ignored1",
+			"fields": [{
+				"name": "mynestedint",
+				"type": "int"
+			}]
+		}
+	}, {
+		"name": "myarray",
+		"type": {
+			"type": "array",
+			"items": "int"
+		}
+	}, {
+		"name": "mymap",
+		"type": {
+			"type": "map",
+			"values": "int"
+		}
+	}, {
+		"name": "myfixed",
+		"type": {
+			"type": "fixed",
+			"name": "ignored3",
+			"namespace": "",
+			"size": 1
+		}
+	}]
+}
\ No newline at end of file