You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2018/12/10 07:26:34 UTC
nifi git commit: NIFI-5872 - Added compression option to
JsonRecordSetWriter
Repository: nifi
Updated Branches:
refs/heads/master 60064a9f6 -> a6f91a197
NIFI-5872 - Added compression option to JsonRecordSetWriter
This closes #3208.
Signed-off-by: Koji Kawamura <ij...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a6f91a19
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a6f91a19
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a6f91a19
Branch: refs/heads/master
Commit: a6f91a197516128a1809c7563efaaf3e5fe63a17
Parents: 60064a9f
Author: Pierre Villard <pi...@gmail.com>
Authored: Thu Dec 6 09:50:20 2018 +0100
Committer: Koji Kawamura <ij...@apache.org>
Committed: Mon Dec 10 16:25:59 2018 +0900
----------------------------------------------------------------------
.../nifi-standard-processors/pom.xml | 2 +
.../processors/standard/TestConvertRecord.java | 61 +++++++++-
.../TestConvertRecord/input/person.json | 7 ++
.../TestConvertRecord/schema/person.avsc | 17 +++
.../apache/nifi/json/JsonRecordSetWriter.java | 117 +++++++++++++++----
.../org/apache/nifi/json/WriteJsonResult.java | 12 +-
6 files changed, 191 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/a6f91a19/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 620c570..3ac6f19 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -519,6 +519,8 @@
<exclude>src/test/resources/TestForkRecord/output/split-transactions.json</exclude>
<exclude>src/test/resources/TestForkRecord/schema/extract-schema.avsc</exclude>
<exclude>src/test/resources/TestForkRecord/schema/schema.avsc</exclude>
+ <exclude>src/test/resources/TestConvertRecord/schema/person.avsc</exclude>
+ <exclude>src/test/resources/TestConvertRecord/input/person.json</exclude>
</excludes>
</configuration>
</plugin>
http://git-wip-us.apache.org/repos/asf/nifi/blob/a6f91a19/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
index 7d3f316..eba0835 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
@@ -17,7 +17,21 @@
package org.apache.nifi.processors.standard;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.serialization.record.MockRecordParser;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.serialization.record.RecordFieldType;
@@ -25,8 +39,7 @@ import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
+import org.xerial.snappy.SnappyInputStream;
public class TestConvertRecord {
@@ -161,4 +174,48 @@ public class TestConvertRecord {
final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0);
assertTrue(original == out);
}
+
+ @Test
+ public void testJSONCompression() throws InitializationException, IOException {
+ final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
+ final JsonTreeReader jsonReader = new JsonTreeReader();
+ runner.addControllerService("reader", jsonReader);
+
+ final String inputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
+ final String outputSchemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")));
+
+ runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, inputSchemaText);
+ runner.enableControllerService(jsonReader);
+
+ final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+ runner.addControllerService("writer", jsonWriter);
+ runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+ runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, outputSchemaText);
+ runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+ runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
+ runner.setProperty(jsonWriter, "compression-format", "snappy");
+ runner.enableControllerService(jsonWriter);
+
+ runner.enqueue(Paths.get("src/test/resources/TestConvertRecord/input/person.json"));
+
+ runner.setProperty(ConvertRecord.RECORD_READER, "reader");
+ runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
+
+ runner.run();
+ runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1);
+
+ MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ try (final SnappyInputStream sis = new SnappyInputStream(new ByteArrayInputStream(flowFile.toByteArray())); final OutputStream out = baos) {
+ final byte[] buffer = new byte[8192]; int len;
+ while ((len = sis.read(buffer)) > 0) {
+ out.write(buffer, 0, len);
+ }
+ out.flush();
+ }
+
+ assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/input/person.json"))), baos.toString(StandardCharsets.UTF_8.name()));
+ }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a6f91a19/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person.json
new file mode 100644
index 0000000..e153afe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person.json
@@ -0,0 +1,7 @@
+[ {
+ "id" : 485,
+ "name" : {
+ "last" : "Doe",
+ "first" : "John"
+ }
+} ]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/a6f91a19/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/schema/person.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/schema/person.avsc b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/schema/person.avsc
new file mode 100644
index 0000000..82713ea
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/schema/person.avsc
@@ -0,0 +1,17 @@
+{
+ "name": "personWithNameRecord",
+ "namespace": "nifi",
+ "type": "record",
+ "fields": [
+ { "name": "id", "type": "int" },
+ { "name": "name", "type": {
+ "type": "record",
+ "name": "nameRecord",
+ "fields": [
+ { "name": "last", "type": "string" },
+ { "name": "first", "type": "string" }
+ ]
+ }
+ }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/nifi/blob/a6f91a19/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
index e2b417d..b61586e 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
@@ -17,13 +17,15 @@
package org.apache.nifi.json;
+import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import org.apache.nifi.record.NullSuppression;
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
@@ -34,45 +36,58 @@ import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.record.NullSuppression;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriter;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.stream.io.GZIPOutputStream;
+import org.tukaani.xz.LZMA2Options;
+import org.tukaani.xz.XZOutputStream;
+import org.xerial.snappy.SnappyFramedOutputStream;
+import org.xerial.snappy.SnappyOutputStream;
@Tags({"json", "resultset", "writer", "serialize", "record", "recordset", "row"})
@CapabilityDescription("Writes the results of a RecordSet as either a JSON Array or one JSON object per line. If using Array output, then even if the RecordSet "
- + "consists of a single row, it will be written as an array with a single element. If using One Line Per Object output, the JSON objects cannot be pretty-printed.")
+ + "consists of a single row, it will be written as an array with a single element. If using One Line Per Object output, the JSON objects cannot be pretty-printed.")
public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory {
static final AllowableValue ALWAYS_SUPPRESS = new AllowableValue("always-suppress", "Always Suppress",
- "Fields that are missing (present in the schema but not in the record), or that have a value of null, will not be written out");
+ "Fields that are missing (present in the schema but not in the record), or that have a value of null, will not be written out");
static final AllowableValue NEVER_SUPPRESS = new AllowableValue("never-suppress", "Never Suppress",
- "Fields that are missing (present in the schema but not in the record), or that have a value of null, will be written out as a null value");
+ "Fields that are missing (present in the schema but not in the record), or that have a value of null, will be written out as a null value");
static final AllowableValue SUPPRESS_MISSING = new AllowableValue("suppress-missing", "Suppress Missing Values",
- "When a field has a value of null, it will be written out. However, if a field is defined in the schema and not present in the record, the field will not be written out.");
+ "When a field has a value of null, it will be written out. However, if a field is defined in the schema and not present in the record, the field will not be written out.");
static final AllowableValue OUTPUT_ARRAY = new AllowableValue("output-array", "Array",
"Output records as a JSON array");
static final AllowableValue OUTPUT_ONELINE = new AllowableValue("output-oneline", "One Line Per Object",
"Output records with one JSON object per line, delimited by a newline character");
+ public static final String COMPRESSION_FORMAT_GZIP = "gzip";
+ public static final String COMPRESSION_FORMAT_BZIP2 = "bzip2";
+ public static final String COMPRESSION_FORMAT_XZ_LZMA2 = "xz-lzma2";
+ public static final String COMPRESSION_FORMAT_SNAPPY = "snappy";
+ public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy framed";
+ public static final String COMPRESSION_FORMAT_NONE = "none";
+
static final PropertyDescriptor SUPPRESS_NULLS = new PropertyDescriptor.Builder()
- .name("suppress-nulls")
- .displayName("Suppress Null Values")
- .description("Specifies how the writer should handle a null field")
- .allowableValues(NEVER_SUPPRESS, ALWAYS_SUPPRESS, SUPPRESS_MISSING)
- .defaultValue(NEVER_SUPPRESS.getValue())
- .required(true)
- .build();
+ .name("suppress-nulls")
+ .displayName("Suppress Null Values")
+ .description("Specifies how the writer should handle a null field")
+ .allowableValues(NEVER_SUPPRESS, ALWAYS_SUPPRESS, SUPPRESS_MISSING)
+ .defaultValue(NEVER_SUPPRESS.getValue())
+ .required(true)
+ .build();
static final PropertyDescriptor PRETTY_PRINT_JSON = new PropertyDescriptor.Builder()
- .name("Pretty Print JSON")
- .description("Specifies whether or not the JSON should be pretty printed")
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
- .allowableValues("true", "false")
- .defaultValue("false")
- .required(true)
- .build();
+ .name("Pretty Print JSON")
+ .description("Specifies whether or not the JSON should be pretty printed")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
static final PropertyDescriptor OUTPUT_GROUPING = new PropertyDescriptor.Builder()
.name("output-grouping")
.displayName("Output Grouping")
@@ -82,10 +97,30 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
.defaultValue(OUTPUT_ARRAY.getValue())
.required(true)
.build();
+ public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
+ .name("compression-format")
+ .displayName("Compression Format")
+ .description("The compression format to use. Valid values are: GZIP, BZIP2, XZ-LZMA2, Snappy, and Snappy Framed")
+ .allowableValues(COMPRESSION_FORMAT_NONE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2, COMPRESSION_FORMAT_XZ_LZMA2,
+ COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED)
+ .defaultValue(COMPRESSION_FORMAT_NONE)
+ .required(true)
+ .build();
+ public static final PropertyDescriptor COMPRESSION_LEVEL = new PropertyDescriptor.Builder()
+ .name("compression-level")
+ .displayName("Compression Level")
+ .description("The compression level to use; this is valid only when using GZIP compression. A lower value results in faster processing "
+ + "but less compression; a value of 0 indicates no compression but simply archiving")
+ .defaultValue("1")
+ .required(true)
+ .allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
+ .build();
private volatile boolean prettyPrint;
private volatile NullSuppression nullSuppression;
private volatile OutputGrouping outputGrouping;
+ private volatile String compressionFormat;
+ private volatile int compressionLevel;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -93,6 +128,8 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
properties.add(PRETTY_PRINT_JSON);
properties.add(SUPPRESS_NULLS);
properties.add(OUTPUT_GROUPING);
+ properties.add(COMPRESSION_FORMAT);
+ properties.add(COMPRESSION_LEVEL);
return properties;
}
@@ -130,12 +167,50 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
grouping = OutputGrouping.OUTPUT_ARRAY;
}
this.outputGrouping = grouping;
+
+ this.compressionFormat = context.getProperty(COMPRESSION_FORMAT).getValue();
+ this.compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException {
- return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), out, prettyPrint, nullSuppression, outputGrouping,
- getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null));
+
+ final OutputStream bufferedOut = new BufferedOutputStream(out, 65536);
+ final OutputStream compressionOut;
+ String mimeTypeRef;
+
+ try {
+ switch (compressionFormat.toLowerCase()) {
+ case COMPRESSION_FORMAT_GZIP:
+ compressionOut = new GZIPOutputStream(bufferedOut, compressionLevel);
+ mimeTypeRef = "application/gzip";
+ break;
+ case COMPRESSION_FORMAT_XZ_LZMA2:
+ compressionOut = new XZOutputStream(bufferedOut, new LZMA2Options());
+ mimeTypeRef = "application/x-xz";
+ break;
+ case COMPRESSION_FORMAT_SNAPPY:
+ compressionOut = new SnappyOutputStream(bufferedOut);
+ mimeTypeRef = "application/x-snappy";
+ break;
+ case COMPRESSION_FORMAT_SNAPPY_FRAMED:
+ compressionOut = new SnappyFramedOutputStream(bufferedOut);
+ mimeTypeRef = "application/x-snappy-framed";
+ break;
+ case COMPRESSION_FORMAT_BZIP2:
+ mimeTypeRef = "application/x-bzip2";
+ compressionOut = new CompressorStreamFactory().createCompressorOutputStream(compressionFormat.toLowerCase(), bufferedOut);
+ break;
+ default:
+ mimeTypeRef = "application/json";
+ compressionOut = out;
+ }
+ } catch (CompressorException e) {
+ throw new IOException(e);
+ }
+
+ return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), compressionOut, prettyPrint, nullSuppression, outputGrouping,
+ getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), mimeTypeRef);
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/a6f91a19/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index 5708f5e..4ce6ceb 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -60,9 +60,16 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
private final Supplier<DateFormat> LAZY_DATE_FORMAT;
private final Supplier<DateFormat> LAZY_TIME_FORMAT;
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
+ private String mimeType = "application/json";
public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
- final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
+ final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
+ this(logger, recordSchema, schemaAccess, out, prettyPrint, nullSuppression, outputGrouping, dateFormat, timeFormat, timestampFormat, "application/json");
+ }
+
+ public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
+ final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat,
+ final String mimeType) throws IOException {
super(out);
this.logger = logger;
@@ -70,6 +77,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
this.schemaAccess = schemaAccess;
this.nullSuppression = nullSuppression;
this.outputGrouping = outputGrouping;
+ this.mimeType = mimeType;
final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
@@ -399,7 +407,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
@Override
public String getMimeType() {
- return "application/json";
+ return this.mimeType;
}
private static interface GeneratorTask {