You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by bb...@apache.org on 2018/10/23 18:08:34 UTC
nifi git commit: NIFI-5706 Added ConvertAvroToParquet processor -
Refactored code: ParquetBuilderProperties merged into ParquetUtils
Repository: nifi
Updated Branches:
refs/heads/master 02261311b -> e45584d0f
NIFI-5706 Added ConvertAvroToParquet processor
- Refactored code: ParquetBuilderProperties merged into ParquetUtils
This closes #3079.
Signed-off-by: Bryan Bende <bb...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e45584d0
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e45584d0
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e45584d0
Branch: refs/heads/master
Commit: e45584d0fb8e19f6913fb25a8db3ce8303b73cd0
Parents: 0226131
Author: Mohit Garg <mo...@adobe.com>
Authored: Tue Oct 16 11:40:14 2018 +0530
Committer: Bryan Bende <bb...@apache.org>
Committed: Tue Oct 23 14:08:07 2018 -0400
----------------------------------------------------------------------
.../nifi-parquet-processors/pom.xml | 1 +
.../parquet/ConvertAvroToParquet.java | 204 ++++++++++++++
.../nifi/processors/parquet/FetchParquet.java | 1 -
.../nifi/processors/parquet/PutParquet.java | 159 +----------
.../parquet/stream/NifiOutputStream.java | 66 +++++
.../parquet/stream/NifiParquetOutputFile.java | 54 ++++
.../processors/parquet/utils/ParquetUtils.java | 202 ++++++++++++++
.../org.apache.nifi.processor.Processor | 3 +-
.../nifi/processors/parquet/PutParquetTest.java | 19 +-
.../parquet/TestConvertAvroToParquet.java | 263 +++++++++++++++++++
.../src/test/resources/avro/all-minus-enum.avsc | 60 +++++
11 files changed, 871 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
index c8aa0ed..d458d27 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/pom.xml
@@ -92,6 +92,7 @@
<exclude>src/test/resources/avro/user.avsc</exclude>
<exclude>src/test/resources/avro/user-with-array.avsc</exclude>
<exclude>src/test/resources/avro/user-with-nullable-array.avsc</exclude>
+ <exclude>src/test/resources/avro/all-minus-enum.avsc</exclude>
</excludes>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java
new file mode 100644
index 0000000..0bcf606
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/ConvertAvroToParquet.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.parquet;
+
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessorInitializationContext;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processors.parquet.stream.NifiParquetOutputFile;
+import org.apache.nifi.processors.parquet.utils.ParquetUtils;
+import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.hadoop.ParquetWriter;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.BufferedInputStream;
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Set;
+import java.util.Collections;
+import java.util.concurrent.atomic.AtomicInteger;
+
+@Tags({"avro", "parquet", "convert"})
+@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Converts Avro records into Parquet file format. The incoming FlowFile should be a valid avro file. If an incoming FlowFile does "
+ + "not contain any records, an empty parquet file is the output. NOTE: Many Avro datatypes (collections, primitives, and unions of primitives, e.g.) can "
+ + "be converted to parquet, but unions of collections and other complex datatypes may not be able to be converted to Parquet.")
+@WritesAttributes({
+ @WritesAttribute(attribute = "filename", description = "Sets the filename to the existing filename with the extension replaced by / added to by .parquet"),
+ @WritesAttribute(attribute = "record.count", description = "Sets the number of records in the parquet file.")
+})
+public class ConvertAvroToParquet extends AbstractProcessor {
+
+ // Attributes
+ public static final String RECORD_COUNT_ATTRIBUTE = "record.count";
+
+ private volatile List<PropertyDescriptor> parquetProps;
+
+ // Relationships
+ static final Relationship SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Parquet file that was converted successfully from Avro")
+ .build();
+
+ static final Relationship FAILURE = new Relationship.Builder()
+ .name("failure")
+ .description("Avro content that could not be processed")
+ .build();
+
+ static final Set<Relationship> RELATIONSHIPS
+ = ImmutableSet.<Relationship>builder()
+ .add(SUCCESS)
+ .add(FAILURE)
+ .build();
+
+ @Override
+ protected final void init(final ProcessorInitializationContext context) {
+
+
+ final List<PropertyDescriptor> props = new ArrayList<>();
+
+ props.add(ParquetUtils.COMPRESSION_TYPE);
+ props.add(ParquetUtils.ROW_GROUP_SIZE);
+ props.add(ParquetUtils.PAGE_SIZE);
+ props.add(ParquetUtils.DICTIONARY_PAGE_SIZE);
+ props.add(ParquetUtils.MAX_PADDING_SIZE);
+ props.add(ParquetUtils.ENABLE_DICTIONARY_ENCODING);
+ props.add(ParquetUtils.ENABLE_VALIDATION);
+ props.add(ParquetUtils.WRITER_VERSION);
+
+ this.parquetProps = Collections.unmodifiableList(props);
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return parquetProps;
+ }
+
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+ @Override
+ public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+ final FlowFile flowFile = session.get();
+ if (flowFile == null) {
+ return;
+ }
+
+ try {
+
+ long startTime = System.currentTimeMillis();
+ final AtomicInteger totalRecordCount = new AtomicInteger(0);
+
+ final String fileName = flowFile.getAttribute(CoreAttributes.FILENAME.key());
+
+ FlowFile putFlowFile = flowFile;
+
+ putFlowFile = session.write(flowFile, (rawIn, rawOut) -> {
+ try (final InputStream in = new BufferedInputStream(rawIn);
+ final DataFileStream<GenericRecord> dataFileReader = new DataFileStream<>(in, new GenericDatumReader<>())) {
+
+ Schema avroSchema = dataFileReader.getSchema();
+ getLogger().debug(avroSchema.toString(true));
+ ParquetWriter<GenericRecord> writer = createParquetWriter(context, flowFile, rawOut, avroSchema );
+
+ try {
+ int recordCount = 0;
+ GenericRecord record = null;
+ while (dataFileReader.hasNext()) {
+ record = dataFileReader.next();
+ writer.write(record);
+ recordCount++;
+ }
+ totalRecordCount.set(recordCount);
+ } finally {
+ writer.close();
+ }
+ }
+ });
+
+ // Add attributes and transfer to success
+ StringBuilder newFilename = new StringBuilder();
+ int extensionIndex = fileName.lastIndexOf(".");
+ if (extensionIndex != -1) {
+ newFilename.append(fileName.substring(0, extensionIndex));
+ } else {
+ newFilename.append(fileName);
+ }
+ newFilename.append(".parquet");
+
+ Map<String,String> outAttributes = new HashMap<>();
+ outAttributes.put(CoreAttributes.FILENAME.key(), newFilename.toString());
+ outAttributes.put(RECORD_COUNT_ATTRIBUTE,Integer.toString(totalRecordCount.get()) );
+
+ putFlowFile = session.putAllAttributes(putFlowFile, outAttributes);
+ session.transfer(putFlowFile, SUCCESS);
+ session.getProvenanceReporter().modifyContent(putFlowFile, "Converted "+totalRecordCount.get()+" records", System.currentTimeMillis() - startTime);
+
+ } catch (final ProcessException pe) {
+ getLogger().error("Failed to convert {} from Avro to Parquet due to {}; transferring to failure", new Object[]{flowFile, pe});
+ session.transfer(flowFile, FAILURE);
+ }
+
+ }
+
+ private ParquetWriter createParquetWriter(final ProcessContext context, final FlowFile flowFile, final OutputStream out, final Schema schema)
+ throws IOException {
+
+ NifiParquetOutputFile nifiParquetOutputFile = new NifiParquetOutputFile(out);
+
+ final AvroParquetWriter.Builder<GenericRecord> parquetWriter = AvroParquetWriter
+ .<GenericRecord>builder(nifiParquetOutputFile)
+ .withSchema(schema);
+
+ Configuration conf = new Configuration();
+ conf.setBoolean(AvroReadSupport.AVRO_COMPATIBILITY, true);
+ conf.setBoolean("parquet.avro.add-list-element-records", false);
+ conf.setBoolean("parquet.avro.write-old-list-structure", false);
+
+ ParquetUtils.applyCommonConfig(parquetWriter, context, flowFile, conf, this);
+
+ return parquetWriter.build();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
index 9aa0d82..f4a6875 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/FetchParquet.java
@@ -36,7 +36,6 @@ import org.apache.nifi.processors.hadoop.record.HDFSRecordReader;
import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordReader;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
-
import java.io.IOException;
@SupportsBatching
http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
index 8b8b814..c2794ac 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/PutParquet.java
@@ -32,24 +32,18 @@ import org.apache.nifi.avro.AvroTypeUtil;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
-import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
-import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.hadoop.AbstractPutHDFSRecord;
import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
import org.apache.nifi.processors.parquet.record.AvroParquetHDFSRecordWriter;
+import org.apache.nifi.processors.parquet.utils.ParquetUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.record.RecordSchema;
import org.apache.parquet.avro.AvroParquetWriter;
-import org.apache.parquet.column.ParquetProperties;
-import org.apache.parquet.hadoop.ParquetFileWriter;
-import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -77,64 +71,6 @@ import java.util.List;
})
public class PutParquet extends AbstractPutHDFSRecord {
- public static final PropertyDescriptor ROW_GROUP_SIZE = new PropertyDescriptor.Builder()
- .name("row-group-size")
- .displayName("Row Group Size")
- .description("The row group size used by the Parquet writer. " +
- "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
- .name("page-size")
- .displayName("Page Size")
- .description("The page size used by the Parquet writer. " +
- "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor DICTIONARY_PAGE_SIZE = new PropertyDescriptor.Builder()
- .name("dictionary-page-size")
- .displayName("Dictionary Page Size")
- .description("The dictionary page size used by the Parquet writer. " +
- "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor MAX_PADDING_SIZE = new PropertyDescriptor.Builder()
- .name("max-padding-size")
- .displayName("Max Padding Size")
- .description("The maximum amount of padding that will be used to align row groups with blocks in the " +
- "underlying filesystem. If the underlying filesystem is not a block filesystem like HDFS, this has no effect. " +
- "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
- .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
- .build();
-
- public static final PropertyDescriptor ENABLE_DICTIONARY_ENCODING = new PropertyDescriptor.Builder()
- .name("enable-dictionary-encoding")
- .displayName("Enable Dictionary Encoding")
- .description("Specifies whether dictionary encoding should be enabled for the Parquet writer")
- .allowableValues("true", "false")
- .build();
-
- public static final PropertyDescriptor ENABLE_VALIDATION = new PropertyDescriptor.Builder()
- .name("enable-validation")
- .displayName("Enable Validation")
- .description("Specifies whether validation should be enabled for the Parquet writer")
- .allowableValues("true", "false")
- .build();
-
- public static final PropertyDescriptor WRITER_VERSION = new PropertyDescriptor.Builder()
- .name("writer-version")
- .displayName("Writer Version")
- .description("Specifies the version used by Parquet writer")
- .allowableValues(ParquetProperties.WriterVersion.values())
- .build();
-
public static final PropertyDescriptor REMOVE_CRC_FILES = new PropertyDescriptor.Builder()
.name("remove-crc-files")
.displayName("Remove CRC Files")
@@ -166,13 +102,13 @@ public class PutParquet extends AbstractPutHDFSRecord {
@Override
public List<PropertyDescriptor> getAdditionalProperties() {
final List<PropertyDescriptor> props = new ArrayList<>();
- props.add(ROW_GROUP_SIZE);
- props.add(PAGE_SIZE);
- props.add(DICTIONARY_PAGE_SIZE);
- props.add(MAX_PADDING_SIZE);
- props.add(ENABLE_DICTIONARY_ENCODING);
- props.add(ENABLE_VALIDATION);
- props.add(WRITER_VERSION);
+ props.add(ParquetUtils.ROW_GROUP_SIZE);
+ props.add(ParquetUtils.PAGE_SIZE);
+ props.add(ParquetUtils.DICTIONARY_PAGE_SIZE);
+ props.add(ParquetUtils.MAX_PADDING_SIZE);
+ props.add(ParquetUtils.ENABLE_DICTIONARY_ENCODING);
+ props.add(ParquetUtils.ENABLE_VALIDATION);
+ props.add(ParquetUtils.WRITER_VERSION);
props.add(REMOVE_CRC_FILES);
return Collections.unmodifiableList(props);
}
@@ -187,88 +123,11 @@ public class PutParquet extends AbstractPutHDFSRecord {
.<GenericRecord>builder(path)
.withSchema(avroSchema);
- applyCommonConfig(parquetWriter, context, flowFile, conf);
+ ParquetUtils.applyCommonConfig(parquetWriter, context, flowFile, conf, this);
return new AvroParquetHDFSRecordWriter(parquetWriter.build(), avroSchema);
}
- private void applyCommonConfig(final ParquetWriter.Builder<?, ?> builder, final ProcessContext context, final FlowFile flowFile, final Configuration conf) {
- builder.withConf(conf);
-
- // Required properties
-
- final boolean overwrite = context.getProperty(OVERWRITE).asBoolean();
- final ParquetFileWriter.Mode mode = overwrite ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE;
- builder.withWriteMode(mode);
-
- final PropertyDescriptor compressionTypeDescriptor = getPropertyDescriptor(COMPRESSION_TYPE.getName());
- final String compressionTypeValue = context.getProperty(compressionTypeDescriptor).getValue();
-
- final CompressionCodecName codecName = CompressionCodecName.valueOf(compressionTypeValue);
- builder.withCompressionCodec(codecName);
-
- // Optional properties
-
- if (context.getProperty(ROW_GROUP_SIZE).isSet()){
- try {
- final Double rowGroupSize = context.getProperty(ROW_GROUP_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
- if (rowGroupSize != null) {
- builder.withRowGroupSize(rowGroupSize.intValue());
- }
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException("Invalid data size for " + ROW_GROUP_SIZE.getDisplayName(), e);
- }
- }
-
- if (context.getProperty(PAGE_SIZE).isSet()) {
- try {
- final Double pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
- if (pageSize != null) {
- builder.withPageSize(pageSize.intValue());
- }
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException("Invalid data size for " + PAGE_SIZE.getDisplayName(), e);
- }
- }
-
- if (context.getProperty(DICTIONARY_PAGE_SIZE).isSet()) {
- try {
- final Double dictionaryPageSize = context.getProperty(DICTIONARY_PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
- if (dictionaryPageSize != null) {
- builder.withDictionaryPageSize(dictionaryPageSize.intValue());
- }
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException("Invalid data size for " + DICTIONARY_PAGE_SIZE.getDisplayName(), e);
- }
- }
-
- if (context.getProperty(MAX_PADDING_SIZE).isSet()) {
- try {
- final Double maxPaddingSize = context.getProperty(MAX_PADDING_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
- if (maxPaddingSize != null) {
- builder.withMaxPaddingSize(maxPaddingSize.intValue());
- }
- } catch (IllegalArgumentException e) {
- throw new IllegalArgumentException("Invalid data size for " + MAX_PADDING_SIZE.getDisplayName(), e);
- }
- }
-
- if (context.getProperty(ENABLE_DICTIONARY_ENCODING).isSet()) {
- final boolean enableDictionaryEncoding = context.getProperty(ENABLE_DICTIONARY_ENCODING).asBoolean();
- builder.withDictionaryEncoding(enableDictionaryEncoding);
- }
-
- if (context.getProperty(ENABLE_VALIDATION).isSet()) {
- final boolean enableValidation = context.getProperty(ENABLE_VALIDATION).asBoolean();
- builder.withValidation(enableValidation);
- }
-
- if (context.getProperty(WRITER_VERSION).isSet()) {
- final String writerVersionValue = context.getProperty(WRITER_VERSION).getValue();
- builder.withWriterVersion(ParquetProperties.WriterVersion.valueOf(writerVersionValue));
- }
- }
-
@Override
protected FlowFile postProcess(final ProcessContext context, final ProcessSession session, final FlowFile flowFile, final Path destFile) {
final boolean removeCRCFiles = context.getProperty(REMOVE_CRC_FILES).asBoolean();
http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiOutputStream.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiOutputStream.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiOutputStream.java
new file mode 100644
index 0000000..acb2dc4
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiOutputStream.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.parquet.stream;
+
+import org.apache.parquet.io.PositionOutputStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+public class NifiOutputStream extends PositionOutputStream {
+ private long position = 0;
+ private OutputStream outputStream;
+
+ public NifiOutputStream(OutputStream outputStream) {
+ this.outputStream = outputStream;
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ return position;
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ position++;
+ outputStream.write(b);
+ }
+
+ @Override
+ public void write(byte[] b) throws IOException {
+ write(b, 0, b.length);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ outputStream.write(b, off, len);
+ position += len;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ outputStream.flush();
+ }
+
+ @Override
+ public void close() throws IOException {
+ outputStream.close();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiParquetOutputFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiParquetOutputFile.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiParquetOutputFile.java
new file mode 100644
index 0000000..d549b7b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/stream/NifiParquetOutputFile.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.parquet.stream;
+
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+
+import java.io.OutputStream;
+
+public class NifiParquetOutputFile implements OutputFile {
+
+ private OutputStream outputStream;
+
+ public NifiParquetOutputFile(OutputStream outputStream) {
+ this.outputStream = outputStream;
+ }
+
+ @Override
+ public PositionOutputStream create(long blockSizeHint) {
+ return new NifiOutputStream(outputStream);
+ }
+
+ @Override
+ public PositionOutputStream createOrOverwrite(long blockSizeHint) {
+ return new NifiOutputStream(outputStream);
+ }
+
+ @Override
+ public boolean supportsBlockSize() {
+ return false;
+ }
+
+ @Override
+ public long defaultBlockSize() {
+ return 0;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/utils/ParquetUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/utils/ParquetUtils.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/utils/ParquetUtils.java
new file mode 100644
index 0000000..7b116c2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/java/org/apache/nifi/processors/parquet/utils/ParquetUtils.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.parquet.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.parquet.PutParquet;
+import org.apache.parquet.column.ParquetProperties;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class ParquetUtils {
+
+ public static final PropertyDescriptor ROW_GROUP_SIZE = new PropertyDescriptor.Builder()
+ .name("row-group-size")
+ .displayName("Row Group Size")
+ .description("The row group size used by the Parquet writer. " +
+ "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor PAGE_SIZE = new PropertyDescriptor.Builder()
+ .name("page-size")
+ .displayName("Page Size")
+ .description("The page size used by the Parquet writer. " +
+ "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor DICTIONARY_PAGE_SIZE = new PropertyDescriptor.Builder()
+ .name("dictionary-page-size")
+ .displayName("Dictionary Page Size")
+ .description("The dictionary page size used by the Parquet writer. " +
+ "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor MAX_PADDING_SIZE = new PropertyDescriptor.Builder()
+ .name("max-padding-size")
+ .displayName("Max Padding Size")
+ .description("The maximum amount of padding that will be used to align row groups with blocks in the " +
+ "underlying filesystem. If the underlying filesystem is not a block filesystem like HDFS, this has no effect. " +
+ "The value is specified in the format of <Data Size> <Data Unit> where Data Unit is one of B, KB, MB, GB, TB.")
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
+ .build();
+
+ public static final PropertyDescriptor ENABLE_DICTIONARY_ENCODING = new PropertyDescriptor.Builder()
+ .name("enable-dictionary-encoding")
+ .displayName("Enable Dictionary Encoding")
+ .description("Specifies whether dictionary encoding should be enabled for the Parquet writer")
+ .allowableValues("true", "false")
+ .build();
+
+ public static final PropertyDescriptor ENABLE_VALIDATION = new PropertyDescriptor.Builder()
+ .name("enable-validation")
+ .displayName("Enable Validation")
+ .description("Specifies whether validation should be enabled for the Parquet writer")
+ .allowableValues("true", "false")
+ .build();
+
+ public static final PropertyDescriptor WRITER_VERSION = new PropertyDescriptor.Builder()
+ .name("writer-version")
+ .displayName("Writer Version")
+ .description("Specifies the version used by Parquet writer")
+ .allowableValues(org.apache.parquet.column.ParquetProperties.WriterVersion.values())
+ .build();
+
+ public static List<AllowableValue> COMPRESSION_TYPES = getCompressionTypes();
+
+ public static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
+ .name("compression-type")
+ .displayName("Compression Type")
+ .description("The type of compression for the file being written.")
+ .allowableValues(COMPRESSION_TYPES.toArray(new AllowableValue[0]))
+ .defaultValue(COMPRESSION_TYPES.get(0).getValue())
+ .required(true)
+ .build();
+
+ public static List<AllowableValue> getCompressionTypes() {
+ final List<AllowableValue> compressionTypes = new ArrayList<>();
+ for (CompressionCodecName compressionCodecName : CompressionCodecName.values()) {
+ final String name = compressionCodecName.name();
+ compressionTypes.add(new AllowableValue(name, name));
+ }
+ return Collections.unmodifiableList(compressionTypes);
+ }
+
+ public static void applyCommonConfig(final ParquetWriter.Builder<?, ?> builder,
+ final ProcessContext context,
+ final FlowFile flowFile,
+ final Configuration conf,
+ final AbstractProcessor abstractProcessor) {
+ builder.withConf(conf);
+
+ // Required properties
+ boolean overwrite = true;
+ if(context.getProperty(PutParquet.OVERWRITE).isSet())
+ overwrite = context.getProperty(PutParquet.OVERWRITE).asBoolean();
+
+ final ParquetFileWriter.Mode mode = overwrite ? ParquetFileWriter.Mode.OVERWRITE : ParquetFileWriter.Mode.CREATE;
+ builder.withWriteMode(mode);
+
+ final PropertyDescriptor compressionTypeDescriptor = abstractProcessor.getPropertyDescriptor(COMPRESSION_TYPE.getName());
+
+ final String compressionTypeValue = context.getProperty(compressionTypeDescriptor).getValue();
+
+ final CompressionCodecName codecName = CompressionCodecName.valueOf(compressionTypeValue);
+ builder.withCompressionCodec(codecName);
+
+ // Optional properties
+
+ if (context.getProperty(ROW_GROUP_SIZE).isSet()){
+ try {
+ final Double rowGroupSize = context.getProperty(ROW_GROUP_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
+ if (rowGroupSize != null) {
+ builder.withRowGroupSize(rowGroupSize.intValue());
+ }
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Invalid data size for " + ROW_GROUP_SIZE.getDisplayName(), e);
+ }
+ }
+
+ if (context.getProperty(PAGE_SIZE).isSet()) {
+ try {
+ final Double pageSize = context.getProperty(PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
+ if (pageSize != null) {
+ builder.withPageSize(pageSize.intValue());
+ }
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Invalid data size for " + PAGE_SIZE.getDisplayName(), e);
+ }
+ }
+
+ if (context.getProperty(DICTIONARY_PAGE_SIZE).isSet()) {
+ try {
+ final Double dictionaryPageSize = context.getProperty(DICTIONARY_PAGE_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
+ if (dictionaryPageSize != null) {
+ builder.withDictionaryPageSize(dictionaryPageSize.intValue());
+ }
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Invalid data size for " + DICTIONARY_PAGE_SIZE.getDisplayName(), e);
+ }
+ }
+
+ if (context.getProperty(MAX_PADDING_SIZE).isSet()) {
+ try {
+ final Double maxPaddingSize = context.getProperty(MAX_PADDING_SIZE).evaluateAttributeExpressions(flowFile).asDataSize(DataUnit.B);
+ if (maxPaddingSize != null) {
+ builder.withMaxPaddingSize(maxPaddingSize.intValue());
+ }
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Invalid data size for " + MAX_PADDING_SIZE.getDisplayName(), e);
+ }
+ }
+
+ if (context.getProperty(ENABLE_DICTIONARY_ENCODING).isSet()) {
+ final boolean enableDictionaryEncoding = context.getProperty(ENABLE_DICTIONARY_ENCODING).asBoolean();
+ builder.withDictionaryEncoding(enableDictionaryEncoding);
+ }
+
+ if (context.getProperty(ENABLE_VALIDATION).isSet()) {
+ final boolean enableValidation = context.getProperty(ENABLE_VALIDATION).asBoolean();
+ builder.withValidation(enableValidation);
+ }
+
+ if (context.getProperty(WRITER_VERSION).isSet()) {
+ final String writerVersionValue = context.getProperty(WRITER_VERSION).getValue();
+ builder.withWriterVersion(ParquetProperties.WriterVersion.valueOf(writerVersionValue));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 36583e3..6826d6e 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
org.apache.nifi.processors.parquet.PutParquet
-org.apache.nifi.processors.parquet.FetchParquet
\ No newline at end of file
+org.apache.nifi.processors.parquet.FetchParquet
+org.apache.nifi.processors.parquet.ConvertAvroToParquet
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
index 9e7943e..555dc60 100644
--- a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/PutParquetTest.java
@@ -42,6 +42,7 @@ import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processors.hadoop.exception.FailureException;
import org.apache.nifi.processors.hadoop.record.HDFSRecordWriter;
+import org.apache.nifi.processors.parquet.utils.ParquetUtils;
import org.apache.nifi.provenance.ProvenanceEventRecord;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.reporting.InitializationException;
@@ -184,7 +185,7 @@ public class PutParquetTest {
@Test
public void testWriteAvroWithGZIPCompression() throws IOException, InitializationException {
configure(proc, 100);
- testRunner.setProperty(PutParquet.COMPRESSION_TYPE, CompressionCodecName.GZIP.name());
+ testRunner.setProperty(ParquetUtils.COMPRESSION_TYPE, CompressionCodecName.GZIP.name());
final String filename = "testWriteAvroWithGZIPCompression-" + System.currentTimeMillis();
@@ -472,7 +473,7 @@ public class PutParquetTest {
@Test
public void testRowGroupSize() throws IOException, InitializationException {
configure(proc, 10);
- testRunner.setProperty(PutParquet.ROW_GROUP_SIZE, "1024 B");
+ testRunner.setProperty(ParquetUtils.ROW_GROUP_SIZE, "1024 B");
final String filename = "testRowGroupSize-" + System.currentTimeMillis();
@@ -487,7 +488,7 @@ public class PutParquetTest {
@Test
public void testInvalidRowGroupSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
configure(proc, 10);
- testRunner.setProperty(PutParquet.ROW_GROUP_SIZE, "${row.group.size}");
+ testRunner.setProperty(ParquetUtils.ROW_GROUP_SIZE, "${row.group.size}");
final String filename = "testInvalidRowGroupSizeFromELShouldRouteToFailure" + System.currentTimeMillis();
@@ -503,7 +504,7 @@ public class PutParquetTest {
@Test
public void testPageSize() throws IOException, InitializationException {
configure(proc, 10);
- testRunner.setProperty(PutParquet.PAGE_SIZE, "1024 B");
+ testRunner.setProperty(ParquetUtils.PAGE_SIZE, "1024 B");
final String filename = "testPageGroupSize-" + System.currentTimeMillis();
@@ -518,7 +519,7 @@ public class PutParquetTest {
@Test
public void testInvalidPageSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
configure(proc, 10);
- testRunner.setProperty(PutParquet.PAGE_SIZE, "${page.size}");
+ testRunner.setProperty(ParquetUtils.PAGE_SIZE, "${page.size}");
final String filename = "testInvalidPageSizeFromELShouldRouteToFailure" + System.currentTimeMillis();
@@ -534,7 +535,7 @@ public class PutParquetTest {
@Test
public void testDictionaryPageSize() throws IOException, InitializationException {
configure(proc, 10);
- testRunner.setProperty(PutParquet.DICTIONARY_PAGE_SIZE, "1024 B");
+ testRunner.setProperty(ParquetUtils.DICTIONARY_PAGE_SIZE, "1024 B");
final String filename = "testDictionaryPageGroupSize-" + System.currentTimeMillis();
@@ -549,7 +550,7 @@ public class PutParquetTest {
@Test
public void testInvalidDictionaryPageSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
configure(proc, 10);
- testRunner.setProperty(PutParquet.DICTIONARY_PAGE_SIZE, "${dictionary.page.size}");
+ testRunner.setProperty(ParquetUtils.DICTIONARY_PAGE_SIZE, "${dictionary.page.size}");
final String filename = "testInvalidDictionaryPageSizeFromELShouldRouteToFailure" + System.currentTimeMillis();
@@ -565,7 +566,7 @@ public class PutParquetTest {
@Test
public void testMaxPaddingPageSize() throws IOException, InitializationException {
configure(proc, 10);
- testRunner.setProperty(PutParquet.MAX_PADDING_SIZE, "1024 B");
+ testRunner.setProperty(ParquetUtils.MAX_PADDING_SIZE, "1024 B");
final String filename = "testMaxPaddingSize-" + System.currentTimeMillis();
@@ -580,7 +581,7 @@ public class PutParquetTest {
@Test
public void testInvalidMaxPaddingSizeFromELShouldRouteToFailure() throws IOException, InitializationException {
configure(proc, 10);
- testRunner.setProperty(PutParquet.MAX_PADDING_SIZE, "${max.padding.size}");
+ testRunner.setProperty(ParquetUtils.MAX_PADDING_SIZE, "${max.padding.size}");
final String filename = "testInvalidMaxPaddingSizeFromELShouldRouteToFailure" + System.currentTimeMillis();
http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java
new file mode 100644
index 0000000..261de9b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/java/org/apache/nifi/processors/parquet/TestConvertAvroToParquet.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.parquet;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.io.Resources;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.io.DatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.example.GroupReadSupport;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import static org.apache.parquet.format.converter.ParquetMetadataConverter.NO_FILTER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for ConvertAvroToParquet processor
+ */
+public class TestConvertAvroToParquet {
+
+ private ConvertAvroToParquet processor;
+ private TestRunner runner;
+
+ private List<GenericRecord> records = new ArrayList<>();
+ File tmpAvro = new File("target/test.avro");
+ File tmpParquet = new File("target/test.parquet");
+
+ @Before
+ public void setUp() throws Exception {
+ processor = new ConvertAvroToParquet();
+ runner = TestRunners.newTestRunner(processor);
+
+ Schema schema = new Schema.Parser().parse(Resources.getResource("avro/all-minus-enum.avsc").openStream());
+
+ DataFileWriter<Object> awriter = new DataFileWriter<Object>(new GenericDatumWriter<Object>());
+ GenericData.Record nestedRecord = new GenericRecordBuilder(
+ schema.getField("mynestedrecord").schema())
+ .set("mynestedint", 1).build();
+
+ GenericData.Record record = new GenericRecordBuilder(schema)
+ .set("mynull", null)
+ .set("myboolean", true)
+ .set("myint", 1)
+ .set("mylong", 2L)
+ .set("myfloat", 3.1f)
+ .set("mydouble", 4.1)
+ .set("mybytes", ByteBuffer.wrap("hello".getBytes(Charsets.UTF_8)))
+ .set("mystring", "hello")
+ .set("mynestedrecord", nestedRecord)
+ .set("myarray", new GenericData.Array<Integer>(Schema.createArray(Schema.create(Schema.Type.INT)), Arrays.asList(1, 2)))
+ .set("mymap", ImmutableMap.of("a", 1, "b", 2))
+ .set("myfixed", new GenericData.Fixed(Schema.createFixed("ignored", null, null, 1), new byte[] { (byte) 65 }))
+ .build();
+
+ awriter.create(schema, tmpAvro);
+ awriter.append(record);
+ awriter.flush();
+ awriter.close();
+
+ DatumReader<GenericRecord> datumReader = new GenericDatumReader<GenericRecord>(schema);
+ DataFileReader<GenericRecord> dataFileReader = new DataFileReader<GenericRecord>(tmpAvro, datumReader);
+ GenericRecord record1 = null;
+ while (dataFileReader.hasNext()) {
+ record1 = dataFileReader.next(record1);
+ records.add(record1);
+ }
+
+ }
+
+ @Test
+ public void test_Processor() throws Exception {
+
+ FileInputStream fileInputStream = new FileInputStream(tmpAvro);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ int readedBytes;
+ byte[] buf = new byte[1024];
+ while ((readedBytes = fileInputStream.read(buf)) > 0) {
+ out.write(buf, 0, readedBytes);
+ }
+ out.close();
+
+ Map<String, String> attributes = new HashMap<String, String>() {{
+ put(CoreAttributes.FILENAME.key(), "test.avro");
+ }};
+ runner.enqueue(out.toByteArray(), attributes);
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertAvroToParquet.SUCCESS, 1);
+
+ MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToParquet.SUCCESS).get(0);
+
+ // assert meta data
+ assertEquals("1", resultFlowFile.getAttribute(ConvertAvroToParquet.RECORD_COUNT_ATTRIBUTE));
+ assertEquals("test.parquet", resultFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
+
+
+ }
+
+ @Test
+ public void test_Meta_Info() throws Exception {
+
+ FileInputStream fileInputStream = new FileInputStream(tmpAvro);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ int readedBytes;
+ byte[] buf = new byte[1024];
+ while ((readedBytes = fileInputStream.read(buf)) > 0) {
+ out.write(buf, 0, readedBytes);
+ }
+ out.close();
+
+ Map<String, String> attributes = new HashMap<String, String>() {{
+ put(CoreAttributes.FILENAME.key(), "test.avro");
+ }};
+ runner.enqueue(out.toByteArray(), attributes);
+ runner.run();
+ MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToParquet.SUCCESS).get(0);
+
+ // Save the flowfile
+ byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
+ FileOutputStream fos = new FileOutputStream(tmpParquet);
+ fos.write(resultContents);
+ fos.flush();
+ fos.close();
+
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+ ParquetMetadata metaData;
+ metaData = ParquetFileReader.readFooter(conf, new Path(tmpParquet.getAbsolutePath()), NO_FILTER);
+
+ // #number of records
+ long nParquetRecords = 0;
+ for(BlockMetaData meta : metaData.getBlocks()){
+ nParquetRecords += meta.getRowCount();
+ }
+ long nAvroRecord = records.size();
+
+ assertEquals(nParquetRecords, nAvroRecord);
+ }
+
+ @Test
+ public void test_Data() throws Exception {
+
+
+ FileInputStream fileInputStream = new FileInputStream(tmpAvro);
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ int readedBytes;
+ byte[] buf = new byte[1024];
+ while ((readedBytes = fileInputStream.read(buf)) > 0) {
+ out.write(buf, 0, readedBytes);
+ }
+ out.close();
+
+ Map<String, String> attributes = new HashMap<String, String>() {{
+ put(CoreAttributes.FILENAME.key(), "test.avro");
+ }};
+ runner.enqueue(out.toByteArray(), attributes);
+ runner.run();
+ MockFlowFile resultFlowFile = runner.getFlowFilesForRelationship(ConvertAvroToParquet.SUCCESS).get(0);
+
+ // Save the flowfile
+ byte[] resultContents = runner.getContentAsByteArray(resultFlowFile);
+ FileOutputStream fos = new FileOutputStream(tmpParquet);
+ fos.write(resultContents);
+ fos.flush();
+ fos.close();
+
+ Configuration conf = new Configuration();
+ FileSystem fs = FileSystem.getLocal(conf);
+ ParquetReader<Group> reader = ParquetReader.builder(new GroupReadSupport(), new Path(tmpParquet.getAbsolutePath()))
+ .withConf(conf)
+ .build();
+
+ List<Group> parquetRecords = new ArrayList<Group>();
+
+ Group current;
+ current = reader.read();
+ while (current != null) {
+ assertTrue(current instanceof Group);
+ parquetRecords.add(current);
+ current = reader.read();
+ }
+
+ Group firstRecord = parquetRecords.get(0);
+
+ // Primitive
+ assertEquals(firstRecord.getInteger("myint", 0), 1);
+ assertEquals(firstRecord.getLong("mylong", 0), 2);
+ assertEquals(firstRecord.getBoolean("myboolean", 0), true);
+ assertEquals(firstRecord.getFloat("myfloat", 0), 3.1, 0.0001);
+ assertEquals(firstRecord.getDouble("mydouble", 0), 4.1, 0.001);
+ assertEquals(firstRecord.getString("mybytes", 0), "hello");
+ assertEquals(firstRecord.getString("mystring", 0), "hello");
+
+ // Nested
+ assertEquals(firstRecord.getGroup("mynestedrecord",0).getInteger("mynestedint",0), 1);
+
+ // Array
+ assertEquals(firstRecord.getGroup("myarray",0).getGroup("list",0).getInteger("element", 0), 1);
+ assertEquals(firstRecord.getGroup("myarray",0).getGroup("list",1).getInteger("element", 0), 2);
+
+ // Map
+ assertEquals(firstRecord.getGroup("mymap",0).getGroup("map",0).getInteger("value", 0), 1);
+ assertEquals(firstRecord.getGroup("mymap",0).getGroup("map",1).getInteger("value", 0), 2);
+
+ // Fixed
+ assertEquals(firstRecord.getString("myfixed",0), "A");
+
+ }
+
+ @After
+ public void cleanup(){
+ tmpAvro.delete();
+ tmpParquet.delete();
+
+ }
+
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/e45584d0/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/all-minus-enum.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/all-minus-enum.avsc b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/all-minus-enum.avsc
new file mode 100644
index 0000000..3a383a0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-parquet-bundle/nifi-parquet-processors/src/test/resources/avro/all-minus-enum.avsc
@@ -0,0 +1,60 @@
+{
+ "name": "myrecord",
+ "namespace": "parquet.avro",
+ "type": "record",
+ "fields": [{
+ "name": "mynull",
+ "type": "null"
+ }, {
+ "name": "myboolean",
+ "type": "boolean"
+ }, {
+ "name": "myint",
+ "type": "int"
+ }, {
+ "name": "mylong",
+ "type": "long"
+ }, {
+ "name": "myfloat",
+ "type": "float"
+ }, {
+ "name": "mydouble",
+ "type": "double"
+ }, {
+ "name": "mybytes",
+ "type": "bytes"
+ }, {
+ "name": "mystring",
+ "type": "string"
+ }, {
+ "name": "mynestedrecord",
+ "type": {
+ "type": "record",
+ "name": "ignored1",
+ "fields": [{
+ "name": "mynestedint",
+ "type": "int"
+ }]
+ }
+ }, {
+ "name": "myarray",
+ "type": {
+ "type": "array",
+ "items": "int"
+ }
+ }, {
+ "name": "mymap",
+ "type": {
+ "type": "map",
+ "values": "int"
+ }
+ }, {
+ "name": "myfixed",
+ "type": {
+ "type": "fixed",
+ "name": "ignored3",
+ "namespace": "",
+ "size": 1
+ }
+ }]
+}
\ No newline at end of file