You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tk...@apache.org on 2015/12/03 02:18:21 UTC
nifi git commit: NIFI-1234 Augmenting container handling
functionality for single Avro records and adjusting formatting in
ConvertAvroToJSON.
Repository: nifi
Updated Branches:
refs/heads/master bde270a91 -> ecc240b91
NIFI-1234 Augmenting container handling functionality for single Avro records and adjusting formatting in ConvertAvroToJSON.
Reviewed by Tony Kurc (tkurc@apache.org). This closes #136.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ecc240b9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ecc240b9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ecc240b9
Branch: refs/heads/master
Commit: ecc240b9181dc7a75758ed39f21ef0a416305ed1
Parents: bde270a
Author: Aldrin Piri <al...@apache.org>
Authored: Tue Dec 1 17:38:46 2015 -0500
Committer: Tony Kurc <tr...@gmail.com>
Committed: Wed Dec 2 20:16:08 2015 -0500
----------------------------------------------------------------------
.../nifi/processors/avro/ConvertAvroToJSON.java | 56 ++++++++-----
.../processors/avro/TestConvertAvroToJSON.java | 86 ++++++++++++++++++++
2 files changed, 122 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/ecc240b9/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
index 8faaa4f..d9fa4ff 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/ConvertAvroToJSON.java
@@ -57,13 +57,14 @@ import org.apache.nifi.processor.io.StreamCallback;
@CapabilityDescription("Converts a Binary Avro record into a JSON object. This processor provides a direct mapping of an Avro field to a JSON field, such "
+ "that the resulting JSON will have the same hierarchical structure as the Avro document. Note that the Avro schema information will be lost, as this "
+ "is not a translation from binary Avro to JSON formatted Avro. The output JSON is encoded the UTF-8 encoding. If an incoming FlowFile contains a stream of "
- + "multiple Avro records, the resultant FlowFile will contain a JSON Array containing all of the Avro records or a sequence of JSON Objects")
+ + "multiple Avro records, the resultant FlowFile will contain a JSON Array containing all of the Avro records or a sequence of JSON Objects. If an incoming FlowFile does "
+ + "not contain any records, an empty JSON object is the output. Empty/Single Avro record FlowFile inputs are optionally wrapped in a container as dictated by 'Wrap Single Record'")
@WritesAttribute(attribute = "mime.type", description = "Sets the mime type to application/json")
public class ConvertAvroToJSON extends AbstractProcessor {
protected static final String CONTAINER_ARRAY = "array";
protected static final String CONTAINER_NONE = "none";
- private static final byte [] EMPTY_JSON_OBJECT = "{}".getBytes(StandardCharsets.UTF_8);
+ private static final byte[] EMPTY_JSON_OBJECT = "{}".getBytes(StandardCharsets.UTF_8);
static final PropertyDescriptor CONTAINER_OPTIONS = new PropertyDescriptor.Builder()
.name("JSON container options")
@@ -73,6 +74,13 @@ public class ConvertAvroToJSON extends AbstractProcessor {
.required(true)
.defaultValue(CONTAINER_ARRAY)
.build();
+ static final PropertyDescriptor WRAP_SINGLE_RECORD = new PropertyDescriptor.Builder()
+ .name("Wrap Single Record")
+ .description("Determines if the resulting output for empty records or a single record should be wrapped in a container array as specified by '" + CONTAINER_OPTIONS.getName() + "'")
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
@@ -83,7 +91,6 @@ public class ConvertAvroToJSON extends AbstractProcessor {
.description("A FlowFile is routed to this relationship if it cannot be parsed as Avro or cannot be converted to JSON for any reason")
.build();
-
private List<PropertyDescriptor> properties;
@Override
@@ -92,6 +99,7 @@ public class ConvertAvroToJSON extends AbstractProcessor {
final List<PropertyDescriptor> properties = new ArrayList<>();
properties.add(CONTAINER_OPTIONS);
+ properties.add(WRAP_SINGLE_RECORD);
this.properties = Collections.unmodifiableList(properties);
}
@@ -116,51 +124,59 @@ public class ConvertAvroToJSON extends AbstractProcessor {
}
final String containerOption = context.getProperty(CONTAINER_OPTIONS).getValue();
+ final boolean useContainer = containerOption.equals(CONTAINER_ARRAY);
+ // Wrap a single record (inclusive of no records) only when a container is being used
+ final boolean wrapSingleRecord = context.getProperty(WRAP_SINGLE_RECORD).asBoolean() && useContainer;
try {
flowFile = session.write(flowFile, new StreamCallback() {
@Override
public void process(final InputStream rawIn, final OutputStream rawOut) throws IOException {
try (final InputStream in = new BufferedInputStream(rawIn);
-
- final OutputStream out = new BufferedOutputStream(rawOut);
- final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
+ final OutputStream out = new BufferedOutputStream(rawOut);
+ final DataFileStream<GenericRecord> reader = new DataFileStream<>(in, new GenericDatumReader<GenericRecord>())) {
final GenericData genericData = GenericData.get();
- if (reader.hasNext() == false ) {
- out.write(EMPTY_JSON_OBJECT);
- return;
+ int recordCount = 0;
+ GenericRecord currRecord = null;
+ if (reader.hasNext()) {
+ currRecord = reader.next();
+ recordCount++;
}
- int recordCount = 1;
- GenericRecord reuse = reader.next();
- // Only open container if more than one record
- if(reader.hasNext() && containerOption.equals(CONTAINER_ARRAY)){
+
+ // Open container if desired output is an array format and there are multiple records or
+ // if configured to wrap a single record
+ if (reader.hasNext() && useContainer || wrapSingleRecord) {
out.write('[');
}
- out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8));
+
+ // Determine the initial output record, emitting an empty JSON object if there are no Avro records
+ final byte[] outputBytes = (currRecord == null) ? EMPTY_JSON_OBJECT : genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8);
+ out.write(outputBytes);
while (reader.hasNext()) {
- if (containerOption.equals(CONTAINER_ARRAY)) {
+ if (useContainer) {
out.write(',');
} else {
out.write('\n');
}
- reuse = reader.next(reuse);
- out.write(genericData.toString(reuse).getBytes(StandardCharsets.UTF_8));
+ currRecord = reader.next(currRecord);
+ out.write(genericData.toString(currRecord).getBytes(StandardCharsets.UTF_8));
recordCount++;
}
- // Only close container if more than one record
- if (recordCount > 1 && containerOption.equals(CONTAINER_ARRAY)) {
+ // Close container if desired output is an array format and there are multiple records or if
+ // configured to wrap a single record
+ if (recordCount > 1 && useContainer || wrapSingleRecord) {
out.write(']');
}
}
}
});
} catch (final ProcessException pe) {
- getLogger().error("Failed to convert {} from Avro to JSON due to {}; transferring to failure", new Object[] {flowFile, pe});
+ getLogger().error("Failed to convert {} from Avro to JSON due to {}; transferring to failure", new Object[]{flowFile, pe});
session.transfer(flowFile, REL_FAILURE);
return;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ecc240b9/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
index 3535156..856677a 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestConvertAvroToJSON.java
@@ -54,6 +54,73 @@ public class TestConvertAvroToJSON {
}
@Test
+ public void testSingleAvroMessage_noContainer() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+ runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
+ final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+
+ final GenericRecord user1 = new GenericData.Record(schema);
+ user1.put("name", "Alyssa");
+ user1.put("favorite_number", 256);
+
+ final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+ final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1);
+ runner.enqueue(out1.toByteArray());
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+ out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
+ }
+
+ @Test
+ public void testSingleAvroMessage_wrapSingleMessage() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+ runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_ARRAY);
+ runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
+ final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+
+ final GenericRecord user1 = new GenericData.Record(schema);
+ user1.put("name", "Alyssa");
+ user1.put("favorite_number", 256);
+
+ final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+ final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1);
+ runner.enqueue(out1.toByteArray());
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+ out.assertContentEquals("[{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}]");
+ }
+
+ @Test
+ public void testSingleAvroMessage_wrapSingleMessage_noContainer() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+ // Verify we do not wrap output for a single record if not configured to use a container
+ runner.setProperty(ConvertAvroToJSON.CONTAINER_OPTIONS, ConvertAvroToJSON.CONTAINER_NONE);
+ runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
+ final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+
+ final GenericRecord user1 = new GenericData.Record(schema);
+ user1.put("name", "Alyssa");
+ user1.put("favorite_number", 256);
+
+ final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+ final ByteArrayOutputStream out1 = AvroTestUtil.serializeAvroRecord(schema, datumWriter, user1);
+ runner.enqueue(out1.toByteArray());
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+ out.assertContentEquals("{\"name\": \"Alyssa\", \"favorite_number\": 256, \"favorite_color\": null}");
+ }
+
+
+ @Test
public void testMultipleAvroMessages() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
@@ -155,4 +222,23 @@ public class TestConvertAvroToJSON {
out.assertContentEquals("{}");
}
+
+ @Test
+ public void testZeroRecords_wrapSingleRecord() throws IOException {
+ final TestRunner runner = TestRunners.newTestRunner(new ConvertAvroToJSON());
+ runner.setProperty(ConvertAvroToJSON.WRAP_SINGLE_RECORD, Boolean.toString(true));
+ final Schema schema = new Schema.Parser().parse(new File("src/test/resources/user.avsc"));
+
+
+ final DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(schema);
+ final ByteArrayOutputStream out1 = serializeAvroRecord(schema, datumWriter);
+ runner.enqueue(out1.toByteArray());
+
+ runner.run();
+
+ runner.assertAllFlowFilesTransferred(ConvertAvroToJSON.REL_SUCCESS, 1);
+ final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertAvroToJSON.REL_SUCCESS).get(0);
+ out.assertContentEquals("[{}]");
+
+ }
}