Posted to commits@nifi.apache.org by tk...@apache.org on 2015/12/03 02:18:21 UTC

nifi git commit: NIFI-1234 Augmenting container handling functionality for single Avro records and adjusting formatting in ConvertAvroToJSON.

Repository: nifi
Updated Branches:
  refs/heads/master bde270a91 -> ecc240b91


NIFI-1234 Augmenting container handling functionality for single Avro records and adjusting formatting in ConvertAvroToJSON.

Reviewed by Tony Kurc (tkurc@apache.org). This closes #136.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ecc240b9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ecc240b9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ecc240b9

Branch: refs/heads/master
Commit: ecc240b9181dc7a75758ed39f21ef0a416305ed1
Parents: bde270a
Author: Aldrin Piri <al...@apache.org>
Authored: Tue Dec 1 17:38:46 2015 -0500
Committer: Tony Kurc <tr...@gmail.com>
Committed: Wed Dec 2 20:16:08 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/avro/ConvertAvroToJSON.java | 56 ++++++++-----
 .../processors/avro/TestConvertAvroToJSON.java  | 86 ++++++++++++++++++++
 2 files changed, 122 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
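
For context before the diff: the new 'Wrap Single Record' property lets a FlowFile holding zero or one Avro record be emitted inside the same JSON array container already used for multi-record input. A minimal usage sketch, mirroring the NiFi mock-framework calls used in the updated test class below (the enqueued byte array is a hypothetical placeholder for one serialized Avro record):

    final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
    runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_ARRAY);
    runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, "true");
    runner.enqueue(singleRecordAvroBytes); // hypothetical byte[] holding one serialized Avro record
    runner.run();
    // With wrapping enabled the single record is emitted as "[{...}]"; with the default of "false",
    // or with the container option set to "none", the bare "{...}" is kept.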


http://git-wip-us.apache.org/repos/asf/nifi/blob/ecc240b9/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
index 8faaa4f..d9fa4ff 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
@@ -57,13 +57,14 @@ import org.apache.nifi.processor.io.StreamCallback;
 @CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such "
     + "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this "
     + "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of "
-    + "multiple Avro records, the resultant FlowFile will contain a JSON Array containing all of the Avro records or a sequence of JSON Objects")
+    + "multiple Avro records, the resultant FlowFile will contain a JSON Array containing all of the Avro records or a sequence of JSON Objects.  If an incoming FlowFile does "
+    + "not contain any records, an empty JSON object is the output. Empty/Single Avro record FlowFile inputs are optionally wrapped in a container as dictated by 'Wrap Single Record'")
 @WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/json")
 public class ConvertAvroToJSON extends AbstractProcessor {
     protected static final String CONTAINER_ARRAY = "array";
     protected static final String CONTAINER_NONE = "none";
 
-    private static final byte [] EMPTY_JSON_OBJECT = "{}".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] EMPTY_JSON_OBJECT = "{}".getBytes(StandardCharsets.UTF_8);
 
     static final PropertyDescriptor CONTAINER_OPTIONS = new PropertyDescriptor.Builder()
         .name("JSON container options")
@@ -73,6 +74,13 @@ public class ConvertAvroToJSON extends AbstractProcessor {
         .required(true)
         .defaultValue(CONTAINER_ARRAY)
         .build();
+    static final PropertyDescriptor WRAP_SINGLE_RECORD = new PropertyDescriptor.Builder()
+        .name("Wrap Single Record")
+        .description("Determines if the resulting output for empty records or a single record should be wrapped in a container array as specified by '" + CONTAINER_OPTIONS.getName() + "'")
+        .allowableValues("true", "false")
+        .defaultValue("false")
+        .required(true)
+        .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
         .name("success")
@@ -83,7 +91,6 @@ public class ConvertAvroToJSON extends AbstractProcessor {
         .description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason")
         .build();
 
-
     private List<PropertyDescriptor> properties;
 
     @Override
@@ -92,6 +99,7 @@ public class ConvertAvroToJSON extends AbstractProcessor {
 
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(CONTAINER_OPTIONS);
+        properties.add(WRAP_SINGLE_RECORD);
         this.properties = Collections.unmodifiableList(properties);
     }
 
@@ -116,51 +124,59 @@ public class ConvertAvroToJSON extends AbstractProcessor {
         }
 
         final String containerOption = context.getProperty(CONTAINER_OPTIONS).getValue();
+        final boolean useContainer = containerOption.equals(CONTAINER_ARRAY);
+        // Wrap a single record (including the zero-record case) only when a container is being used
+        final boolean wrapSingleRecord = context.getProperty(WRAP_SINGLE_RECORD).asBoolean() && useContainer;
 
         try {
             flowFile = session.write(flowFile, new StreamCallback() {
                 @Override
                 public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
                     try (final InputStream in = new BufferedInputStream(rawIn);
-
-                        final OutputStream out = new BufferedOutputStream(rawOut);
-                        final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+                         final OutputStream out = new BufferedOutputStream(rawOut);
+                         final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
 
                         final GenericData genericData = GenericData.get();
 
-                        if (reader.hasNext() == false ) {
-                            out.write(EMPTY_JSON_OBJECT);
-                            return;
+                        int recordCount = 0;
+                        GenericRecord currRecord = null;
+                        if (reader.hasNext()) {
+                            currRecord = reader.next();
+                            recordCount++;
                         }
-                        int recordCount = 1;
-                        GenericRecord reuse = reader.next();
-                        // Only open container if more than one record
-                        if(reader.hasNext() && containerOption.equals(CONTAINER_ARRAY)){
+
+                        // Open container if desired output is an array format and there are multiple records or
+                        // if configured to wrap a single record
+                        if (reader.hasNext() && useContainer || wrapSingleRecord) {
                             out.write('[');
                         }
-                        out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8));
+
+                        // Determine the initial output record, covering the case of an empty set of Avro records
+                        final byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8);
+                        out.write(outputBytes);
 
                         while (reader.hasNext()) {
-                            if (containerOption.equals(CONTAINER_ARRAY)) {
+                            if (useContainer) {
                                 out.write(',');
                             } else {
                                 out.write('\n');
                             }
 
-                            reuse = reader.next(reuse);
-                            out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8));
+                            currRecord = reader.next(currRecord);
+                            out.write(genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8));
                             recordCount++;
                         }
 
-                        // Only close container if more than one record
-                        if (recordCount > 1 && containerOption.equals(CONTAINER_ARRAY)) {
+                        // Close container if desired output is an array format and there are multiple records or if
+                        // configured to wrap a single record
+                        if (recordCount > 1 && useContainer || wrapSingleRecord) {
                             out.write(']');
                         }
                     }
                 }
             });
         } catch (final ProcessException pe) {
-            getLogger().error("Failed to convert {} from Avro to JSON due to {}; transferring to failure", new Object[] {flowFile, pe});
+            getLogger().error("Failed to convert {} from Avro to JSON due to {}; transferring to failure", new Object[]{flowFile, pe});
             session.transfer(flowFile, REL_FAILURE);
             return;
         }
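
In summary, the boundary cases handled above behave as follows (per the tests that follow; container option 'array' unless noted):

    zero records, Wrap Single Record = false  ->  {}
    zero records, Wrap Single Record = true   ->  [{}]
    one record,   Wrap Single Record = false  ->  {"name": "Alyssa", ...}
    one record,   Wrap Single Record = true   ->  [{"name": "Alyssa", ...}]
    one record,   container option 'none'     ->  {"name": "Alyssa", ...} (wrapping never applies)

Multi-record input is unchanged: a JSON array with the 'array' option, newline-separated objects with 'none'.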

http://git-wip-us.apache.org/repos/asf/nifi/blob/ecc240b9/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
index 3535156..856677a 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
@@ -54,6 +54,73 @@ public class TestConvertAvroToJSON {
     }
 
     @Test
+    public void testSingleAvroMessage_noContainer() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1);
+        runner.enqueue(out1.toByteArray());
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
+    }
+
+    @Test
+    public void testSingleAvroMessage_wrapSingleMessage() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_ARRAY);
+        runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1);
+        runner.enqueue(out1.toByteArray());
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("[{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}]");
+    }
+
+    @Test
+    public void testSingleAvroMessage_wrapSingleMessage_noContainer() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+        // Verify we do not wrap output for a single record if not configured to use a container
+        runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
+        runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+
+        final GenericRecord user1 = new GenericData.Record(schema);
+        user1.put("name", "Alyssa");
+        user1.put("favorite_number", 256);
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1);
+        runner.enqueue(out1.toByteArray());
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
+    }
+
+
+    @Test
     public void testMultipleAvroMessages() throws IOException {
         final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
         final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
@@ -155,4 +222,23 @@ public class TestConvertAvroToJSON {
         out.assertContentEquals("{}");
 
     }
+
+    @Test
+    public void testZeroRecords_wrapSingleRecord() throws IOException {
+        final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+        runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
+        final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+
+
+        final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+        final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter);
+        runner.enqueue(out1.toByteArray());
+
+        runner.run();
+
+        runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+        final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+        out.assertContentEquals("[{}]");
+
+    }
 }