You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/07/14 14:26:05 UTC
[1/2] incubator-nifi git commit: NIFI-751 Add Processor To Convert
Avro Formats
Repository: incubator-nifi
Updated Branches:
refs/heads/develop 8bd20510e -> 8ff69ca2d
NIFI-751 Add Processor To Convert Avro Formats
Implemented a new NiFi processor that allows avro records to be converted from one Avro schema
to another. This supports:
* Flattening records using . notation like "parent.id"
* Simple type conversions to String or base primitive types.
* Specifying field renames using dynamic properties.
Signed-off-by: joewitt <jo...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/bb64e70e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/bb64e70e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/bb64e70e
Branch: refs/heads/develop
Commit: bb64e70e6fbda12a9e0388b5c2240d96d34ac6bf
Parents: 8bd2051
Author: Alan Jackoway <al...@cloudera.com>
Authored: Tue Jul 7 17:28:26 2015 -0400
Committer: joewitt <jo...@apache.org>
Committed: Tue Jul 14 07:22:34 2015 -0400
----------------------------------------------------------------------
.../processors/kite/AvroRecordConverter.java | 320 +++++++++++++++++
.../nifi/processors/kite/ConvertAvroSchema.java | 339 +++++++++++++++++++
.../org.apache.nifi.processor.Processor | 1 +
.../additionalDetails.html | 142 ++++++++
.../kite/TestAvroRecordConverter.java | 201 +++++++++++
.../processors/kite/TestConvertAvroSchema.java | 216 ++++++++++++
6 files changed, 1219 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb64e70e/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java
new file mode 100644
index 0000000..68e6c98
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AvroRecordConverter.java
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Scanner;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.IndexedRecord;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Responsible for converting records of one Avro type to another. Supports
+ * syntax like "record.field" to unpack fields and will try to do simple type
+ * conversion.
+ */
+public class AvroRecordConverter {
+ private final Schema inputSchema;
+ private final Schema outputSchema;
+ // Store this from output field to input field so we can look up by output.
+ private final Map<String, String> fieldMapping;
+
+ /**
+ * @param inputSchema
+ * Schema of input record objects
+ * @param outputSchema
+ * Schema of output record objects
+ * @param fieldMapping
+ * Map from field name in input record to field name in output
+ * record.
+ */
+ public AvroRecordConverter(Schema inputSchema, Schema outputSchema,
+ Map<String, String> fieldMapping) {
+ this.inputSchema = inputSchema;
+ this.outputSchema = outputSchema;
+ // Need to reverse this map.
+ this.fieldMapping = Maps
+ .newHashMapWithExpectedSize(fieldMapping.size());
+ for (Map.Entry<String, String> entry : fieldMapping.entrySet()) {
+ this.fieldMapping.put(entry.getValue(), entry.getKey());
+ }
+ }
+
+    /**
+     * Lists the output fields that cannot be populated from the input schema.
+     *
+     * For every output field the input path is the explicitly mapped name if
+     * one exists, otherwise the output field's own name. A dotted path such as
+     * "parent.id" is followed one record at a time; optional records (unions
+     * with null) are unwrapped at each step.
+     *
+     * @return Any fields in the output schema that are not mapped or are mapped
+     *         by a non-existent input field. A path that tries to step into
+     *         something that is not a record is reported as unmapped as well.
+     */
+    public Collection<String> getUnmappedFields() {
+        List<String> result = Lists.newArrayList();
+        for (Field f : outputSchema.getFields()) {
+            String fieldName = f.name();
+            if (fieldMapping.containsKey(fieldName)) {
+                fieldName = fieldMapping.get(fieldName);
+            }
+
+            // currentSchema is set to null as soon as the path cannot be
+            // followed any further.
+            Schema currentSchema = getNonNullSchema(inputSchema);
+            while (currentSchema != null && fieldName.contains(".")) {
+                // Step down the schema to find the right field.
+                int dotIndex = fieldName.indexOf('.');
+                String entityName = fieldName.substring(0, dotIndex);
+                // Schema.getField is only legal on records; asking a
+                // non-record schema would throw instead of returning null.
+                Field entity = null;
+                if (currentSchema.getType() == Schema.Type.RECORD) {
+                    entity = currentSchema.getField(entityName);
+                }
+                if (entity == null) {
+                    // Tried to step into a schema that doesn't exist.
+                    currentSchema = null;
+                } else {
+                    // In case we had an optional record, choose just the
+                    // record.
+                    currentSchema = getNonNullSchema(entity.schema());
+                    fieldName = fieldName.substring(dotIndex + 1);
+                }
+            }
+            if (currentSchema == null
+                    || currentSchema.getType() != Schema.Type.RECORD
+                    || currentSchema.getField(fieldName) == null) {
+                result.add(f.name());
+            }
+        }
+        return result;
+    }
+
+ /**
+ * Converts one record to another given a input and output schema plus
+ * explicit mappings for certain target fields.
+ *
+ * @param input
+ * Input record to convert conforming to the inputSchema this
+ * converter was created with.
+ * @return Record converted to the outputSchema this converter was created
+ * with.
+ * @throws AvroConversionException
+ * When schemas do not match or illegal conversions are
+ * attempted, such as when numeric data fails to parse.
+ */
+ public Record convert(Record input) throws AvroConversionException {
+ Record result = new Record(outputSchema);
+ for (Field outputField : outputSchema.getFields()) {
+ // Default to matching by name
+ String inputFieldName = outputField.name();
+ if (fieldMapping.containsKey(outputField.name())) {
+ inputFieldName = fieldMapping.get(outputField.name());
+ }
+
+ IndexedRecord currentRecord = input;
+ Schema currentSchema = getNonNullSchema(inputSchema);
+ while (inputFieldName.contains(".")) {
+ // Recurse down the schema to find the right field.
+ int dotIndex = inputFieldName.indexOf('.');
+ String entityName = inputFieldName.substring(0, dotIndex);
+ // Get the record object
+ Object innerRecord = currentRecord.get(currentSchema.getField(
+ entityName).pos());
+ if (innerRecord == null) {
+ // Probably hit a null record here. Just break out of the
+ // loop so that null object will be passed to convertData
+ // below.
+ currentRecord = null;
+ break;
+ }
+ if (innerRecord != null
+ && !(innerRecord instanceof IndexedRecord)) {
+ throw new AvroConversionException(inputFieldName
+ + " stepped through a non-record");
+ }
+ currentRecord = (IndexedRecord) innerRecord;
+
+ // Get the schema. In case we had an optional record, choose
+ // just the record.
+ currentSchema = currentSchema.getField(entityName).schema();
+ currentSchema = getNonNullSchema(currentSchema);
+ inputFieldName = inputFieldName.substring(dotIndex + 1);
+ }
+
+ // Current should now be in the right place to read the record.
+ Field f = currentSchema.getField(inputFieldName);
+ if (currentRecord == null) {
+ // We may have stepped into a null union type and gotten a null
+ // result.
+ Schema s = null;
+ if (f != null) {
+ s = f.schema();
+ }
+ result.put(outputField.name(),
+ convertData(null, s, outputField.schema()));
+ } else {
+ result.put(
+ outputField.name(),
+ convertData(currentRecord.get(f.pos()), f.schema(),
+ outputField.schema()));
+ }
+ }
+ return result;
+ }
+
+ /**
+ * @return The schema of input records, as passed to the constructor.
+ */
+ public Schema getInputSchema() {
+ return inputSchema;
+ }
+
+ /**
+ * @return The schema of output records, as passed to the constructor.
+ */
+ public Schema getOutputSchema() {
+ return outputSchema;
+ }
+
+    /**
+     * Converts the data from one schema to another. If the types are the same,
+     * no change will be made, but simple conversions will be attempted for
+     * other types: anything can become a string through toString(), and
+     * long, int, double and float outputs are parsed from the content's
+     * string form.
+     *
+     * @param content
+     *            The data to convert, generally taken from a field in an input
+     *            Record.
+     * @param inputSchema
+     *            The schema of the content object. Not consulted when content
+     *            is null.
+     * @param outputSchema
+     *            The schema to convert to.
+     * @return The content object, converted to the output schema.
+     * @throws AvroConversionException
+     *             When conversion is impossible, either because the output type
+     *             is not supported, because the output schema does not accept
+     *             null, or because numeric data failed to parse.
+     */
+    private Object convertData(Object content, Schema inputSchema,
+            Schema outputSchema) throws AvroConversionException {
+        if (content == null) {
+            // No conversion can happen here.
+            if (supportsNull(outputSchema)) {
+                return null;
+            }
+            throw new AvroConversionException("Output schema " + outputSchema
+                    + " does not support null");
+        }
+
+        Schema nonNillInput = getNonNullSchema(inputSchema);
+        Schema nonNillOutput = getNonNullSchema(outputSchema);
+        if (nonNillInput.getType().equals(nonNillOutput.getType())) {
+            return content;
+        }
+        if (nonNillOutput.getType() == Schema.Type.STRING) {
+            return content.toString();
+        }
+
+        // For the non-string cases of these, we will try to convert through
+        // string using Scanner to validate types. This means we could
+        // return questionable results when a String starts with a number
+        // but then contains other content.
+        // NOTE(review): Scanner parses numbers with the JVM's default locale,
+        // so accepted number formats depend on where this runs.
+        try (Scanner scanner = new Scanner(content.toString())) {
+            switch (nonNillOutput.getType()) {
+            case LONG:
+                if (scanner.hasNextLong()) {
+                    return scanner.nextLong();
+                }
+                throw new AvroConversionException("Cannot convert "
+                        + content + " to long");
+            case INT:
+                if (scanner.hasNextInt()) {
+                    return scanner.nextInt();
+                }
+                throw new AvroConversionException("Cannot convert "
+                        + content + " to int");
+            case DOUBLE:
+                if (scanner.hasNextDouble()) {
+                    return scanner.nextDouble();
+                }
+                throw new AvroConversionException("Cannot convert "
+                        + content + " to double");
+            case FLOAT:
+                if (scanner.hasNextFloat()) {
+                    return scanner.nextFloat();
+                }
+                throw new AvroConversionException("Cannot convert "
+                        + content + " to float");
+            default:
+                throw new AvroConversionException("Cannot convert to type "
+                        + nonNillOutput.getType());
+            }
+        }
+    }
+
+    /**
+     * Unwraps an optional type: for a union of null and exactly one other
+     * type, yields that other type. Any non-union schema is returned as is,
+     * and so is a union that has no non-null member.
+     *
+     * Unions with more than one non-null member are not supported.
+     *
+     * @param s
+     *            Schema to remove nillable from.
+     * @return The Schema of the non-null part of the union, if the input was
+     *         a union type. Otherwise returns the input schema.
+     * @throws IllegalArgumentException
+     *             If the union contains two or more non-null types.
+     */
+    protected static Schema getNonNullSchema(Schema s) {
+        if (s.getType() != Schema.Type.UNION) {
+            return s;
+        }
+        Schema nonNullMember = null;
+        for (Schema member : s.getTypes()) {
+            if (member.getType() == Schema.Type.NULL) {
+                continue;
+            }
+            // A second non-null member means this is not a plain optional.
+            Preconditions.checkArgument(nonNullMember == null,
+                    "Cannot handle union of two non-null types");
+            nonNullMember = member;
+        }
+        return nonNullMember == null ? s : nonNullMember;
+    }
+
+    /**
+     * Tells whether a null value is legal for the given schema.
+     *
+     * @param s
+     *            Schema to inspect.
+     * @return true if s is the null type or a union with a null member,
+     *         false otherwise.
+     */
+    protected static boolean supportsNull(Schema s) {
+        switch (s.getType()) {
+        case NULL:
+            return true;
+        case UNION:
+            for (Schema member : s.getTypes()) {
+                if (member.getType() == Schema.Type.NULL) {
+                    return true;
+                }
+            }
+            return false;
+        default:
+            return false;
+        }
+    }
+
+    /**
+     * Exception thrown when Avro conversion fails.
+     *
+     * Declared static so that an exception instance does not carry a hidden
+     * reference to the converter that threw it.
+     */
+    public static class AvroConversionException extends Exception {
+        private static final long serialVersionUID = 1L;
+
+        /**
+         * @param string
+         *            Description of the failed conversion.
+         * @param e
+         *            Underlying I/O failure that caused it.
+         */
+        public AvroConversionException(String string, IOException e) {
+            super(string, e);
+        }
+
+        /**
+         * @param string
+         *            Description of the failed conversion.
+         */
+        public AvroConversionException(String string) {
+            super(string);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb64e70e/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
new file mode 100644
index 0000000..0d9f658
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.io.StreamCallback;
+import org.apache.nifi.processors.kite.AvroRecordConverter.AvroConversionException;
+import org.apache.nifi.util.LongHolder;
+import org.kitesdk.data.DatasetException;
+import org.kitesdk.data.DatasetIOException;
+import org.kitesdk.data.SchemaNotFoundException;
+import org.kitesdk.data.spi.DefaultConfiguration;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+
+@Tags({ "avro", "convert", "kite" })
+@CapabilityDescription("Convert records from one Avro schema to another, including support for flattening and simple type conversions")
+@DynamicProperty(name = "Field name from input schema",
+value = "Field name for output schema",
+description = "Explicit mappings from input schema to output schema, which supports renaming fields and stepping into nested records on the input schema using notation like parent.id")
+public class ConvertAvroSchema extends AbstractKiteProcessor {
+
+ private static final Relationship SUCCESS = new Relationship.Builder()
+ .name("success")
+ .description("Avro content that converted successfully").build();
+
+ private static final Relationship FAILURE = new Relationship.Builder()
+ .name("failure").description("Avro content that failed to convert")
+ .build();
+
+    /**
+     * Validator for the output schema property. Unless the value contains
+     * Expression Language, both the output and the input schema are loaded and
+     * every output field must be reachable from the input schema, either by
+     * name or through a dynamic-property mapping. A schema that cannot be
+     * found also makes the property invalid.
+     */
+    protected static final Validator MAPPED_SCHEMA_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(String subject, String uri,
+                ValidationContext context) {
+            final String confFiles = context.getProperty(CONF_XML_FILES)
+                    .getValue();
+            final Configuration conf = getConfiguration(confFiles);
+            final String inputUri = context.getProperty(INPUT_SCHEMA)
+                    .getValue();
+
+            String error = null;
+            final boolean usesExpressionLanguage = context
+                    .isExpressionLanguageSupported(subject)
+                    && context.isExpressionLanguagePresent(uri);
+            if (!usesExpressionLanguage) {
+                try {
+                    final Schema outputSchema = getSchema(uri, conf);
+                    final Schema inputSchema = getSchema(inputUri, conf);
+
+                    // Collect the explicit mappings from the dynamic
+                    // properties. onTrigger does the same, but
+                    // ValidationContext and ProcessContext share no ancestor,
+                    // so the loop cannot be shared.
+                    final Map<String, String> fieldMapping = new HashMap<>();
+                    for (final Map.Entry<PropertyDescriptor, String> property : context
+                            .getProperties().entrySet()) {
+                        final PropertyDescriptor descriptor = property.getKey();
+                        if (descriptor.isDynamic()) {
+                            fieldMapping.put(descriptor.getName(),
+                                    property.getValue());
+                        }
+                    }
+
+                    final Collection<String> unmappedFields = new AvroRecordConverter(
+                            inputSchema, outputSchema, fieldMapping)
+                            .getUnmappedFields();
+                    if (!unmappedFields.isEmpty()) {
+                        error = "The following fields are unmapped: "
+                                + unmappedFields;
+                    }
+                } catch (SchemaNotFoundException e) {
+                    error = e.getMessage();
+                }
+            }
+
+            return new ValidationResult.Builder().subject(subject).input(uri)
+                    .explanation(error).valid(error == null).build();
+        }
+    };
+
+ @VisibleForTesting
+ static final PropertyDescriptor INPUT_SCHEMA = new PropertyDescriptor.Builder()
+ .name("Input Schema").description("Avro Schema of Input Flowfiles")
+ .addValidator(SCHEMA_VALIDATOR).expressionLanguageSupported(true)
+ .required(true).build();
+
+ @VisibleForTesting
+ static final PropertyDescriptor OUTPUT_SCHEMA = new PropertyDescriptor.Builder()
+ .name("Output Schema")
+ .description("Avro Schema of Output Flowfiles")
+ .addValidator(MAPPED_SCHEMA_VALIDATOR).expressionLanguageSupported(true)
+ .required(true).build();
+
+ private static final List<PropertyDescriptor> PROPERTIES = ImmutableList
+ .<PropertyDescriptor> builder()
+ .addAll(AbstractKiteProcessor.getProperties()).add(INPUT_SCHEMA)
+ .add(OUTPUT_SCHEMA).build();
+
+ private static final Set<Relationship> RELATIONSHIPS = ImmutableSet
+ .<Relationship> builder().add(SUCCESS).add(FAILURE).build();
+
+ private static final Pattern AVRO_FIELDNAME_PATTERN = Pattern
+ .compile("[A-Za-z_][A-Za-z0-9_\\.]*");
+
+    /**
+     * Validates that the input and output fields (from dynamic properties) are
+     * all valid avro field names including "." to step into records. The
+     * property name (subject) and the property value are both checked; values
+     * containing Expression Language are accepted as they are.
+     */
+    protected static final Validator AVRO_FIELDNAME_VALIDATOR = new Validator() {
+        @Override
+        public ValidationResult validate(final String subject,
+                final String value, final ValidationContext context) {
+            if (context.isExpressionLanguageSupported(subject)
+                    && context.isExpressionLanguagePresent(value)) {
+                return new ValidationResult.Builder().subject(subject)
+                        .input(value)
+                        .explanation("Expression Language Present").valid(true)
+                        .build();
+            }
+
+            final StringBuilder reason = new StringBuilder();
+            if (!AVRO_FIELDNAME_PATTERN.matcher(subject).matches()) {
+                reason.append(subject).append(
+                        " is not a valid Avro fieldname");
+            }
+            if (!AVRO_FIELDNAME_PATTERN.matcher(value).matches()) {
+                // Keep the two complaints apart when both names are invalid.
+                if (reason.length() > 0) {
+                    reason.append("; ");
+                }
+                reason.append(value).append(" is not a valid Avro fieldname");
+            }
+
+            return new ValidationResult.Builder().subject(subject).input(value)
+                    .explanation(reason.toString())
+                    .valid(reason.length() == 0).build();
+        }
+    };
+
+ @Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(
+ final String propertyDescriptorName) {
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .description(
+ "Field mapping between schemas. The property name is the field name for the input "
+ + "schema, and the property value is the field name for the output schema. For fields "
+ + "not listed, the processor tries to match names from the input to the output record.")
+ .dynamic(true).addValidator(AVRO_FIELDNAME_VALIDATOR).build();
+ }
+
+ /**
+ * @return The fixed property list held in PROPERTIES.
+ */
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return PROPERTIES;
+ }
+
+ /**
+ * @return The relationships held in RELATIONSHIPS (success and failure).
+ */
+ @Override
+ public Set<Relationship> getRelationships() {
+ return RELATIONSHIPS;
+ }
+
+    /**
+     * Converts the Avro records of one incoming FlowFile from the input schema
+     * to the output schema.
+     *
+     * Successfully converted records are written to a FlowFile routed to
+     * success. Records that fail conversion are written, still in the input
+     * schema, to a second FlowFile routed to failure with an "errors"
+     * attribute summarizing the failures. If the incoming file yields no
+     * converted records and no errors, the failure FlowFile is routed with
+     * the attribute "No incoming records". A FlowFile whose schema cannot be
+     * found is routed to failure unchanged.
+     */
+    @Override
+    public void onTrigger(ProcessContext context, final ProcessSession session)
+            throws ProcessException {
+        FlowFile incomingAvro = session.get();
+        if (incomingAvro == null) {
+            return;
+        }
+
+        String inputSchemaProperty = context.getProperty(INPUT_SCHEMA)
+                .evaluateAttributeExpressions(incomingAvro).getValue();
+        final Schema inputSchema;
+        try {
+            inputSchema = getSchema(inputSchemaProperty,
+                    DefaultConfiguration.get());
+        } catch (SchemaNotFoundException e) {
+            getLogger().error("Cannot find schema: " + inputSchemaProperty);
+            session.transfer(incomingAvro, FAILURE);
+            return;
+        }
+        String outputSchemaProperty = context.getProperty(OUTPUT_SCHEMA)
+                .evaluateAttributeExpressions(incomingAvro).getValue();
+        final Schema outputSchema;
+        try {
+            outputSchema = getSchema(outputSchemaProperty,
+                    DefaultConfiguration.get());
+        } catch (SchemaNotFoundException e) {
+            getLogger().error("Cannot find schema: " + outputSchemaProperty);
+            session.transfer(incomingAvro, FAILURE);
+            return;
+        }
+        // Explicit mappings come from the dynamic properties: property name is
+        // the input field, property value is the output field.
+        final Map<String, String> fieldMapping = new HashMap<>();
+        for (final Map.Entry<PropertyDescriptor, String> entry : context
+                .getProperties().entrySet()) {
+            if (entry.getKey().isDynamic()) {
+                fieldMapping.put(entry.getKey().getName(), entry.getValue());
+            }
+        }
+        final AvroRecordConverter converter = new AvroRecordConverter(
+                inputSchema, outputSchema, fieldMapping);
+
+        final DataFileWriter<Record> writer = new DataFileWriter<>(
+                AvroUtil.newDatumWriter(outputSchema, Record.class));
+        writer.setCodec(CodecFactory.snappyCodec());
+
+        final DataFileWriter<Record> failureWriter = new DataFileWriter<>(
+                AvroUtil.newDatumWriter(outputSchema, Record.class));
+        failureWriter.setCodec(CodecFactory.snappyCodec());
+
+        try {
+            final LongHolder written = new LongHolder(0L);
+            final FailureTracker failures = new FailureTracker();
+
+            final List<Record> badRecords = Lists.newLinkedList();
+            // The clone becomes the carrier for records that fail conversion.
+            FlowFile incomingAvroCopy = session.clone(incomingAvro);
+            FlowFile outgoingAvro = session.write(incomingAvro,
+                    new StreamCallback() {
+                        @Override
+                        public void process(InputStream in, OutputStream out)
+                                throws IOException {
+                            try (DataFileStream<Record> stream = new DataFileStream<Record>(
+                                    in, new GenericDatumReader<Record>(
+                                            converter.getInputSchema()))) {
+                                try (DataFileWriter<Record> w = writer.create(
+                                        outputSchema, out)) {
+                                    for (Record record : stream) {
+                                        try {
+                                            Record converted = converter
+                                                    .convert(record);
+                                            w.append(converted);
+                                            written.incrementAndGet();
+                                        } catch (AvroConversionException e) {
+                                            failures.add(e);
+                                            getLogger().error(
+                                                    "Error converting data: "
+                                                            + e.getMessage());
+                                            badRecords.add(record);
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    });
+
+            FlowFile badOutput = session.write(incomingAvroCopy,
+                    new StreamCallback() {
+                        @Override
+                        public void process(InputStream in, OutputStream out)
+                                throws IOException {
+
+                            try (DataFileWriter<Record> w = failureWriter
+                                    .create(inputSchema, out)) {
+                                for (Record record : badRecords) {
+                                    w.append(record);
+                                }
+                            }
+
+                        }
+                    });
+
+            final long converted = written.get();
+            final long errors = failures.count();
+
+            // update only if file transfer is successful
+            session.adjustCounter("Converted records", converted, false);
+            // update only if file transfer is successful
+            session.adjustCounter("Conversion errors", errors, false);
+
+            if (converted > 0L) {
+                session.transfer(outgoingAvro, SUCCESS);
+            } else {
+                session.remove(outgoingAvro);
+            }
+
+            // Route the failure FlowFile exactly once: it is either
+            // transferred or removed, never both.
+            if (errors > 0L) {
+                getLogger().warn(
+                        "Failed to convert {}/{} records between Avro Schemas",
+                        new Object[] { errors, errors + converted });
+                badOutput = session.putAttribute(badOutput, "errors",
+                        failures.summary());
+                session.transfer(badOutput, FAILURE);
+            } else if (converted == 0L) {
+                badOutput = session.putAttribute(badOutput, "errors",
+                        "No incoming records");
+                session.transfer(badOutput, FAILURE);
+            } else {
+                session.remove(badOutput);
+            }
+        } catch (ProcessException | DatasetIOException e) {
+            // NOTE(review): if this is reached after session.write succeeded,
+            // incomingAvro may no longer be the current version of the
+            // FlowFile and the clone is left unrouted - confirm against the
+            // ProcessSession contract.
+            getLogger().error("Failed reading or writing", e);
+            session.transfer(incomingAvro, FAILURE);
+        } catch (DatasetException e) {
+            getLogger().error("Failed to read FlowFile", e);
+            session.transfer(incomingAvro, FAILURE);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb64e70e/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 6de5612..ea99ff6 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -15,3 +15,4 @@
org.apache.nifi.processors.kite.StoreInKiteDataset
org.apache.nifi.processors.kite.ConvertCSVToAvro
org.apache.nifi.processors.kite.ConvertJSONToAvro
+org.apache.nifi.processors.kite.ConvertAvroSchema
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb64e70e/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/docs/org.apache.nifi.processors.kite.ConvertAvroSchema/additionalDetails.html
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/docs/org.apache.nifi.processors.kite.ConvertAvroSchema/additionalDetails.html b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/docs/org.apache.nifi.processors.kite.ConvertAvroSchema/additionalDetails.html
new file mode 100644
index 0000000..f5d8a1d
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/resources/docs/org.apache.nifi.processors.kite.ConvertAvroSchema/additionalDetails.html
@@ -0,0 +1,142 @@
+<!DOCTYPE html>
+<html lang="en">
+ <!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ -->
+ <head>
+ <meta charset="utf-8" />
+ <title>ConvertAvroSchema</title>
+
+ <link rel="stylesheet" href="../../css/component-usage.css" type="text/css" />
+ </head>
+
+ <body>
+ <!-- Processor Documentation ================================================== -->
+ <h2>Description:</h2>
+ <p>This processor is used to convert data between two Avro formats, such as those coming from the <code>ConvertCSVToAvro</code> or
+ <code>ConvertJSONToAvro</code> processors. The input and output content of the flow files should be Avro data files. The processor
+ includes support for the following basic type conversions:
+ <ul>
+ <li>Anything to String, using the data's default String representation</li>
+ <li>String types to numeric types int, long, double, and float</li>
+ <li>Conversion to and from optional Avro types</li>
+ </ul>
+ In addition, fields can be renamed or unpacked from a record type by using the dynamic properties.
+ </p>
+ <h2>Mapping Example:</h2>
+ <p>
+ Throughout this example, we will refer to input data with the following schema:
+ <pre>
+{
+ "type": "record",
+ "name": "CustomerInput",
+ "namespace": "org.apache.example",
+ "fields": [
+ {
+ "name": "id",
+ "type": "string"
+ },
+ {
+ "name": "companyName",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "revenue",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name" : "parent",
+ "type" : [ "null", {
+ "type" : "record",
+ "name" : "parent",
+ "fields" : [ {
+ "name" : "name",
+ "type" : ["null", "string"],
+ "default" : null
+ }, {
+ "name" : "id",
+ "type" : "string"
+ } ]
+ } ],
+ "default" : null
+ }
+ ]
+}
+ </pre>
+ Even though the id and revenue fields are declared as strings, they are logically a long and a double, respectively.
+ By default, fields with matching names will be mapped automatically, so the following output schema could be converted
+ without using dynamic properties:
+ <pre>
+{
+ "type": "record",
+ "name": "SimpleCustomerOutput",
+ "namespace": "org.apache.example",
+ "fields": [
+ {
+ "name": "id",
+ "type": "long"
+ },
+ {
+ "name": "companyName",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "revenue",
+ "type": ["null", "double"],
+ "default": null
+ }
+ ]
+}
+ </pre>
+ To rename companyName to name and to extract the parent's id field, both a schema and dynamic properties must be provided.
+ For example, to convert to the following schema:
+ <pre>
+{
+ "type": "record",
+ "name": "SimpleCustomerOutput",
+ "namespace": "org.apache.example",
+ "fields": [
+ {
+ "name": "id",
+ "type": "long"
+ },
+ {
+ "name": "name",
+ "type": ["null", "string"],
+ "default": null
+ },
+ {
+ "name": "revenue",
+ "type": ["null", "double"],
+ "default": null
+ },
+ {
+ "name": "parentId",
+ "type": ["null", "long"],
+ "default": null
+ }
+ ]
+}
+ </pre>
+ The following dynamic properties would be used:
+ <pre>
+"companyName" -> "name"
+"parent.id" -> "parentId"
+ </pre>
+ </p>
+</body>
+</html>
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb64e70e/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java
new file mode 100644
index 0000000..1a4748f
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestAvroRecordConverter.java
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.generic.GenericData.Record;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Unit tests for {@link AvroRecordConverter}: by-name conversion, explicit
+ * (nested) field mapping, conversion failures, and reporting of output
+ * fields that have no source.
+ */
+public class TestAvroRecordConverter {
+    /** Shared Jackson mapper; not referenced by the tests in this class. */
+    static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+
+    /** Mapping with no explicit entries, so fields are matched by name only. */
+    static final Map<String, String> EMPTY_MAPPING = ImmutableMap.of();
+
+    /**
+     * Input schema with a required long {@code l1}, a required string
+     * {@code s1}, and a {@code parent} field that is either null or a record
+     * holding a long {@code id} and a string {@code name}.
+     */
+    static final String NESTED_RECORD_SCHEMA_STRING = "{\n"
+            + " \"type\": \"record\",\n"
+            + " \"name\": \"NestedInput\",\n"
+            + " \"namespace\": \"org.apache.example\",\n"
+            + " \"fields\": [\n"
+            + " {\n"
+            + " \"name\": \"l1\",\n"
+            + " \"type\": \"long\"\n"
+            + " },\n"
+            + " {\n"
+            + " \"name\": \"s1\",\n"
+            + " \"type\": \"string\"\n"
+            + " },\n"
+            + " {\n"
+            + " \"name\": \"parent\",\n"
+            + " \"type\": [\"null\", {\n"
+            + " \"type\": \"record\",\n"
+            + " \"name\": \"parent\",\n"
+            + " \"fields\": [\n"
+            + " { \"name\": \"id\", \"type\": \"long\" },\n"
+            + " { \"name\": \"name\", \"type\": \"string\" }\n"
+            + " ]"
+            + " } ]"
+            + " }"
+            + " ] }";
+
+    /** Parsed form of {@link #NESTED_RECORD_SCHEMA_STRING}. */
+    static final Schema NESTED_RECORD_SCHEMA = new Schema.Parser()
+            .parse(NESTED_RECORD_SCHEMA_STRING);
+
+    /** The record branch of the nullable {@code parent} field. */
+    static final Schema NESTED_PARENT_SCHEMA = AvroRecordConverter
+            .getNonNullSchema(NESTED_RECORD_SCHEMA.getField("parent").schema());
+
+    /**
+     * Flat output schema: required longs {@code l1} and {@code s1} plus an
+     * optional long {@code parentId}.
+     */
+    static final Schema UNNESTED_OUTPUT_SCHEMA = SchemaBuilder.record("Output")
+            .namespace("org.apache.example").fields()
+            .requiredLong("l1")
+            .requiredLong("s1")
+            .optionalLong("parentId")
+            .endRecord();
+
+    /**
+     * Tests the case where we don't use a mapping file and just map records by
+     * name.
+     */
+    @Test
+    public void testDefaultConversion() throws Exception {
+        // We will convert s1 from string to long (or leave it null), ignore s2,
+        // convert s3 from string to double, convert l1 from long to string,
+        // and leave l2 the same.
+        Schema input = SchemaBuilder.record("Input")
+                .namespace("com.cloudera.edh").fields()
+                .nullableString("s1", "")
+                .requiredString("s2")
+                .requiredString("s3")
+                .optionalLong("l1")
+                .requiredLong("l2")
+                .endRecord();
+        Schema output = SchemaBuilder.record("Output")
+                .namespace("com.cloudera.edh").fields()
+                .optionalLong("s1")
+                .optionalString("l1")
+                .requiredLong("l2")
+                .requiredDouble("s3")
+                .endRecord();
+
+        AvroRecordConverter converter = new AvroRecordConverter(input, output,
+                EMPTY_MAPPING);
+
+        // First pass: the nullable inputs are null and must stay null.
+        Record inputRecord = new Record(input);
+        inputRecord.put("s1", null);
+        inputRecord.put("s2", "blah");
+        inputRecord.put("s3", "5.5");
+        inputRecord.put("l1", null);
+        inputRecord.put("l2", 5L);
+        Record outputRecord = converter.convert(inputRecord);
+        assertNull(outputRecord.get("s1"));
+        assertNull(outputRecord.get("l1"));
+        assertEquals(5L, outputRecord.get("l2"));
+        assertEquals(5.5, outputRecord.get("s3"));
+
+        // Second pass: the same converter and input record are reused with
+        // every field populated.
+        inputRecord.put("s1", "500");
+        inputRecord.put("s2", "blah");
+        inputRecord.put("s3", "5.5e-5");
+        inputRecord.put("l1", 100L);
+        inputRecord.put("l2", 2L);
+        outputRecord = converter.convert(inputRecord);
+        assertEquals(500L, outputRecord.get("s1"));
+        assertEquals("100", outputRecord.get("l1"));
+        assertEquals(2L, outputRecord.get("l2"));
+        assertEquals(5.5e-5, outputRecord.get("s3"));
+    }
+
+    /**
+     * Tests the case where some fields are mapped by name and a nested field
+     * is mapped explicitly.
+     */
+    @Test
+    public void testExplicitMapping() throws Exception {
+        // l1 is copied by name, s1 is converted from string to long by name,
+        // and the nested parent.id is flattened into parentId through the
+        // explicit mapping.
+        Schema input = NESTED_RECORD_SCHEMA;
+        Schema parent = NESTED_PARENT_SCHEMA;
+        Schema output = UNNESTED_OUTPUT_SCHEMA;
+        Map<String, String> mapping = ImmutableMap.of("parent.id", "parentId");
+
+        AvroRecordConverter converter = new AvroRecordConverter(input, output,
+                mapping);
+
+        Record parentRecord = new Record(parent);
+        parentRecord.put("id", 200L);
+        parentRecord.put("name", "parent");
+
+        Record inputRecord = new Record(input);
+        inputRecord.put("l1", 5L);
+        inputRecord.put("s1", "1000");
+        inputRecord.put("parent", parentRecord);
+
+        Record outputRecord = converter.convert(inputRecord);
+        assertEquals(5L, outputRecord.get("l1"));
+        assertEquals(1000L, outputRecord.get("s1"));
+        assertEquals(200L, outputRecord.get("parentId"));
+    }
+
+    /**
+     * Tests the case where we try to convert a string to a long incorrectly.
+     */
+    @Test(expected = AvroRecordConverter.AvroConversionException.class)
+    public void testIllegalConversion() throws Exception {
+        // s1 is mapped from string to long, but the value used below is not
+        // numeric, so the conversion is expected to fail.
+        Schema input = SchemaBuilder.record("Input")
+                .namespace("com.cloudera.edh").fields()
+                .nullableString("s1", "")
+                .requiredString("s2")
+                .optionalLong("l1")
+                .requiredLong("l2")
+                .endRecord();
+        Schema output = SchemaBuilder.record("Output")
+                .namespace("com.cloudera.edh").fields()
+                .optionalLong("s1")
+                .optionalString("l1")
+                .requiredLong("l2")
+                .endRecord();
+
+        AvroRecordConverter converter = new AvroRecordConverter(input, output,
+                EMPTY_MAPPING);
+
+        Record inputRecord = new Record(input);
+        inputRecord.put("s1", "blah");
+        inputRecord.put("s2", "blah");
+        inputRecord.put("l1", null);
+        inputRecord.put("l2", 5L);
+        converter.convert(inputRecord);
+    }
+
+    /**
+     * Tests that output fields without a usable source are reported by
+     * {@code getUnmappedFields()}, and that nothing is reported once the
+     * field is mapped from an existing input field.
+     */
+    @Test
+    public void testGetUnmappedFields() throws Exception {
+        Schema input = SchemaBuilder.record("Input")
+                .namespace("com.cloudera.edh").fields()
+                .nullableString("s1", "")
+                .requiredString("s2")
+                .optionalLong("l1")
+                .requiredLong("l2")
+                .endRecord();
+        Schema output = SchemaBuilder.record("Output")
+                .namespace("com.cloudera.edh").fields()
+                .optionalLong("field")
+                .endRecord();
+
+        // Test the case where the field isn't mapped at all.
+        AvroRecordConverter converter = new AvroRecordConverter(input, output,
+                EMPTY_MAPPING);
+        assertEquals(ImmutableList.of("field"), converter.getUnmappedFields());
+
+        // Test the case where we tried to map from a non-existent field.
+        converter = new AvroRecordConverter(input, output,
+                ImmutableMap.of("nonExistentField", "field"));
+        assertEquals(ImmutableList.of("field"), converter.getUnmappedFields());
+
+        // Test the case where we tried to map from a non-existent record.
+        converter = new AvroRecordConverter(input, output,
+                ImmutableMap.of("parent.nonExistentField", "field"));
+        assertEquals(ImmutableList.of("field"), converter.getUnmappedFields());
+
+        // Test a valid case: nothing is left unmapped. emptyList() is used
+        // instead of the raw-typed EMPTY_LIST constant.
+        converter = new AvroRecordConverter(input, output,
+                ImmutableMap.of("l2", "field"));
+        assertEquals(Collections.emptyList(), converter.getUnmappedFields());
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bb64e70e/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
new file mode 100644
index 0000000..33f3a82
--- /dev/null
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import static org.apache.nifi.processors.kite.TestUtil.streamFor;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Tests for the {@link ConvertAvroSchema} processor, run through the NiFi
+ * {@link TestRunner}: a flat conversion with one renamed field and one
+ * failing record, and a conversion that flattens a nested record.
+ */
+public class TestConvertAvroSchema {
+
+    /** Input schema for the basic test; every field is a string. */
+    public static final Schema INPUT_SCHEMA = SchemaBuilder.record("InputTest")
+            .fields()
+            .requiredString("id")
+            .requiredString("primaryColor")
+            .optionalString("secondaryColor")
+            .optionalString("price")
+            .endRecord();
+
+    /** Output schema for the basic test: long id, string color, optional double price. */
+    public static final Schema OUTPUT_SCHEMA = SchemaBuilder.record("Test")
+            .fields()
+            .requiredLong("id")
+            .requiredString("color")
+            .optionalDouble("price")
+            .endRecord();
+
+    /** JSON form of the primaryColor-to-color mapping; not referenced in this class. */
+    public static final String MAPPING = "[{\"source\":\"primaryColor\", \"target\":\"color\"}]";
+
+    /** Expected value of the "errors" attribute on the failure flow file. */
+    public static final String FAILURE_SUMMARY = "Cannot convert free to double";
+
+    /**
+     * Converts three records, one of which has a price that is not a double,
+     * and checks the counters, the failure flow file (content and "errors"
+     * attribute), and the converted records.
+     */
+    @Test
+    public void testBasicConversion() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA,
+                INPUT_SCHEMA.toString());
+        runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
+                OUTPUT_SCHEMA.toString());
+        // Dynamic property: map input field primaryColor to output field color.
+        runner.setProperty("primaryColor", "color");
+        runner.assertValid();
+
+        // Two valid rows, and one invalid because "free" is not a double.
+        Record goodRecord1 = dataBasic("1", "blue", null, null);
+        Record goodRecord2 = dataBasic("2", "red", "yellow", "5.5");
+        Record badRecord = dataBasic("3", "red", "yellow", "free");
+        List<Record> input = Lists.newArrayList(goodRecord1, goodRecord2,
+                badRecord);
+
+        runner.enqueue(streamFor(input));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 2 rows", 2, converted);
+        Assert.assertEquals("Should reject 1 rows", 1, errors);
+
+        runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("failure", 1);
+
+        // The failure flow file holds the rejected record in the input schema.
+        MockFlowFile incompatible = runner.getFlowFilesForRelationship(
+                "failure").get(0);
+        List<Record> rejected = readRecords(
+                runner.getContentAsByteArray(incompatible), INPUT_SCHEMA);
+        Assert.assertEquals(1, rejected.size());
+        Assert.assertEquals(badRecord, rejected.get(0));
+        Assert.assertEquals("Should accumulate error messages",
+                FAILURE_SUMMARY, incompatible.getAttribute("errors"));
+
+        // The success flow file holds both good records, in input order.
+        List<Record> succeeded = readRecords(
+                runner.getContentAsByteArray(runner
+                        .getFlowFilesForRelationship("success").get(0)),
+                OUTPUT_SCHEMA);
+        Assert.assertEquals(2, succeeded.size());
+        Assert.assertEquals(convertBasic(goodRecord1), succeeded.get(0));
+        Assert.assertEquals(convertBasic(goodRecord2), succeeded.get(1));
+    }
+
+    /**
+     * Converts two nested records, one without and one with a parent, into
+     * the flat output schema using the "parent.id" to "parentId" mapping.
+     */
+    @Test
+    public void testNestedConversion() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA,
+                TestAvroRecordConverter.NESTED_RECORD_SCHEMA.toString());
+        runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA,
+                TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA.toString());
+        runner.setProperty("parent.id", "parentId");
+        runner.assertValid();
+
+        // Two valid rows
+        Record goodRecord1 = dataNested(1L, "200", null, null);
+        Record goodRecord2 = dataNested(2L, "300", 5L, "ParentCompany");
+        List<Record> input = Lists.newArrayList(goodRecord1, goodRecord2);
+
+        runner.enqueue(streamFor(input));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 2 rows", 2, converted);
+        Assert.assertEquals("Should reject 0 rows", 0, errors);
+
+        runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("failure", 0);
+
+        List<Record> succeeded = readRecords(
+                runner.getContentAsByteArray(runner
+                        .getFlowFilesForRelationship("success").get(0)),
+                TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA);
+        Assert.assertEquals(2, succeeded.size());
+        Assert.assertEquals(convertNested(goodRecord1), succeeded.get(0));
+        Assert.assertEquals(convertNested(goodRecord2), succeeded.get(1));
+    }
+
+    /**
+     * Reads every record from an Avro data file held in memory.
+     *
+     * The stream is closed in a finally block so that it is released even
+     * when reading fails part-way through.
+     *
+     * @param content the bytes of the Avro data file
+     * @param schema the schema handed to the datum reader
+     * @return the records in file order
+     * @throws IOException if the stream cannot be opened or closed
+     */
+    private static List<Record> readRecords(byte[] content, Schema schema)
+            throws IOException {
+        DataFileStream<Record> stream = new DataFileStream<Record>(
+                new ByteArrayInputStream(content),
+                new GenericDatumReader<Record>(schema));
+        try {
+            List<Record> records = Lists.newArrayList();
+            for (Record record : stream) {
+                records.add(record);
+            }
+            return records;
+        } finally {
+            stream.close();
+        }
+    }
+
+    /**
+     * Builds the output record expected for a record created by
+     * {@link #dataBasic}: id parsed as long, primaryColor copied to color,
+     * and price parsed as double or left null.
+     */
+    private Record convertBasic(Record inputRecord) {
+        Record result = new Record(OUTPUT_SCHEMA);
+        result.put("id", Long.parseLong(inputRecord.get("id").toString()));
+        result.put("color", inputRecord.get("primaryColor").toString());
+        Object price = inputRecord.get("price");
+        if (price == null) {
+            result.put("price", null);
+        } else {
+            result.put("price", Double.parseDouble(price.toString()));
+        }
+        return result;
+    }
+
+    /** Builds an {@link #INPUT_SCHEMA} record from the given field values. */
+    private Record dataBasic(String id, String primaryColor,
+            String secondaryColor, String price) {
+        Record result = new Record(INPUT_SCHEMA);
+        result.put("id", id);
+        result.put("primaryColor", primaryColor);
+        result.put("secondaryColor", secondaryColor);
+        result.put("price", price);
+        return result;
+    }
+
+    /**
+     * Builds the flat output record expected for a record created by
+     * {@link #dataNested}: l1 copied, s1 parsed as long, and parentId taken
+     * from the parent record when one is present.
+     */
+    private Record convertNested(Record inputRecord) {
+        Record result = new Record(
+                TestAvroRecordConverter.UNNESTED_OUTPUT_SCHEMA);
+        result.put("l1", inputRecord.get("l1"));
+        result.put("s1", Long.parseLong(inputRecord.get("s1").toString()));
+        Object parent = inputRecord.get("parent");
+        if (parent != null) {
+            // output schema doesn't have parent name.
+            result.put("parentId", ((Record) parent).get("id"));
+        }
+        return result;
+    }
+
+    /**
+     * Builds a nested input record. {@code id} is stored in field l1 and
+     * {@code companyName} in field s1; a parent record is attached when
+     * either {@code parentId} or {@code parentName} is non-null.
+     */
+    private Record dataNested(long id, String companyName, Long parentId,
+            String parentName) {
+        Record result = new Record(TestAvroRecordConverter.NESTED_RECORD_SCHEMA);
+        result.put("l1", id);
+        result.put("s1", companyName);
+        if (parentId != null || parentName != null) {
+            Record parent = new Record(
+                    TestAvroRecordConverter.NESTED_PARENT_SCHEMA);
+            parent.put("id", parentId);
+            parent.put("name", parentName);
+            result.put("parent", parent);
+        }
+        return result;
+    }
+}
[2/2] incubator-nifi git commit: NIFI-751 PR #70 removed extraneous
reference to abstract properties pulling in hadoop conf
Posted by jo...@apache.org.
NIFI-751 PR #70 removed extraneous reference to abstract properties pulling in hadoop conf
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/8ff69ca2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/8ff69ca2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/8ff69ca2
Branch: refs/heads/develop
Commit: 8ff69ca2d1bc5b7d651b7495f2ce45def12bebc2
Parents: bb64e70
Author: joewitt <jo...@apache.org>
Authored: Tue Jul 14 07:41:07 2015 -0400
Committer: joewitt <jo...@apache.org>
Committed: Tue Jul 14 07:41:07 2015 -0400
----------------------------------------------------------------------
.../java/org/apache/nifi/processors/kite/ConvertAvroSchema.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/8ff69ca2/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
index 0d9f658..daeb548 100644
--- a/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
+++ b/nifi/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
@@ -140,7 +140,7 @@ public class ConvertAvroSchema extends AbstractKiteProcessor {
private static final List<PropertyDescriptor> PROPERTIES = ImmutableList
.<PropertyDescriptor> builder()
- .addAll(AbstractKiteProcessor.getProperties()).add(INPUT_SCHEMA)
+ .add(INPUT_SCHEMA)
.add(OUTPUT_SCHEMA).build();
private static final Set<Relationship> RELATIONSHIPS = ImmutableSet