Posted to commits@nifi.apache.org by jo...@apache.org on 2015/07/14 14:26:05 UTC

[1/2] incubator-nifi git commit: NIFI-751 Add Processor To Convert Avro Formats

Repository: incubator-nifi
Updated Branches:
  refs/heads/develop 8bd20510e -> 8ff69ca2d


NIFI-751 Add Processor To Convert Avro Formats

Implemented a new NiFi processor that allows Avro records to be converted from one Avro schema
to another. This supports:
* Flattening records using . notation like "parent.id"
* Simple type conversions to String or base primitive types.
* Specifying field renames using dynamic properties.
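
As a rough illustration of the API this introduces (class and method names are
from the AvroRecordConverter diff below; the schema and record variables are
hypothetical):

    // Flatten the nested input field "parent.id" onto the flat output field
    // "parentId"; fields not listed in the mapping are matched by name.
    Map<String, String> mapping = ImmutableMap.of("parent.id", "parentId");
    AvroRecordConverter converter = new AvroRecordConverter(
            inputSchema, outputSchema, mapping);
    Record converted = converter.convert(inputRecord); // throws AvroConversionException on bad data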

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/bb64e70e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/bb64e70e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/bb64e70e

Branch: refs/heads/develop
Commit: bb64e70e6fbda12a9e0388b5c2240d96d34ac6bf
Parents: 8bd2051
Author: Alan Jackoway <al...@cloudera.com>
Authored: Tue Jul 7 17:28:26 2015 -0400
Committer: joewitt <jo...@apache.org>
Committed: Tue Jul 14 07:22:34 2015 -0400

----------------------------------------------------------------------
 .../processors/kite/AvroRecordConverter.java    | 320 +++++++++++++++++
 .../nifi/processors/kite/ConvertAvroSchema.java | 339 +++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../additionalDetails.html                      | 142 ++++++++
 .../kite/TestAvroRecordConverter.java           | 201 +++++++++++
 .../processors/kite/TestConvertAvroSchema.java  | 216 ++++++++++++
 6 files changed, 1219 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb64e70e/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java
new file mode 100644
index 0000000..68e6c98
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.IndexedRecord;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Responsible for converting records of one Avro type to another. Supports
+ * syntax like "record.field" to unpack fields and will try to do simple type
+ * conversion.
+ */
+public class AvroRecordConverter {
+    private final Schema inputSchema;
+    private final Schema outputSchema;
+    // Keyed by output field name, mapping to input field name, so we can look up by output.
+    private final Map<String, String> fieldMapping;
+
+    /**
+     * @param inputSchema
+     *            Schema of input record objects
+     * @param outputSchema
+     *            Schema of output record objects
+     * @param fieldMapping
+     *            Map from field name in input record to field name in output
+     *            record.
+     */
+    public AvroRecordConverter(Schema inputSchema, Schema outputSchema,
+            Map<String, String> fieldMapping) {
+        this.inputSchema = inputSchema;
+        this.outputSchema = outputSchema;
+        // Need to reverse this map.
+        this.fieldMapping = Maps
+                .newHashMapWithExpectedSize(fieldMapping.size());
+        for (Map.Entry<String, String> entry : fieldMapping.entrySet()) {
+            this.fieldMapping.put(entry.getValue(), entry.getKey());
+        }
+    }
+
+    /**
+     * @return Any fields in the output schema that are not mapped or are mapped
+     *         by a non-existent input field.
+     */
+    public Collection<String> getUnmappedFields() {
+        List<String> result = Lists.newArrayList();
+        for (Field f : outputSchema.getFields()) {
+            String fieldName = f.name();
+            if (fieldMapping.containsKey(fieldName)) {
+                fieldName = fieldMapping.get(fieldName);
+            }
+
+            Schema currentSchema = inputSchema;
+            while (fieldName.contains(".")) {
+                // Recurse down the schema to find the right field.
+                int dotIndex = fieldName.indexOf('.');
+                String entityName = fieldName.substring(0, dotIndex);
+                // Get the schema. In case we had an optional record, choose
+                // just the record.
+                currentSchema = getNonNullSchema(currentSchema);
+                if (currentSchema.getField(entityName) == null) {
+                    // Tried to step into a schema that doesn't exist. Break out
+                    // of the loop
+                    break;
+                }
+                currentSchema = currentSchema.getField(entityName).schema();
+                fieldName = fieldName.substring(dotIndex + 1);
+            }
+            if (currentSchema == null
+                    || getNonNullSchema(currentSchema).getField(fieldName) == null) {
+                result.add(f.name());
+            }
+        }
+        return result;
+    }
+
+    /**
+     * Converts one record to another given an input and output schema plus
+     * explicit mappings for certain target fields.
+     *
+     * @param input
+     *            Input record to convert conforming to the inputSchema this
+     *            converter was created with.
+     * @return Record converted to the outputSchema this converter was created
+     *         with.
+     * @throws AvroConversionException
+     *             When schemas do not match or illegal conversions are
+     *             attempted, such as when numeric data fails to parse.
+     */
+    public Record convert(Record input) throws AvroConversionException {
+        Record result = new Record(outputSchema);
+        for (Field outputField : outputSchema.getFields()) {
+            // Default to matching by name
+            String inputFieldName = outputField.name();
+            if (fieldMapping.containsKey(outputField.name())) {
+                inputFieldName = fieldMapping.get(outputField.name());
+            }
+
+            IndexedRecord currentRecord = input;
+            Schema currentSchema = getNonNullSchema(inputSchema);
+            while (inputFieldName.contains(".")) {
+                // Recurse down the schema to find the right field.
+                int dotIndex = inputFieldName.indexOf('.');
+                String entityName = inputFieldName.substring(0, dotIndex);
+                // Get the record object
+                Object innerRecord = currentRecord.get(currentSchema.getField(
+                        entityName).pos());
+                if (innerRecord == null) {
+                    // Probably hit a null record here. Just break out of the
+                    // loop so that null object will be passed to convertData
+                    // below.
+                    currentRecord = null;
+                    break;
+                }
+                if (!(innerRecord instanceof IndexedRecord)) {
+                    // innerRecord is non-null here, so anything that is not a
+                    // record means the path stepped through a non-record type.
+                    throw new AvroConversionException(inputFieldName
+                            + " stepped through a non-record");
+                }
+                currentRecord = (IndexedRecord) innerRecord;
+
+                // Get the schema. In case we had an optional record, choose
+                // just the record.
+                currentSchema = currentSchema.getField(entityName).schema();
+                currentSchema = getNonNullSchema(currentSchema);
+                inputFieldName = inputFieldName.substring(dotIndex + 1);
+            }
+
+            // currentRecord and currentSchema should now be positioned to read the field.
+            Field f = currentSchema.getField(inputFieldName);
+            if (currentRecord == null) {
+                // We may have stepped into a null union type and gotten a null
+                // result.
+                Schema s = null;
+                if (f != null) {
+                    s = f.schema();
+                }
+                result.put(outputField.name(),
+                        convertData(null, s, outputField.schema()));
+            } else {
+                result.put(
+                        outputField.name(),
+                        convertData(currentRecord.get(f.pos()), f.schema(),
+                                outputField.schema()));
+            }
+        }
+        return result;
+    }
+
+    public Schema getInputSchema() {
+        return inputSchema;
+    }
+
+    public Schema getOutputSchema() {
+        return outputSchema;
+    }
+
+    /**
+     * Converts the data from one schema to another. If the types are the same,
+     * no change will be made, but simple conversions will be attempted for
+     * other types.
+     *
+     * @param content
+     *            The data to convert, generally taken from a field in an input
+     *            Record.
+     * @param inputSchema
+     *            The schema of the content object
+     * @param outputSchema
+     *            The schema to convert to.
+     * @return The content object, converted to the output schema.
+     * @throws AvroConversionException
+     *             When conversion is impossible, either because the output type
+     *             is not supported or because numeric data failed to parse.
+     */
+    private Object convertData(Object content, Schema inputSchema,
+            Schema outputSchema) throws AvroConversionException {
+        if (content == null) {
+            // No conversion can happen here.
+            if (supportsNull(outputSchema)) {
+                return null;
+            }
+            throw new AvroConversionException("Output schema " + outputSchema
+                    + " does not support null");
+        }
+
+        Schema nonNullInput = getNonNullSchema(inputSchema);
+        Schema nonNullOutput = getNonNullSchema(outputSchema);
+        if (nonNullInput.getType().equals(nonNullOutput.getType())) {
+            return content;
+        } else {
+            if (nonNullOutput.getType() == Schema.Type.STRING) {
+                return content.toString();
+            }
+
+            // For the non-string cases of these, we will try to convert through
+            // string using Scanner to validate types. This means we could
+            // return questionable results when a String starts with a number
+            // but then contains other content
+            Scanner scanner = new Scanner(content.toString());
+            switch (nonNullOutput.getType()) {
+            case LONG:
+                if (scanner.hasNextLong()) {
+                    return scanner.nextLong();
+                } else {
+                    throw new AvroConversionException("Cannot convert "
+                            + content + " to long");
+                }
+            case INT:
+                if (scanner.hasNextInt()) {
+                    return scanner.nextInt();
+                } else {
+                    throw new AvroConversionException("Cannot convert "
+                            + content + " to int");
+                }
+            case DOUBLE:
+                if (scanner.hasNextDouble()) {
+                    return scanner.nextDouble();
+                } else {
+                    throw new AvroConversionException("Cannot convert "
+                            + content + " to double");
+                }
+            case FLOAT:
+                if (scanner.hasNextFloat()) {
+                    return scanner.nextFloat();
+                } else {
+                    throw new AvroConversionException("Cannot convert "
+                            + content + " to float");
+                }
+            default:
+                throw new AvroConversionException("Cannot convert to type "
+                        + nonNullOutput.getType());
+            }
+        }
+    }
+
+    /**
+     * If s is a union schema of some type with null, returns that type.
+     * Otherwise just returns the schema itself.
+     *
+     * Does not handle unions of anything except null and one other type.
+     *
+     * @param s
+     *            Schema to remove nullability from.
+     * @return The Schema of the non-null part of the union, if the input was
+     *         a union type. Otherwise returns the input schema.
+     */
+    protected static Schema getNonNullSchema(Schema s) {
+        // Handle the case where s is a union type. Assert that this must be a
+        // union that only includes one non-null type.
+        if (s.getType() == Schema.Type.UNION) {
+            List<Schema> types = s.getTypes();
+            boolean foundOne = false;
+            Schema result = s;
+            for (Schema type : types) {
+                if (!type.getType().equals(Schema.Type.NULL)) {
+                    Preconditions.checkArgument(foundOne == false,
+                            "Cannot handle union of two non-null types");
+                    foundOne = true;
+                    result = type;
+                }
+            }
+            return result;
+        } else {
+            return s;
+        }
+    }
+
+    protected static boolean supportsNull(Schema s) {
+        if (s.getType() == Schema.Type.NULL) {
+            return true;
+        } else if (s.getType() == Schema.Type.UNION) {
+            for (Schema type : s.getTypes()) {
+                if (type.getType() == Schema.Type.NULL) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Exception thrown when Avro conversion fails.
+     */
+    public static class AvroConversionException extends Exception {
+        public AvroConversionException(String string, IOException e) {
+            super(string, e);
+        }
+
+        public AvroConversionException(String string) {
+            super(string);
+        }
+    }
+}
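
A quick sketch of what the getNonNullSchema and supportsNull helpers above do
with optional (union-with-null) types; this assumes package-private access, as
the unit tests further below rely on:

    Schema optionalLong = new Schema.Parser().parse("[\"null\", \"long\"]");
    Schema plainLong = new Schema.Parser().parse("\"long\"");

    AvroRecordConverter.getNonNullSchema(optionalLong).getType(); // LONG
    AvroRecordConverter.getNonNullSchema(plainLong).getType();    // LONG, returned as-is
    AvroRecordConverter.supportsNull(optionalLong);               // true
    AvroRecordConverter.supportsNull(plainLong);                  // false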

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb64e70e/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
new file mode 100644
index 0000000..0d9f658
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processors.kite.AvroRecordConverter.AvroConversionException;
+import org.apache.nifi.util.LongHolder;
+import org.kitesdk.data.DatasetException;
+import org.kitesdk.data.DatasetIOException;
+import org.kitesdk.data.SchemaNotFoundException;
+import org.kitesdk.data.spi.DefaultConfiguration;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+@Tags({ "avro", "convert", "kite" })
+@CapabilityDescription("Convert records from one Avro schema to another, including support for flattening and simple type conversions")
+@DynamicProperty(name = "Field name from input schema",
+        value = "Field name for output schema",
+        description = "Explicit mappings from input schema to output schema, which support renaming fields and stepping into nested records on the input schema using notation like parent.id")
+public class ConvertAvroSchema extends AbstractKiteProcessor {
+
+    private static final Relationship SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("Avro content that converted successfully").build();
+
+    private static final Relationship FAILURE = new Relationship.Builder()
+            .name("failure").description("Avro content that failed to convert")
+            .build();
+
+    /**
+     * Makes sure the output schema is valid and that all of its fields can be
+     * mapped, either automatically by name or through explicit mappings.
+     */
+    protected static final Validator MAPPED_SCHEMA_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(String subject, String uri,
+                ValidationContext context) {
+            Configuration conf = getConfiguration(context.getProperty(
+                    CONF_XML_FILES).getValue());
+            String inputUri = context.getProperty(INPUT_SCHEMA).getValue();
+            String error = null;
+
+            final boolean elPresent = context
+                    .isExpressionLanguageSupported(subject)
+                    && context.isExpressionLanguagePresent(uri);
+            if (!elPresent) {
+                try {
+                    Schema outputSchema = getSchema(uri, conf);
+                    Schema inputSchema = getSchema(inputUri, conf);
+                    // Get the explicitly mapped fields. This is identical to
+                    // logic in onTrigger, but ValidationContext and
+                    // ProcessContext share no ancestor, so we cannot generalize
+                    // the code.
+                    Map<String, String> fieldMapping = new HashMap<>();
+                    for (final Map.Entry<PropertyDescriptor, String> entry : context
+                            .getProperties().entrySet()) {
+                        if (entry.getKey().isDynamic()) {
+                            fieldMapping.put(entry.getKey().getName(),
+                                    entry.getValue());
+                        }
+                    }
+                    AvroRecordConverter converter = new AvroRecordConverter(
+                            inputSchema, outputSchema, fieldMapping);
+                    Collection<String> unmappedFields = converter
+                            .getUnmappedFields();
+                    if (unmappedFields.size() > 0) {
+                        error = "The following fields are unmapped: "
+                                + unmappedFields;
+                    }
+
+                } catch (SchemaNotFoundException e) {
+                    error = e.getMessage();
+                }
+            }
+            return new ValidationResult.Builder().subject(subject).input(uri)
+                    .explanation(error).valid(error == null).build();
+        }
+    };
+
+    @VisibleForTesting
+    static final PropertyDescriptor INPUT_SCHEMA = new PropertyDescriptor.Builder()
+            .name("Input Schema").description("Avro Schema of Input Flowfiles")
+            .addValidator(SCHEMA_VALIDATOR).expressionLanguageSupported(true)
+            .required(true).build();
+
+    @VisibleForTesting
+    static final PropertyDescriptor OUTPUT_SCHEMA = new PropertyDescriptor.Builder()
+            .name("Output Schema")
+            .description("Avro Schema of Output Flowfiles")
+            .addValidator(MAPPED_SCHEMA_VALIDATOR).expressionLanguageSupported(true)
+            .required(true).build();
+
+    private static final List<PropertyDescriptor> PROPERTIES = ImmutableList
+            .<PropertyDescriptor> builder()
+            .addAll(AbstractKiteProcessor.getProperties()).add(INPUT_SCHEMA)
+            .add(OUTPUT_SCHEMA).build();
+
+    private static final Set<Relationship> RELATIONSHIPS = ImmutableSet
+            .<Relationship> builder().add(SUCCESS).add(FAILURE).build();
+
+    private static final Pattern AVRO_FIELDNAME_PATTERN = Pattern
+            .compile("[A-Za-z_][A-Za-z0-9_\\.]*");
+
+    /**
+     * Validates that the input and output fields (from dynamic properties) are
+     * all valid Avro field names, optionally using "." to step into records.
+     */
+    protected static final Validator AVRO_FIELDNAME_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject,
+                final String value, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject)
+                    && context.isExpressionLanguagePresent(value)) {
+                return new ValidationResult.Builder().subject(subject)
+                        .input(value)
+                        .explanation("Expression Language Present").valid(true)
+                        .build();
+            }
+
+            String reason = "";
+            if (!AVRO_FIELDNAME_PATTERN.matcher(subject).matches()) {
+                reason = subject + " is not a valid Avro field name";
+            }
+            if (!AVRO_FIELDNAME_PATTERN.matcher(value).matches()) {
+                // Keep both messages readable when the name and value are invalid.
+                reason = reason + (reason.isEmpty() ? "" : "; ") + value
+                        + " is not a valid Avro field name";
+            }
+
+            return new ValidationResult.Builder().subject(subject).input(value)
+                    .explanation(reason).valid(reason.equals("")).build();
+        }
+    };
+
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(
+            final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .description(
+                        "Field mapping between schemas. The property name is the field name for the input "
+                                + "schema, and the property value is the field name for the output schema. For fields "
+                                + "not listed, the processor tries to match names from the input to the output record.")
+                .dynamic(true).addValidator(AVRO_FIELDNAME_VALIDATOR).build();
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return PROPERTIES;
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @Override
+    public void onTrigger(ProcessContext context, final ProcessSession session)
+            throws ProcessException {
+        FlowFile incomingAvro = session.get();
+        if (incomingAvro == null) {
+            return;
+        }
+
+        String inputSchemaProperty = context.getProperty(INPUT_SCHEMA)
+                .evaluateAttributeExpressions(incomingAvro).getValue();
+        final Schema inputSchema;
+        try {
+            inputSchema = getSchema(inputSchemaProperty,
+                    DefaultConfiguration.get());
+        } catch (SchemaNotFoundException e) {
+            getLogger().error("Cannot find schema: " + inputSchemaProperty);
+            session.transfer(incomingAvro, FAILURE);
+            return;
+        }
+        String outputSchemaProperty = context.getProperty(OUTPUT_SCHEMA)
+                .evaluateAttributeExpressions(incomingAvro).getValue();
+        final Schema outputSchema;
+        try {
+            outputSchema = getSchema(outputSchemaProperty,
+                    DefaultConfiguration.get());
+        } catch (SchemaNotFoundException e) {
+            getLogger().error("Cannot find schema: " + outputSchemaProperty);
+            session.transfer(incomingAvro, FAILURE);
+            return;
+        }
+        final Map<String, String> fieldMapping = new HashMap<>();
+        for (final Map.Entry<PropertyDescriptor, String> entry : context
+                .getProperties().entrySet()) {
+            if (entry.getKey().isDynamic()) {
+                fieldMapping.put(entry.getKey().getName(), entry.getValue());
+            }
+        }
+        final AvroRecordConverter converter = new AvroRecordConverter(
+                inputSchema, outputSchema, fieldMapping);
+
+        final DataFileWriter<Record> writer = new DataFileWriter<>(
+                AvroUtil.newDatumWriter(outputSchema, Record.class));
+        writer.setCodec(CodecFactory.snappyCodec());
+
+        final DataFileWriter<Record> failureWriter = new DataFileWriter<>(
+                AvroUtil.newDatumWriter(outputSchema, Record.class));
+        failureWriter.setCodec(CodecFactory.snappyCodec());
+
+        try {
+            final LongHolder written = new LongHolder(0L);
+            final FailureTracker failures = new FailureTracker();
+
+            final List<Record> badRecords = Lists.newLinkedList();
+            FlowFile incomingAvroCopy = session.clone(incomingAvro);
+            FlowFile outgoingAvro = session.write(incomingAvro,
+                    new StreamCallback() {
+                        @Override
+                        public void process(InputStream in, OutputStream out)
+                                throws IOException {
+                            try (DataFileStream<Record> stream = new DataFileStream<Record>(
+                                    in, new GenericDatumReader<Record>(
+                                            converter.getInputSchema()))) {
+                                try (DataFileWriter<Record> w = writer.create(
+                                        outputSchema, out)) {
+                                    for (Record record : stream) {
+                                        try {
+                                            Record converted = converter
+                                                    .convert(record);
+                                            w.append(converted);
+                                            written.incrementAndGet();
+                                        } catch (AvroConversionException e) {
+                                            failures.add(e);
+                                            getLogger().error(
+                                                    "Error converting data: "
+                                                            + e.getMessage());
+                                            badRecords.add(record);
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    });
+
+            FlowFile badOutput = session.write(incomingAvroCopy,
+                    new StreamCallback() {
+                        @Override
+                        public void process(InputStream in, OutputStream out)
+                                throws IOException {
+
+                            try (DataFileWriter<Record> w = failureWriter
+                                    .create(inputSchema, out)) {
+                                for (Record record : badRecords) {
+                                    w.append(record);
+                                }
+                            }
+
+                        }
+                    });
+
+            long errors = failures.count();
+
+            // Deferred counters (immediate=false) are applied only when the
+            // session commits successfully.
+            session.adjustCounter("Converted records", written.get(), false);
+            session.adjustCounter("Conversion errors", errors, false);
+
+            if (written.get() > 0L) {
+                session.transfer(outgoingAvro, SUCCESS);
+            } else {
+                session.remove(outgoingAvro);
+
+                if (errors == 0L) {
+                    badOutput = session.putAttribute(badOutput, "errors",
+                            "No incoming records");
+                    session.transfer(badOutput, FAILURE);
+                }
+            }
+
+            if (errors > 0L) {
+                getLogger().warn(
+                        "Failed to convert {}/{} records between Avro Schemas",
+                        new Object[] { errors, errors + written.get() });
+                badOutput = session.putAttribute(badOutput, "errors",
+                        failures.summary());
+                session.transfer(badOutput, FAILURE);
+            } else if (written.get() > 0L) {
+                // In the no-incoming-records case badOutput was already
+                // transferred above, so only remove it when it is unused.
+                session.remove(badOutput);
+            }
+        } catch (ProcessException | DatasetIOException e) {
+            getLogger().error("Failed reading or writing", e);
+            session.transfer(incomingAvro, FAILURE);
+        } catch (DatasetException e) {
+            getLogger().error("Failed to read FlowFile", e);
+            session.transfer(incomingAvro, FAILURE);
+        }
+    }
+}
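
For a sense of how the dynamic properties are wired up, a minimal sketch of
configuring this processor in a test, mirroring the TestConvertAvroSchema cases
further below (the schema JSON strings here are placeholders):

    TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
    runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA, inputSchemaJson);
    runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA, outputSchemaJson);
    runner.setProperty("companyName", "name");   // dynamic property: rename a field
    runner.setProperty("parent.id", "parentId"); // dynamic property: flatten a nested field
    runner.assertValid();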

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb64e70e/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 6de5612..ea99ff6 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -15,3 +15,4 @@
 org.apache.nifi.processors.kite.StoreInKiteDataset
 org.apache.nifi.processors.kite.ConvertCSVToAvro
 org.apache.nifi.processors.kite.ConvertJSONToAvro
+org.apache.nifi.processors.kite.ConvertAvroSchema

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb64e70e/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/docs/org.apache.nifi.processors.kite.ConvertAvroSchema/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/docs/org.apache.nifi.processors.kite.ConvertAvroSchema/additionalDetails.html b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/docs/org.apache.nifi.processors.kite.ConvertAvroSchema/additionalDetails.html
new file mode 100644
index 0000000..f5d8a1d
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/docs/org.apache.nifi.processors.kite.ConvertAvroSchema/additionalDetails.html
@@ -0,0 +1,142 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>ConvertAvroSchema</title>
+
+        <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
+    </head>
+
+    <body>
+        <!-- Processor Documentation ================================================== -->
+        <h2>Description:</h2>
+        <p>This processor is used to convert data between two Avro formats, such as those coming from the <code>ConvertCSVToAvro</code> or
+            <code>ConvertJSONToAvro</code> processors. The input and output content of the flow files should be Avro data files. The processor
+            includes support for the following basic type conversions:
+            <ul>
+              <li>Anything to String, using the data's default String representation</li>
+              <li>String types to numeric types int, long, double, and float</li>
+              <li>Conversion to and from optional Avro types</li>
+            </ul>
+            In addition, fields can be renamed or unpacked from a record type by using the dynamic properties.
+         </p>
+         <h2>Mapping Example:</h2>
+         <p>
+             Throughout this example, we will refer to input data with the following schema:
+             <pre>
+{
+    "type": "record",
+    "name": "CustomerInput",
+    "namespace": "org.apache.example",
+    "fields": [
+        {
+            "name": "id",
+            "type": "string"
+        },
+        {
+            "name": "companyName",
+            "type": ["null", "string"],
+            "default": null
+        },
+        {
+            "name": "revenue",
+            "type": ["null", "string"],
+            "default": null
+        },
+        {
+            "name" : "parent",
+            "type" : [ "null", {
+              "type" : "record",
+              "name" : "parent",
+              "fields" : [ {
+                "name" : "name",
+                "type" : ["null", "string"],
+                "default" : null
+              }, {
+                "name" : "id",
+                "type" : "string"
+              } ]
+            } ],
+            "default" : null
+        }
+    ]
+}
+             </pre>
+             Note that even though the id and revenue fields are typed as string in this schema, they are logically long and double, respectively.
+             By default, fields with matching names will be mapped automatically, so the following output schema could be converted
+             without using dynamic properties:
+             <pre>
+{
+    "type": "record",
+    "name": "SimpleCustomerOutput",
+    "namespace": "org.apache.example",
+    "fields": [
+        {
+            "name": "id",
+            "type": "long"
+        },
+        {
+            "name": "companyName",
+            "type": ["null", "string"],
+            "default": null
+        },
+        {
+            "name": "revenue",
+            "type": ["null", "double"],
+            "default": null
+        }
+    ]
+}
+             </pre>
+             To rename companyName to name and to extract the parent's id field, both an output schema and dynamic properties must be provided.
+             For example, to convert to the following schema:
+             <pre>
+{
+    "type": "record",
+    "name": "SimpleCustomerOutput",
+    "namespace": "org.apache.example",
+    "fields": [
+        {
+            "name": "id",
+            "type": "long"
+        },
+        {
+            "name": "name",
+            "type": ["null", "string"],
+            "default": null
+        },
+        {
+            "name": "revenue",
+            "type": ["null", "double"],
+            "default": null
+        },
+        {
+            "name": "parentId",
+            "type": ["null", "long"],
+            "default": null
+        }
+    ]
+}
+             </pre>
+             The following dynamic properties would be used:
+             <pre>
+"companyName" -> "name"
+"parent.id" -> "parentId"
+             </pre>
+        </p>
+    </body>
+</html>
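
One caveat on the string-to-numeric conversions described above: the converter
validates values with java.util.Scanner, which tokenizes on whitespace, so a
string that merely starts with a valid number still converts and the trailing
content is dropped. A standalone sketch of that behavior:

    Scanner ok = new Scanner("5 apples");
    ok.hasNextInt(); // true -- the first token "5" parses as an int
    ok.nextInt();    // 5   -- "apples" is never inspected

    Scanner bad = new Scanner("5apples"); // a single token, not a valid int
    bad.hasNextInt(); // false -> AvroConversionException in convertData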

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb64e70e/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java
new file mode 100644
index 0000000..1a4748f
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData.Record;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class TestAvroRecordConverter {
+    final static ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+    final static Map<String, String> EMPTY_MAPPING = ImmutableMap.of();
+    final static String NESTED_RECORD_SCHEMA_STRING = "{\n"
+            + "    \"type\": \"record\",\n"
+            + "    \"name\": \"NestedInput\",\n"
+            + "    \"namespace\": \"org.apache.example\",\n"
+            + "    \"fields\": [\n" + "        {\n"
+            + "            \"name\": \"l1\",\n"
+            + "            \"type\": \"long\"\n"
+            + "        },\n"
+            + "        {\n" + "            \"name\": \"s1\",\n"
+            + "            \"type\": \"string\"\n"
+            + "        },\n"
+            + "        {\n"
+            + "            \"name\": \"parent\",\n"
+            + "            \"type\": [\"null\", {\n"
+            + "              \"type\": \"record\",\n"
+            + "              \"name\": \"parent\",\n"
+            + "              \"fields\": [\n"
+            + "                { \"name\": \"id\", \"type\": \"long\" },\n"
+            + "                { \"name\": \"name\", \"type\": \"string\" }\n"
+            + "              ]"
+            + "            } ]"
+            + "        }"
+            + "   ] }";
+    final static Schema NESTED_RECORD_SCHEMA = new Schema.Parser()
+            .parse(NESTED_RECORD_SCHEMA_STRING);
+    final static Schema NESTED_PARENT_SCHEMA = AvroRecordConverter
+            .getNonNullSchema(NESTED_RECORD_SCHEMA.getField("parent").schema());
+    final static Schema UNNESTED_OUTPUT_SCHEMA = SchemaBuilder.record("Output")
+            .namespace("org.apache.example").fields().requiredLong("l1")
+            .requiredLong("s1").optionalLong("parentId").endRecord();
+
+    /**
+     * Tests the case where we don't use any explicit mapping and just match
+     * fields by name.
+     */
+    @Test
+    public void testDefaultConversion() throws Exception {
+        // We will convert s1 from string to long (or leave it null), ignore s2,
+        // convert s3 from string to double, convert l1 from long to string,
+        // and leave l2 the same.
+        Schema input = SchemaBuilder.record("Input")
+                .namespace("com.cloudera.edh").fields()
+                .nullableString("s1", "").requiredString("s2")
+                .requiredString("s3").optionalLong("l1").requiredLong("l2")
+                .endRecord();
+        Schema output = SchemaBuilder.record("Output")
+                .namespace("com.cloudera.edh").fields().optionalLong("s1")
+                .optionalString("l1").requiredLong("l2").requiredDouble("s3")
+                .endRecord();
+
+        AvroRecordConverter converter = new AvroRecordConverter(input, output,
+                EMPTY_MAPPING);
+
+        Record inputRecord = new Record(input);
+        inputRecord.put("s1", null);
+        inputRecord.put("s2", "blah");
+        inputRecord.put("s3", "5.5");
+        inputRecord.put("l1", null);
+        inputRecord.put("l2", 5L);
+        Record outputRecord = converter.convert(inputRecord);
+        assertNull(outputRecord.get("s1"));
+        assertNull(outputRecord.get("l1"));
+        assertEquals(5L, outputRecord.get("l2"));
+        assertEquals(5.5, outputRecord.get("s3"));
+
+        inputRecord.put("s1", "500");
+        inputRecord.put("s2", "blah");
+        inputRecord.put("s3", "5.5e-5");
+        inputRecord.put("l1", 100L);
+        inputRecord.put("l2", 2L);
+        outputRecord = converter.convert(inputRecord);
+        assertEquals(500L, outputRecord.get("s1"));
+        assertEquals("100", outputRecord.get("l1"));
+        assertEquals(2L, outputRecord.get("l2"));
+        assertEquals(5.5e-5, outputRecord.get("s3"));
+    }
+
+    /**
+     * Tests the case where we want to default map one field and explicitly map
+     * another.
+     */
+    @Test
+    public void testExplicitMapping() throws Exception {
+        // We map l1 and s1 by name (converting s1 from string to long) and
+        // map the nested parent.id to parentId via an explicit mapping.
+        Schema input = NESTED_RECORD_SCHEMA;
+        Schema parent = NESTED_PARENT_SCHEMA;
+        Schema output = UNNESTED_OUTPUT_SCHEMA;
+        Map<String, String> mapping = ImmutableMap.of("parent.id", "parentId");
+
+        AvroRecordConverter converter = new AvroRecordConverter(input, output,
+                mapping);
+
+        Record inputRecord = new Record(input);
+        inputRecord.put("l1", 5L);
+        inputRecord.put("s1", "1000");
+        Record parentRecord = new Record(parent);
+        parentRecord.put("id", 200L);
+        parentRecord.put("name", "parent");
+        inputRecord.put("parent", parentRecord);
+        Record outputRecord = converter.convert(inputRecord);
+        assertEquals(5L, outputRecord.get("l1"));
+        assertEquals(1000L, outputRecord.get("s1"));
+        assertEquals(200L, outputRecord.get("parentId"));
+    }
+
+    /**
+     * Tests the case where we try to convert a string to a long incorrectly.
+     */
+    @Test(expected = org.apache.nifi.processors.kite.AvroRecordConverter.AvroConversionException.class)
+    public void testIllegalConversion() throws Exception {
+        // s1 maps to a long in the output schema but holds a non-numeric
+        // string, so the conversion should throw AvroConversionException.
+        Schema input = SchemaBuilder.record("Input")
+                .namespace("com.cloudera.edh").fields()
+                .nullableString("s1", "").requiredString("s2")
+                .optionalLong("l1").requiredLong("l2").endRecord();
+        Schema output = SchemaBuilder.record("Output")
+                .namespace("com.cloudera.edh").fields().optionalLong("s1")
+                .optionalString("l1").requiredLong("l2").endRecord();
+
+        AvroRecordConverter converter = new AvroRecordConverter(input, output,
+                EMPTY_MAPPING);
+
+        Record inputRecord = new Record(input);
+        inputRecord.put("s1", "blah");
+        inputRecord.put("s2", "blah");
+        inputRecord.put("l1", null);
+        inputRecord.put("l2", 5L);
+        converter.convert(inputRecord);
+    }
+
+    @Test
+    public void testGetUnmappedFields() throws Exception {
+        Schema input = SchemaBuilder.record("Input")
+                .namespace("com.cloudera.edh").fields()
+                .nullableString("s1", "").requiredString("s2")
+                .optionalLong("l1").requiredLong("l2").endRecord();
+        Schema output = SchemaBuilder.record("Output")
+                .namespace("com.cloudera.edh").fields().optionalLong("field")
+                .endRecord();
+
+        // Test the case where the field isn't mapped at all.
+        AvroRecordConverter converter = new AvroRecordConverter(input, output,
+                EMPTY_MAPPING);
+        assertEquals(ImmutableList.of("field"), converter.getUnmappedFields());
+
+        // Test the case where we tried to map from a non-existent field.
+        converter = new AvroRecordConverter(input, output, ImmutableMap.of(
+                "nonExistentField", "field"));
+        assertEquals(ImmutableList.of("field"), converter.getUnmappedFields());
+
+        // Test the case where we tried to map from a non-existent record.
+        converter = new AvroRecordConverter(input, output, ImmutableMap.of(
+                "parent.nonExistentField", "field"));
+        assertEquals(ImmutableList.of("field"), converter.getUnmappedFields());
+
+        // Test a valid case
+        converter = new AvroRecordConverter(input, output, ImmutableMap.of(
+                "l2", "field"));
+        assertEquals(Collections.EMPTY_LIST, converter.getUnmappedFields());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb64e70e/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
new file mode 100644
index 0000000..33f3a82
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import static org.apache.nifi.processors.kite.TestUtil.streamFor;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+public class TestConvertAvroSchema {
+
+    public static final Schema INPUT_SCHEMA = SchemaBuilder.record("InputTest")
+            .fields().requiredString("id").requiredString("primaryColor")
+            .optionalString("secondaryColor").optionalString("price")
+            .endRecord();
+
+    public static final Schema OUTPUT_SCHEMA = SchemaBuilder.record("Test")
+            .fields().requiredLong("id").requiredString("color")
+            .optionalDouble("price").endRecord();
+
+    public static final String MAPPING = "[{\"source\":\"primaryColor\", \"target\":\"color\"}]";
+
+    public static final String FAILURE_SUMMARY = "Cannot convert free to double";
+
+    @Test
+    public void testBasicConversion() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA,
+                INPUT_SCHEMA.toString());
+        runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
+                OUTPUT_SCHEMA.toString());
+        runner.setProperty("primaryColor", "color");
+        runner.assertValid();
+
+        // Two valid rows, and one invalid because "free" is not a double.
+        Record goodRecord1 = dataBasic("1", "blue", null, null);
+        Record goodRecord2 = dataBasic("2", "red", "yellow", "5.5");
+        Record badRecord = dataBasic("3", "red", "yellow", "free");
+        List<Record> input = Lists.newArrayList(goodRecord1, goodRecord2,
+                badRecord);
+
+        runner.enqueue(streamFor(input));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 2 rows", 2, converted);
+        Assert.assertEquals("Should reject 1 rows", 1, errors);
+
+        runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("failure", 1);
+
+        MockFlowFile incompatible = runner.getFlowFilesForRelationship(
+                "failure").get(0);
+        GenericDatumReader<Record> reader = new GenericDatumReader<Record>(
+                INPUT_SCHEMA);
+        DataFileStream<Record> stream = new DataFileStream<Record>(
+                new ByteArrayInputStream(
+                        runner.getContentAsByteArray(incompatible)), reader);
+        int count = 0;
+        for (Record r : stream) {
+            Assert.assertEquals(badRecord, r);
+            count++;
+        }
+        stream.close();
+        Assert.assertEquals(1, count);
+        Assert.assertEquals("Should accumulate error messages",
+                FAILURE_SUMMARY, incompatible.getAttribute("errors"));
+
+        GenericDatumReader<Record> successReader = new GenericDatumReader<Record>(
+                OUTPUT_SCHEMA);
+        DataFileStream<Record> successStream = new DataFileStream<Record>(
+                new ByteArrayInputStream(runner.getContentAsByteArray(runner
+                        .getFlowFilesForRelationship("success").get(0))),
+                successReader);
+        count = 0;
+        for (Record r : successStream) {
+            if (count == 0) {
+                Assert.assertEquals(convertBasic(goodRecord1), r);
+            } else {
+                Assert.assertEquals(convertBasic(goodRecord2), r);
+            }
+            count++;
+        }
+        successStream.close();
+        Assert.assertEquals(2, count);
+    }
+
+    @Test
+    public void testNestedConversion() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA,
+                TestAvroRecordConverter.NESTED_RECORD_SCHEMA.toString());
+        runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
+                TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA.toString());
+        runner.setProperty("parent.id", "parentId");
+        runner.assertValid();
+
+        // Two valid rows
+        Record goodRecord1 = dataNested(1L, "200", null, null);
+        Record goodRecord2 = dataNested(2L, "300", 5L, "ParentCompany");
+        List<Record> input = Lists.newArrayList(goodRecord1, goodRecord2);
+
+        runner.enqueue(streamFor(input));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 2 rows", 2, converted);
+        Assert.assertEquals("Should reject 0 rows", 0, errors);
+
+        runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("failure", 0);
+
+        GenericDatumReader<Record> successReader = new GenericDatumReader<Record>(
+                TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA);
+        DataFileStream<Record> successStream = new DataFileStream<Record>(
+                new ByteArrayInputStream(runner.getContentAsByteArray(runner
+                        .getFlowFilesForRelationship("success").get(0))),
+                successReader);
+        int count = 0;
+        for (Record r : successStream) {
+            if (count == 0) {
+                Assert.assertEquals(convertNested(goodRecord1), r);
+            } else {
+                Assert.assertEquals(convertNested(goodRecord2), r);
+            }
+            count++;
+        }
+        successStream.close();
+        Assert.assertEquals(2, count);
+    }
+
+    private Record convertBasic(Record inputRecord) {
+        Record result = new Record(OUTPUT_SCHEMA);
+        result.put("id", Long.parseLong(inputRecord.get("id").toString()));
+        result.put("color", inputRecord.get("primaryColor").toString());
+        if (inputRecord.get("price") == null) {
+            result.put("price", null);
+        } else {
+            result.put("price",
+                    Double.parseDouble(inputRecord.get("price").toString()));
+        }
+        return result;
+    }
+
+    private Record dataBasic(String id, String primaryColor,
+            String secondaryColor, String price) {
+        Record result = new Record(INPUT_SCHEMA);
+        result.put("id", id);
+        result.put("primaryColor", primaryColor);
+        result.put("secondaryColor", secondaryColor);
+        result.put("price", price);
+        return result;
+    }
+
+    private Record convertNested(Record inputRecord) {
+        Record result = new Record(
+                TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA);
+        result.put("l1", inputRecord.get("l1"));
+        result.put("s1", Long.parseLong(inputRecord.get("s1").toString()));
+        if (inputRecord.get("parent") != null) {
+            // output schema doesn't have parent name.
+            result.put("parentId",
+                    ((Record) inputRecord.get("parent")).get("id"));
+        }
+        return result;
+    }
+
+    private Record dataNested(long id, String companyName, Long parentId,
+            String parentName) {
+        Record result = new Record(TestAvroRecordConverter.NESTED_RECORD_SCHEMA);
+        result.put("l1", id);
+        result.put("s1", companyName);
+        if (parentId != null || parentName != null) {
+            Record parent = new Record(
+                    TestAvroRecordConverter.NESTED_PARENT_SCHEMA);
+            parent.put("id", parentId);
+            parent.put("name", parentName);
+            result.put("parent", parent);
+        }
+        return result;
+    }
+}


[2/2] incubator-nifi git commit: NIFI-751 PR #70 removed extraneous reference to abstract properties pulling in hadoop conf

Posted by jo...@apache.org.
NIFI-751 PR #70 removed extraneous reference to abstract properties pulling in hadoop conf


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/8ff69ca2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/8ff69ca2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/8ff69ca2

Branch: refs/heads/develop
Commit: 8ff69ca2d1bc5b7d651b7495f2ce45def12bebc2
Parents: bb64e70
Author: joewitt <jo...@apache.org>
Authored: Tue Jul 14 07:41:07 2015 -0400
Committer: joewitt <jo...@apache.org>
Committed: Tue Jul 14 07:41:07 2015 -0400

----------------------------------------------------------------------
 .../java/org/apache/nifi/processors/kite/ConvertAvroSchema.java    | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8ff69ca2/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
index 0d9f658..daeb548 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
@@ -140,7 +140,7 @@ public class ConvertAvroSchema extends AbstractKiteProcessor {
 
     private static final List<PropertyDescriptor> PROPERTIES = ImmutableList
             .<PropertyDescriptor> builder()
-            .addAll(AbstractKiteProcessor.getProperties()).add(INPUT_SCHEMA)
+            .add(INPUT_SCHEMA)
             .add(OUTPUT_SCHEMA).build();
 
     private static final Set<Relationship> RELATIONSHIPS = ImmutableSet