You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by af...@apache.org on 2016/10/05 13:21:45 UTC

nifi git commit: NIFI-1840 Added compression type property in Kite processors

Repository: nifi
Updated Branches:
  refs/heads/master 2c907c63a -> 75d0c74d2


NIFI-1840 Added compression type property in Kite processors

This closes #409


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/75d0c74d
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/75d0c74d
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/75d0c74d

Branch: refs/heads/master
Commit: 75d0c74d273600629d7a6e7027196c39b66513bb
Parents: 2c907c6
Author: Pierre Villard <pi...@gmail.com>
Authored: Tue May 3 17:00:49 2016 +0200
Committer: Andre F de Miranda <tr...@users.noreply.github.com>
Committed: Thu Oct 6 00:20:52 2016 +1100

----------------------------------------------------------------------
 .../kite/AbstractKiteConvertProcessor.java      | 62 ++++++++++++++++++
 .../nifi/processors/kite/ConvertAvroSchema.java | 22 +++++--
 .../nifi/processors/kite/ConvertCSVToAvro.java  |  6 +-
 .../nifi/processors/kite/ConvertJSONToAvro.java | 12 +++-
 .../processors/kite/TestCSVToAvroProcessor.java | 31 ++++++++-
 .../processors/kite/TestConvertAvroSchema.java  | 68 ++++++++++++++++++++
 .../kite/TestJSONToAvroProcessor.java           | 30 +++++++++
 7 files changed, 219 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteConvertProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteConvertProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteConvertProcessor.java
new file mode 100644
index 0000000..561bf46
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/AbstractKiteConvertProcessor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.nifi.processors.kite;
+
+import org.apache.avro.file.CodecFactory;
+import org.apache.nifi.components.PropertyDescriptor;
+
+import com.google.common.annotations.VisibleForTesting;
+
+abstract class AbstractKiteConvertProcessor extends AbstractKiteProcessor {
+
+    @VisibleForTesting
+    static final PropertyDescriptor COMPRESSION_TYPE = new PropertyDescriptor.Builder()
+            .name("kite-compression-type")
+            .displayName("Compression type")
+            .description("Compression type to use when writting Avro files. Default is Snappy.")
+            .allowableValues(CodecType.values())
+            .defaultValue(CodecType.SNAPPY.toString())
+            .build();
+
+    public enum CodecType {
+        BZIP2,
+        DEFLATE,
+        NONE,
+        SNAPPY,
+        LZO
+    }
+
+    protected CodecFactory getCodecFactory(String property) {
+        CodecType type = CodecType.valueOf(property);
+        switch (type) {
+        case BZIP2:
+            return CodecFactory.bzip2Codec();
+        case DEFLATE:
+            return CodecFactory.deflateCodec(CodecFactory.DEFAULT_DEFLATE_LEVEL);
+        case NONE:
+            return CodecFactory.nullCodec();
+        case LZO:
+            return CodecFactory.xzCodec(CodecFactory.DEFAULT_XZ_LEVEL);
+        case SNAPPY:
+        default:
+            return CodecFactory.snappyCodec();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
index a8244d2..a3fffc3 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertAvroSchema.java
@@ -30,7 +30,6 @@ import java.util.Set;
 import java.util.regex.Pattern;
 
 import org.apache.avro.Schema;
-import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData.Record;
@@ -70,7 +69,7 @@ import java.util.concurrent.atomic.AtomicLong;
 @DynamicProperty(name = "Field name from input schema",
 value = "Field name for output schema",
 description = "Explicit mappings from input schema to output schema, which supports renaming fields and stepping into nested records on the input schema using notation like parent.id")
-public class ConvertAvroSchema extends AbstractKiteProcessor {
+public class ConvertAvroSchema extends AbstractKiteConvertProcessor {
 
     private static final Relationship SUCCESS = new Relationship.Builder()
             .name("success")
@@ -180,7 +179,9 @@ public class ConvertAvroSchema extends AbstractKiteProcessor {
             .<PropertyDescriptor> builder()
             .add(INPUT_SCHEMA)
             .add(OUTPUT_SCHEMA)
-            .add(LOCALE).build();
+            .add(LOCALE)
+            .add(COMPRESSION_TYPE)
+            .build();
 
     private static final Set<Relationship> RELATIONSHIPS = ImmutableSet
             .<Relationship> builder().add(SUCCESS).add(FAILURE).build();
@@ -284,11 +285,11 @@ public class ConvertAvroSchema extends AbstractKiteProcessor {
 
         final DataFileWriter<Record> writer = new DataFileWriter<>(
                 AvroUtil.newDatumWriter(outputSchema, Record.class));
-        writer.setCodec(CodecFactory.snappyCodec());
+        writer.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
 
         final DataFileWriter<Record> failureWriter = new DataFileWriter<>(
                 AvroUtil.newDatumWriter(outputSchema, Record.class));
-        failureWriter.setCodec(CodecFactory.snappyCodec());
+        failureWriter.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
 
         try {
             final AtomicLong written = new AtomicLong(0L);
@@ -376,6 +377,17 @@ public class ConvertAvroSchema extends AbstractKiteProcessor {
         } catch (DatasetException e) {
             getLogger().error("Failed to read FlowFile", e);
             session.transfer(incomingAvro, FAILURE);
+        } finally {
+            try {
+                writer.close();
+            } catch (IOException e) {
+                getLogger().warn("Unable to close writer ressource", e);
+            }
+            try {
+                failureWriter.close();
+            } catch (IOException e) {
+                getLogger().warn("Unable to close writer ressource", e);
+            }
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
index de4130f..22244ee 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertCSVToAvro.java
@@ -27,7 +27,6 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.avro.Schema;
-import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.commons.lang3.StringEscapeUtils;
@@ -63,7 +62,7 @@ import java.util.concurrent.atomic.AtomicLong;
 @Tags({"kite", "csv", "avro"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Converts CSV files to Avro according to an Avro Schema")
-public class ConvertCSVToAvro extends AbstractKiteProcessor {
+public class ConvertCSVToAvro extends AbstractKiteConvertProcessor {
 
     private static final CSVProperties DEFAULTS = new CSVProperties.Builder().build();
 
@@ -164,6 +163,7 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor {
         .add(ESCAPE)
         .add(HAS_HEADER)
         .add(LINES_TO_SKIP)
+        .add(COMPRESSION_TYPE)
         .build();
 
     private static final Set<Relationship> RELATIONSHIPS = ImmutableSet.<Relationship> builder()
@@ -221,7 +221,7 @@ public class ConvertCSVToAvro extends AbstractKiteProcessor {
         }
 
         try (final DataFileWriter<Record> writer = new DataFileWriter<>(AvroUtil.newDatumWriter(schema, Record.class))) {
-            writer.setCodec(CodecFactory.snappyCodec());
+            writer.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
 
             try {
                 final AtomicLong written = new AtomicLong(0L);

http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
index 6245362..1127a2d 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/main/java/org/apache/nifi/processors/kite/ConvertJSONToAvro.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.avro.Schema;
-import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -54,7 +53,7 @@ import java.util.concurrent.atomic.AtomicLong;
 @Tags({"kite", "json", "avro"})
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Converts JSON files to Avro according to an Avro Schema")
-public class ConvertJSONToAvro extends AbstractKiteProcessor {
+public class ConvertJSONToAvro extends AbstractKiteConvertProcessor {
 
     private static final Relationship SUCCESS = new Relationship.Builder()
             .name("success")
@@ -85,6 +84,7 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor {
             = ImmutableList.<PropertyDescriptor>builder()
             .addAll(AbstractKiteProcessor.getProperties())
             .add(SCHEMA)
+            .add(COMPRESSION_TYPE)
             .build();
 
     private static final Set<Relationship> RELATIONSHIPS
@@ -129,7 +129,7 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor {
 
         final DataFileWriter<Record> writer = new DataFileWriter<>(
                 AvroUtil.newDatumWriter(schema, Record.class));
-        writer.setCodec(CodecFactory.snappyCodec());
+        writer.setCodec(getCodecFactory(context.getProperty(COMPRESSION_TYPE).getValue()));
 
         try {
             final AtomicLong written = new AtomicLong(0L);
@@ -200,6 +200,12 @@ public class ConvertJSONToAvro extends AbstractKiteProcessor {
         } catch (DatasetException e) {
             getLogger().error("Failed to read FlowFile", e);
             session.transfer(incomingJSON, FAILURE);
+        } finally {
+            try {
+                writer.close();
+            } catch (IOException e) {
+                getLogger().warn("Unable to close writer ressource", e);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
index 902ec79..9252e81 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestCSVToAvroProcessor.java
@@ -24,6 +24,7 @@ import java.nio.charset.StandardCharsets;
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -89,7 +90,6 @@ public class TestCSVToAvroProcessor {
                 FAILURE_SUMMARY, incompatible.getAttribute("errors"));
     }
 
-
     @Test
     public void testBasicConversion() throws IOException {
         TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
@@ -119,6 +119,35 @@ public class TestCSVToAvroProcessor {
     }
 
     @Test
+    public void testBasicConversionWithCompression() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());
+        runner.setProperty(AbstractKiteConvertProcessor.COMPRESSION_TYPE, CodecType.DEFLATE.toString());
+        runner.assertValid();
+
+        runner.enqueue(streamFor(CSV_CONTENT));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 2 rows", 2, converted);
+        Assert.assertEquals("Should reject 1 row", 1, errors);
+
+        runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("failure", 0);
+        runner.assertTransferCount("incompatible", 1);
+
+        MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0);
+        String failureContent = new String(runner.getContentAsByteArray(incompatible),
+                StandardCharsets.UTF_8);
+        Assert.assertEquals("Should reject an invalid string and double",
+                CSV_CONTENT, failureContent);
+        Assert.assertEquals("Should accumulate error messages",
+                FAILURE_SUMMARY, incompatible.getAttribute("errors"));
+    }
+
+    @Test
     public void testAlternateCharset() throws IOException {
         TestRunner runner = TestRunners.newTestRunner(ConvertCSVToAvro.class);
         runner.setProperty(ConvertCSVToAvro.SCHEMA, SCHEMA.toString());

http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
index 2da0513..7a62ac5 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestConvertAvroSchema.java
@@ -33,6 +33,7 @@ import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.commons.lang.LocaleUtils;
+import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -125,6 +126,73 @@ public class TestConvertAvroSchema {
     }
 
     @Test
+    public void testBasicConversionWithCompression() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertAvroSchema.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertAvroSchema.INPUT_SCHEMA, INPUT_SCHEMA.toString());
+        runner.setProperty(ConvertAvroSchema.OUTPUT_SCHEMA, OUTPUT_SCHEMA.toString());
+        runner.setProperty(AbstractKiteConvertProcessor.COMPRESSION_TYPE, CodecType.BZIP2.toString());
+        Locale locale = Locale.getDefault();
+        runner.setProperty("primaryColor", "color");
+        runner.assertValid();
+
+        NumberFormat format = NumberFormat.getInstance(locale);
+
+        // Two valid rows, and one invalid because "free" is not a double.
+        Record goodRecord1 = dataBasic("1", "blue", null, null);
+        Record goodRecord2 = dataBasic("2", "red", "yellow", format.format(5.5));
+        Record badRecord = dataBasic("3", "red", "yellow", "free");
+        List<Record> input = Lists.newArrayList(goodRecord1, goodRecord2,
+                badRecord);
+
+        runner.enqueue(streamFor(input));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 2 rows", 2, converted);
+        Assert.assertEquals("Should reject 1 rows", 1, errors);
+
+        runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("failure", 1);
+
+        MockFlowFile incompatible = runner.getFlowFilesForRelationship(
+                "failure").get(0);
+        GenericDatumReader<Record> reader = new GenericDatumReader<Record>(
+                INPUT_SCHEMA);
+        DataFileStream<Record> stream = new DataFileStream<Record>(
+                new ByteArrayInputStream(
+                        runner.getContentAsByteArray(incompatible)), reader);
+        int count = 0;
+        for (Record r : stream) {
+            Assert.assertEquals(badRecord, r);
+            count++;
+        }
+        stream.close();
+        Assert.assertEquals(1, count);
+        Assert.assertEquals("Should accumulate error messages",
+                FAILURE_SUMMARY, incompatible.getAttribute("errors"));
+
+        GenericDatumReader<Record> successReader = new GenericDatumReader<Record>(
+                OUTPUT_SCHEMA);
+        DataFileStream<Record> successStream = new DataFileStream<Record>(
+                new ByteArrayInputStream(runner.getContentAsByteArray(runner
+                        .getFlowFilesForRelationship("success").get(0))),
+                successReader);
+        count = 0;
+        for (Record r : successStream) {
+            if (count == 0) {
+                Assert.assertEquals(convertBasic(goodRecord1, locale), r);
+            } else {
+                Assert.assertEquals(convertBasic(goodRecord2, locale), r);
+            }
+            count++;
+        }
+        successStream.close();
+        Assert.assertEquals(2, count);
+    }
+
+    @Test
     public void testBasicConversionWithLocales() throws IOException {
         testBasicConversionWithLocale("en_US");
         testBasicConversionWithLocale("fr_FR");

http://git-wip-us.apache.org/repos/asf/nifi/blob/75d0c74d/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
index e0b4a6f..776e2f3 100644
--- a/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
+++ b/nifi-nar-bundles/nifi-kite-bundle/nifi-kite-processors/src/test/java/org/apache/nifi/processors/kite/TestJSONToAvroProcessor.java
@@ -23,6 +23,7 @@ import java.nio.charset.StandardCharsets;
 
 import org.apache.avro.Schema;
 import org.apache.avro.SchemaBuilder;
+import org.apache.nifi.processors.kite.AbstractKiteConvertProcessor.CodecType;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
@@ -84,6 +85,35 @@ public class TestJSONToAvroProcessor {
     }
 
     @Test
+    public void testBasicConversionWithCompression() throws IOException {
+        TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
+        runner.assertNotValid();
+        runner.setProperty(ConvertJSONToAvro.SCHEMA, SCHEMA.toString());
+        runner.setProperty(AbstractKiteConvertProcessor.COMPRESSION_TYPE, CodecType.NONE.toString());
+        runner.assertValid();
+
+        runner.enqueue(streamFor(JSON_CONTENT));
+        runner.run();
+
+        long converted = runner.getCounterValue("Converted records");
+        long errors = runner.getCounterValue("Conversion errors");
+        Assert.assertEquals("Should convert 2 rows", 2, converted);
+        Assert.assertEquals("Should reject 3 rows", 3, errors);
+
+        runner.assertTransferCount("success", 1);
+        runner.assertTransferCount("failure", 0);
+        runner.assertTransferCount("incompatible", 1);
+
+        MockFlowFile incompatible = runner.getFlowFilesForRelationship("incompatible").get(0);
+        String failureContent = new String(runner.getContentAsByteArray(incompatible),
+                StandardCharsets.UTF_8);
+        Assert.assertEquals("Should reject an invalid string and double",
+                JSON_CONTENT, failureContent);
+        Assert.assertEquals("Should accumulate error messages",
+                FAILURE_SUMMARY, incompatible.getAttribute("errors"));
+    }
+
+    @Test
     public void testOnlyErrors() throws IOException {
         TestRunner runner = TestRunners.newTestRunner(ConvertJSONToAvro.class);
         runner.assertNotValid();