You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by af...@apache.org on 2016/10/05 13:21:45 UTC
nifi git commit: NIFI-1840 Added compression type property in Kite
processors
Repository: nifi
Updated Branches:
refs/heads/master 2c907c63a -> 75d0c74d2
NIFI-1840 Added compression type property in Kite processors
This closes #409
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/75d0c74d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/75d0c74d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/75d0c74d
Branch: refs/heads/master
Commit: 75d0c74d273600629d7a6e7027196c39b66513bb
Parents: 2c907c6
Author: Pierre Villard <pi...@gmail.com>
Authored: Tue May 3 17:00:49 2016 +0200
Committer: Andre F de Miranda <tr...@users.noreply.github.com>
Committed: Thu Oct 6 00:20:52 2016 +1100
----------------------------------------------------------------------
.../kite/AbstractKiteConvertProcessor.java | 62 ++++++++++++++++++
.../nifi/processors/kite/ConvertAvroSchema.java | 22 +++++--
.../nifi/processors/kite/ConvertCSVToAvro.java | 6 +-
.../nifi/processors/kite/ConvertJSONToAvro.java | 12 +++-
.../processors/kite/TestCSVToAvroProcessor.java | 31 ++++++++-
.../processors/kite/TestConvertAvroSchema.java | 68 ++++++++++++++++++++
.../kite/TestJSONToAvroProcessor.java | 30 +++++++++
7 files changed, 219 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteConvertProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteConvertProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteConvertProcessor.java
new file mode 100644
index 0000000..561bf46
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteConvertProcessor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import org.apache.avro.file.CodecFactory;
+import org.apache.nifi.components.PropertyDescriptor;
+
+import com.google.common.annotations.VisibleForTesting;
+
+abstract class AbstractKiteConvertProcessor extends AbstractKiteProcessor {
+
+ @VisibleForTesting
+ static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
+ .name("kite-compression-type")
+ .displayName("Compression type")
+ .description("Compression type to use when writing Avro files. Default is Snappy.")
+ .allowableValues(CodecType.values())
+ .defaultValue(CodecType.SNAPPY.toString())
+ .build();
+
+ public enum CodecType {
+ BZIP2,
+ DEFLATE,
+ NONE,
+ SNAPPY,
+ LZO
+ }
+
+ protected CodecFactory getCodecFactory(String property) {
+ CodecType type = CodecType.valueOf(property);
+ switch (type) {
+ case BZIP2:
+ return CodecFactory.bzip2Codec();
+ case DEFLATE:
+ return CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL);
+ case NONE:
+ return CodecFactory.nullCodec();
+ case LZO:
+ return CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL);
+ case SNAPPY:
+ default:
+ return CodecFactory.snappyCodec();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
index a8244d2..a3fffc3 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
@@ -30,7 +30,6 @@ import java.util.Set;
import java.util.regex.Pattern;
import org.apache.avro.Schema;
-import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
@@ -70,7 +69,7 @@ import java.util.concurrent.atomic.AtomicLong;
@DynamicProperty(name = "Field name from input schema",
value = "Field name for output schema",
description = "Explicit mappings from input schema to output schema, which supports renaming fields and stepping into nested records on the input schema using notation like parent.id")
-public class ConvertAvroSchema extends AbstractKiteProcessor {
+public class ConvertAvroSchema extends AbstractKiteConvertProcessor {
private static final Relationship SUCCESS = new Relationship.Builder()
.name("success")
@@ -180,7 +179,9 @@ public class ConvertAvroSchema extends AbstractKiteProcessor {
.<PropertyDescriptor> builder()
.add(INPUT_SCHEMA)
.add(OUTPUT_SCHEMA)
- .add(LOCALE).build();
+ .add(LOCALE)
+ .add(COMPRESSION_TYPE)
+ .build();
private static final Set<Relationship> RELATIONSHIPS = ImmutableSet
.<Relationship> builder().add(SUCCESS).add(FAILURE).build();
@@ -284,11 +285,11 @@ public class ConvertAvroSchema extends AbstractKiteProcessor {
final DataFileWriter<Record> writer = new DataFileWriter<>(
AvroUtil.newDatumWriter(outputSchema, Record.class));
- writer.setCodec(CodecFactory.snappyCodec());
+ writer.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
final DataFileWriter<Record> failureWriter = new DataFileWriter<>(
AvroUtil.newDatumWriter(outputSchema, Record.class));
- failureWriter.setCodec(CodecFactory.snappyCodec());
+ failureWriter.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
try {
final AtomicLong written = new AtomicLong(0L);
@@ -376,6 +377,17 @@ public class ConvertAvroSchema extends AbstractKiteProcessor {
} catch (DatasetException e) {
getLogger().error("Failed to read FlowFile", e);
session.transfer(incomingAvro, FAILURE);
+ } finally {
+ try {
+ writer.close();
+ } catch (IOException e) {
+ getLogger().warn("Unable to close writer resource", e);
+ }
+ try {
+ failureWriter.close();
+ } catch (IOException e) {
+ getLogger().warn("Unable to close writer resource", e);
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
index de4130f..22244ee 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
@@ -27,7 +27,6 @@ import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
-import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
import org.apache.commons.lang3.StringEscapeUtils;
@@ -63,7 +62,7 @@ import java.util.concurrent.atomic.AtomicLong;
@Tags({"kite", "csv", "avro"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Converts CSV files to Avro according to an Avro Schema")
-public class ConvertCSVToAvro extends AbstractKiteProcessor {
+public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
private static final CSVProperties DEFAULTS = new CSVProperties.Builder().build();
@@ -164,6 +163,7 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor {
.add(ESCAPE)
.add(HAS_HEADER)
.add(LINES_TO_SKIP)
+ .add(COMPRESSION_TYPE)
.build();
private static final Set<Relationship> RELATIONSHIPS = ImmutableSet.<Relationship> builder()
@@ -221,7 +221,7 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor {
}
try (final DataFileWriter<Record> writer = new DataFileWriter<>(AvroUtil.newDatumWriter(schema, Record.class))) {
- writer.setCodec(CodecFactory.snappyCodec());
+ writer.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
try {
final AtomicLong written = new AtomicLong(0L);
http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
index 6245362..1127a2d 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
@@ -25,7 +25,6 @@ import java.util.List;
import java.util.Set;
import org.apache.avro.Schema;
-import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData.Record;
import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -54,7 +53,7 @@ import java.util.concurrent.atomic.AtomicLong;
@Tags({"kite", "json", "avro"})
@InputRequirement(Requirement.INPUT_REQUIRED)
@CapabilityDescription("Converts JSON files to Avro according to an Avro Schema")
-public class ConvertJSONToAvro extends AbstractKiteProcessor {
+public class ConvertJSONToAvro extends AbstractKiteConvertProcessor {
private static final Relationship SUCCESS = new Relationship.Builder()
.name("success")
@@ -85,6 +84,7 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor {
= ImmutableList.<PropertyDescriptor>builder()
.addAll(AbstractKiteProcessor.getProperties())
.add(SCHEMA)
+ .add(COMPRESSION_TYPE)
.build();
private static final Set<Relationship> RELATIONSHIPS
@@ -129,7 +129,7 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor {
final DataFileWriter<Record> writer = new DataFileWriter<>(
AvroUtil.newDatumWriter(schema, Record.class));
- writer.setCodec(CodecFactory.snappyCodec());
+ writer.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
try {
final AtomicLong written = new AtomicLong(0L);
@@ -200,6 +200,12 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor {
} catch (DatasetException e) {
getLogger().error("Failed to read FlowFile", e);
session.transfer(incomingJSON, FAILURE);
+ } finally {
+ try {
+ writer.close();
+ } catch (IOException e) {
+ getLogger().warn("Unable to close writer resource", e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
index 902ec79..9252e81 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -89,7 +90,6 @@ public class TestCSVToAvroProcessor {
FAILURE_SUMMARY, incompatible.getAttribute("errors"));
}
-
@Test
public void testBasicConversion() throws IOException {
TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
@@ -119,6 +119,35 @@ public class TestCSVToAvroProcessor {
}
@Test
+ public void testBasicConversionWithCompression() throws IOException {
+ TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
+ runner.assertNotValid();
+ runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
+ runner.setProperty(AbstractKiteConvertProcessor.COMPRESSION_TYPE, CodecType.DEFLATE.toString());
+ runner.assertValid();
+
+ runner.enqueue(streamFor(CSV_CONTENT));
+ runner.run();
+
+ long converted = runner.getCounterValue("Converted records");
+ long errors = runner.getCounterValue("Conversion errors");
+ Assert.assertEquals("Should convert 2 rows", 2, converted);
+ Assert.assertEquals("Should reject 1 row", 1, errors);
+
+ runner.assertTransferCount("success", 1);
+ runner.assertTransferCount("failure", 0);
+ runner.assertTransferCount("incompatible", 1);
+
+ MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0);
+ String failureContent = new String(runner.getContentAsByteArray(incompatible),
+ StandardCharsets.UTF_8);
+ Assert.assertEquals("Should reject an invalid string and double",
+ CSV_CONTENT, failureContent);
+ Assert.assertEquals("Should accumulate error messages",
+ FAILURE_SUMMARY, incompatible.getAttribute("errors"));
+ }
+
+ @Test
public void testAlternateCharset() throws IOException {
TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
index 2da0513..7a62ac5 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
@@ -33,6 +33,7 @@ import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.commons.lang.LocaleUtils;
+import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -125,6 +126,73 @@ public class TestConvertAvroSchema {
}
@Test
+ public void testBasicConversionWithCompression() throws IOException {
+ TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
+ runner.assertNotValid();
+ runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA, INPUT_SCHEMA.toString());
+ runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA, OUTPUT_SCHEMA.toString());
+ runner.setProperty(AbstractKiteConvertProcessor.COMPRESSION_TYPE, CodecType.BZIP2.toString());
+ Locale locale = Locale.getDefault();
+ runner.setProperty("primaryColor", "color");
+ runner.assertValid();
+
+ NumberFormat format = NumberFormat.getInstance(locale);
+
+ // Two valid rows, and one invalid because "free" is not a double.
+ Record goodRecord1 = dataBasic("1", "blue", null, null);
+ Record goodRecord2 = dataBasic("2", "red", "yellow", format.format(5.5));
+ Record badRecord = dataBasic("3", "red", "yellow", "free");
+ List<Record> input = Lists.newArrayList(goodRecord1, goodRecord2,
+ badRecord);
+
+ runner.enqueue(streamFor(input));
+ runner.run();
+
+ long converted = runner.getCounterValue("Converted records");
+ long errors = runner.getCounterValue("Conversion errors");
+ Assert.assertEquals("Should convert 2 rows", 2, converted);
+ Assert.assertEquals("Should reject 1 rows", 1, errors);
+
+ runner.assertTransferCount("success", 1);
+ runner.assertTransferCount("failure", 1);
+
+ MockFlowFile incompatible = runner.getFlowFilesForRelationship(
+ "failure").get(0);
+ GenericDatumReader<Record> reader = new GenericDatumReader<Record>(
+ INPUT_SCHEMA);
+ DataFileStream<Record> stream = new DataFileStream<Record>(
+ new ByteArrayInputStream(
+ runner.getContentAsByteArray(incompatible)), reader);
+ int count = 0;
+ for (Record r : stream) {
+ Assert.assertEquals(badRecord, r);
+ count++;
+ }
+ stream.close();
+ Assert.assertEquals(1, count);
+ Assert.assertEquals("Should accumulate error messages",
+ FAILURE_SUMMARY, incompatible.getAttribute("errors"));
+
+ GenericDatumReader<Record> successReader = new GenericDatumReader<Record>(
+ OUTPUT_SCHEMA);
+ DataFileStream<Record> successStream = new DataFileStream<Record>(
+ new ByteArrayInputStream(runner.getContentAsByteArray(runner
+ .getFlowFilesForRelationship("success").get(0))),
+ successReader);
+ count = 0;
+ for (Record r : successStream) {
+ if (count == 0) {
+ Assert.assertEquals(convertBasic(goodRecord1, locale), r);
+ } else {
+ Assert.assertEquals(convertBasic(goodRecord2, locale), r);
+ }
+ count++;
+ }
+ successStream.close();
+ Assert.assertEquals(2, count);
+ }
+
+ @Test
public void testBasicConversionWithLocales() throws IOException {
testBasicConversionWithLocale("en_US");
testBasicConversionWithLocale("fr_FR");
http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
index e0b4a6f..776e2f3 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
+import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
@@ -84,6 +85,35 @@ public class TestJSONToAvroProcessor {
}
@Test
+ public void testBasicConversionWithCompression() throws IOException {
+ TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
+ runner.assertNotValid();
+ runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
+ runner.setProperty(AbstractKiteConvertProcessor.COMPRESSION_TYPE, CodecType.NONE.toString());
+ runner.assertValid();
+
+ runner.enqueue(streamFor(JSON_CONTENT));
+ runner.run();
+
+ long converted = runner.getCounterValue("Converted records");
+ long errors = runner.getCounterValue("Conversion errors");
+ Assert.assertEquals("Should convert 2 rows", 2, converted);
+ Assert.assertEquals("Should reject 3 rows", 3, errors);
+
+ runner.assertTransferCount("success", 1);
+ runner.assertTransferCount("failure", 0);
+ runner.assertTransferCount("incompatible", 1);
+
+ MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0);
+ String failureContent = new String(runner.getContentAsByteArray(incompatible),
+ StandardCharsets.UTF_8);
+ Assert.assertEquals("Should reject an invalid string and double",
+ JSON_CONTENT, failureContent);
+ Assert.assertEquals("Should accumulate error messages",
+ FAILURE_SUMMARY, incompatible.getAttribute("errors"));
+ }
+
+ @Test
public void testOnlyErrors() throws IOException {
TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
runner.assertNotValid();