You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2018/12/10 07:26:34 UTC

nifi git commit: NIFI-5872 - Added compression option to JsonRecordSetWriter

Repository: nifi
Updated Branches:
  refs/heads/master 60064a9f6 -> a6f91a197


NIFI-5872 - Added compression option to JsonRecordSetWriter

This closes #3208.

Signed-off-by: Koji Kawamura <ij...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a6f91a19
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a6f91a19
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a6f91a19

Branch: refs/heads/master
Commit: a6f91a197516128a1809c7563efaaf3e5fe63a17
Parents: 60064a9f
Author: Pierre Villard <pi...@gmail.com>
Authored: Thu Dec 6 09:50:20 2018 +0100
Committer: Koji Kawamura <ij...@apache.org>
Committed: Mon Dec 10 16:25:59 2018 +0900

----------------------------------------------------------------------
 .../nifi-standard-processors/pom.xml            |   2 +
 .../processors/standard/TestConvertRecord.java  |  61 +++++++++-
 .../TestConvertRecord/input/person.json         |   7 ++
 .../TestConvertRecord/schema/person.avsc        |  17 +++
 .../apache/nifi/json/JsonRecordSetWriter.java   | 117 +++++++++++++++----
 .../org/apache/nifi/json/WriteJsonResult.java   |  12 +-
 6 files changed, 191 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a6f91a19/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 620c570..3ac6f19 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -519,6 +519,8 @@
                         <exclude>src/test/resources/TestForkRecord/output/split-transactions.json</exclude>
                         <exclude>src/test/resources/TestForkRecord/schema/extract-schema.avsc</exclude>
                         <exclude>src/test/resources/TestForkRecord/schema/schema.avsc</exclude>
+                        <exclude>src/test/resources/TestConvertRecord/schema/person.avsc</exclude>
+                        <exclude>src/test/resources/TestConvertRecord/input/person.json</exclude>
                     </excludes>
                 </configuration>
             </plugin>

http://git-wip-us.apache.org/repos/asf/nifi/blob/a6f91a19/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
index 7d3f316..eba0835 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestConvertRecord.java
@@ -17,7 +17,21 @@
 
 package org.apache.nifi.processors.standard;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.json.JsonTreeReader;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.serialization.record.MockRecordParser;
 import org.apache.nifi.serialization.record.MockRecordWriter;
 import org.apache.nifi.serialization.record.RecordFieldType;
@@ -25,8 +39,7 @@ import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
-
-import static org.junit.Assert.assertTrue;
+import org.xerial.snappy.SnappyInputStream;
 
 public class TestConvertRecord {
 
@@ -161,4 +174,48 @@ public class TestConvertRecord {
         final MockFlowFile out = runner.getFlowFilesForRelationship(ConvertRecord.REL_FAILURE).get(0);
         assertTrue(original == out);
     }
+
+    /** Runs one JSON record through ConvertRecord with a Snappy-compressing writer and checks the decompressed output. */
+    @Test
+    public void testJSONCompression() throws InitializationException, IOException {
+        final TestRunner runner = TestRunners.newTestRunner(ConvertRecord.class);
+        final JsonTreeReader jsonReader = new JsonTreeReader();
+        runner.addControllerService("reader", jsonReader);
+
+        // Reader and writer use the same schema file; decode it as UTF-8 rather than with the platform default charset.
+        final String schemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/schema/person.avsc")), StandardCharsets.UTF_8);
+
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonReader, SchemaAccessUtils.SCHEMA_TEXT, schemaText);
+        runner.enableControllerService(jsonReader);
+
+        final JsonRecordSetWriter jsonWriter = new JsonRecordSetWriter();
+        runner.addControllerService("writer", jsonWriter);
+        runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY);
+        runner.setProperty(jsonWriter, SchemaAccessUtils.SCHEMA_TEXT, schemaText);
+        runner.setProperty(jsonWriter, "Pretty Print JSON", "true");
+        runner.setProperty(jsonWriter, "Schema Write Strategy", "full-schema-attribute");
+        runner.setProperty(jsonWriter, "compression-format", "snappy");
+        runner.enableControllerService(jsonWriter);
+
+        runner.enqueue(Paths.get("src/test/resources/TestUpdateRecord/input/person.json")); // NOTE(review): input is a TestUpdateRecord fixture - confirm intended
+
+        runner.setProperty(ConvertRecord.RECORD_READER, "reader");
+        runner.setProperty(ConvertRecord.RECORD_WRITER, "writer");
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(ConvertRecord.REL_SUCCESS, 1); // assert on the processor under test
+
+        final MockFlowFile flowFile = runner.getFlowFilesForRelationship(ConvertRecord.REL_SUCCESS).get(0);
+        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (final SnappyInputStream sis = new SnappyInputStream(new ByteArrayInputStream(flowFile.toByteArray())); final OutputStream out = baos) {
+            final byte[] buffer = new byte[8192];
+            int len;
+            while ((len = sis.read(buffer)) != -1) { // -1 marks end of stream
+                out.write(buffer, 0, len);
+            }
+        }
+
+        assertEquals(new String(Files.readAllBytes(Paths.get("src/test/resources/TestConvertRecord/input/person.json")), StandardCharsets.UTF_8), baos.toString(StandardCharsets.UTF_8.name()));
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a6f91a19/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person.json
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person.json b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person.json
new file mode 100644
index 0000000..e153afe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/input/person.json
@@ -0,0 +1,7 @@
+[ {
+  "id" : 485,
+  "name" : {
+    "last" : "Doe",
+    "first" : "John"
+  }
+} ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/a6f91a19/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/schema/person.avsc
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/schema/person.avsc b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/schema/person.avsc
new file mode 100644
index 0000000..82713ea
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestConvertRecord/schema/person.avsc
@@ -0,0 +1,17 @@
+{
+	"name": "personWithNameRecord",
+	"namespace": "nifi",
+	"type": "record",
+	"fields": [
+		{ "name": "id", "type": "int" },
+		{ "name": "name", "type": {
+			"type": "record",
+			"name": "nameRecord",
+			"fields": [
+					{ "name": "last", "type": "string" },
+					{ "name": "first", "type": "string" }
+				]
+			}
+		}
+	]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/a6f91a19/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
index e2b417d..b61586e 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
@@ -17,13 +17,15 @@
 
 package org.apache.nifi.json;
 
+import java.io.BufferedOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import org.apache.nifi.record.NullSuppression;
+import org.apache.commons.compress.compressors.CompressorException;
+import org.apache.commons.compress.compressors.CompressorStreamFactory;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnEnabled;
@@ -34,45 +36,58 @@ import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.record.NullSuppression;
 import org.apache.nifi.schema.access.SchemaNotFoundException;
 import org.apache.nifi.serialization.DateTimeTextRecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriter;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.stream.io.GZIPOutputStream;
+import org.tukaani.xz.LZMA2Options;
+import org.tukaani.xz.XZOutputStream;
+import org.xerial.snappy.SnappyFramedOutputStream;
+import org.xerial.snappy.SnappyOutputStream;
 
 @Tags({"json", "resultset", "writer", "serialize", "record", "recordset", "row"})
 @CapabilityDescription("Writes the results of a RecordSet as either a JSON Array or one JSON object per line. If using Array output, then even if the RecordSet "
-    + "consists of a single row, it will be written as an array with a single element. If using One Line Per Object output, the JSON objects cannot be pretty-printed.")
+        + "consists of a single row, it will be written as an array with a single element. If using One Line Per Object output, the JSON objects cannot be pretty-printed.")
 public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory {
 
     static final AllowableValue ALWAYS_SUPPRESS = new AllowableValue("always-suppress", "Always Suppress",
-        "Fields that are missing (present in the schema but not in the record), or that have a value of null, will not be written out");
+            "Fields that are missing (present in the schema but not in the record), or that have a value of null, will not be written out");
     static final AllowableValue NEVER_SUPPRESS = new AllowableValue("never-suppress", "Never Suppress",
-        "Fields that are missing (present in the schema but not in the record), or that have a value of null, will be written out as a null value");
+            "Fields that are missing (present in the schema but not in the record), or that have a value of null, will be written out as a null value");
     static final AllowableValue SUPPRESS_MISSING = new AllowableValue("suppress-missing", "Suppress Missing Values",
-        "When a field has a value of null, it will be written out. However, if a field is defined in the schema and not present in the record, the field will not be written out.");
+            "When a field has a value of null, it will be written out. However, if a field is defined in the schema and not present in the record, the field will not be written out.");
 
     static final AllowableValue OUTPUT_ARRAY = new AllowableValue("output-array", "Array",
             "Output records as a JSON array");
     static final AllowableValue OUTPUT_ONELINE = new AllowableValue("output-oneline", "One Line Per Object",
             "Output records with one JSON object per line, delimited by a newline character");
 
+    public static final String COMPRESSION_FORMAT_GZIP = "gzip";
+    public static final String COMPRESSION_FORMAT_BZIP2 = "bzip2";
+    public static final String COMPRESSION_FORMAT_XZ_LZMA2 = "xz-lzma2";
+    public static final String COMPRESSION_FORMAT_SNAPPY = "snappy";
+    public static final String COMPRESSION_FORMAT_SNAPPY_FRAMED = "snappy framed";
+    public static final String COMPRESSION_FORMAT_NONE = "none";
+
     static final PropertyDescriptor SUPPRESS_NULLS = new PropertyDescriptor.Builder()
-        .name("suppress-nulls")
-        .displayName("Suppress Null Values")
-        .description("Specifies how the writer should handle a null field")
-        .allowableValues(NEVER_SUPPRESS, ALWAYS_SUPPRESS, SUPPRESS_MISSING)
-        .defaultValue(NEVER_SUPPRESS.getValue())
-        .required(true)
-        .build();
+            .name("suppress-nulls")
+            .displayName("Suppress Null Values")
+            .description("Specifies how the writer should handle a null field")
+            .allowableValues(NEVER_SUPPRESS, ALWAYS_SUPPRESS, SUPPRESS_MISSING)
+            .defaultValue(NEVER_SUPPRESS.getValue())
+            .required(true)
+            .build();
     static final PropertyDescriptor PRETTY_PRINT_JSON = new PropertyDescriptor.Builder()
-        .name("Pretty Print JSON")
-        .description("Specifies whether or not the JSON should be pretty printed")
-        .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-        .allowableValues("true", "false")
-        .defaultValue("false")
-        .required(true)
-        .build();
+            .name("Pretty Print JSON")
+            .description("Specifies whether or not the JSON should be pretty printed")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
     static final PropertyDescriptor OUTPUT_GROUPING = new PropertyDescriptor.Builder()
             .name("output-grouping")
             .displayName("Output Grouping")
@@ -82,10 +97,30 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
             .defaultValue(OUTPUT_ARRAY.getValue())
             .required(true)
             .build();
+    /** Selects the compression applied to the written JSON; the default is no compression. */
+    public static final PropertyDescriptor COMPRESSION_FORMAT = new PropertyDescriptor.Builder()
+            .name("compression-format")
+            .displayName("Compression Format")
+            .description("The compression format to use. Valid values are: none, GZIP, BZIP2, XZ-LZMA2, Snappy, and Snappy Framed")
+            .allowableValues(COMPRESSION_FORMAT_NONE, COMPRESSION_FORMAT_GZIP, COMPRESSION_FORMAT_BZIP2, COMPRESSION_FORMAT_XZ_LZMA2, COMPRESSION_FORMAT_SNAPPY, COMPRESSION_FORMAT_SNAPPY_FRAMED)
+            .defaultValue(COMPRESSION_FORMAT_NONE)
+            .required(true)
+            .build();
+    public static final PropertyDescriptor COMPRESSION_LEVEL = new PropertyDescriptor.Builder() // only the GZIP case of createWriter passes this level to its stream
+            .name("compression-level")
+            .displayName("Compression Level")
+            .description("The compression level to use; this is valid only when using GZIP compression. A lower value results in faster processing "
+                    + "but less compression; a value of 0 indicates no compression but simply archiving")
+            .defaultValue("1")
+            .required(true)
+            .allowableValues("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")
+            .build();
 
     private volatile boolean prettyPrint;
     private volatile NullSuppression nullSuppression;
     private volatile OutputGrouping outputGrouping;
+    private volatile String compressionFormat;
+    private volatile int compressionLevel;
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -93,6 +128,8 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
         properties.add(PRETTY_PRINT_JSON);
         properties.add(SUPPRESS_NULLS);
         properties.add(OUTPUT_GROUPING);
+        properties.add(COMPRESSION_FORMAT);
+        properties.add(COMPRESSION_LEVEL);
         return properties;
     }
 
@@ -130,12 +167,50 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
             grouping = OutputGrouping.OUTPUT_ARRAY;
         }
         this.outputGrouping = grouping;
+
+        this.compressionFormat = context.getProperty(COMPRESSION_FORMAT).getValue();
+        this.compressionLevel = context.getProperty(COMPRESSION_LEVEL).asInteger();
     }
 
     @Override
     public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException {
-        return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), out, prettyPrint, nullSuppression, outputGrouping,
-            getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null));
+        // Wraps the caller's stream in the configured compressor and reports the matching MIME type. The 64 KiB buffer
+        // sits beneath the compressor and is skipped for "none", which writes straight to the caller's stream.
+        final String format = compressionFormat.toLowerCase(java.util.Locale.ROOT); // locale-independent comparison
+        final OutputStream bufferedOut = COMPRESSION_FORMAT_NONE.equals(format) ? out : new BufferedOutputStream(out, 65536);
+        final OutputStream compressionOut;
+        final String mimeTypeRef;
+        try {
+            switch (format) {
+                case COMPRESSION_FORMAT_GZIP:
+                    compressionOut = new GZIPOutputStream(bufferedOut, compressionLevel);
+                    mimeTypeRef = "application/gzip";
+                    break;
+                case COMPRESSION_FORMAT_XZ_LZMA2:
+                    compressionOut = new XZOutputStream(bufferedOut, new LZMA2Options());
+                    mimeTypeRef = "application/x-xz";
+                    break;
+                case COMPRESSION_FORMAT_SNAPPY:
+                    compressionOut = new SnappyOutputStream(bufferedOut);
+                    mimeTypeRef = "application/x-snappy";
+                    break;
+                case COMPRESSION_FORMAT_SNAPPY_FRAMED:
+                    compressionOut = new SnappyFramedOutputStream(bufferedOut);
+                    mimeTypeRef = "application/x-snappy-framed";
+                    break;
+                case COMPRESSION_FORMAT_BZIP2:
+                    mimeTypeRef = "application/x-bzip2";
+                    compressionOut = new CompressorStreamFactory().createCompressorOutputStream(format, bufferedOut);
+                    break;
+                default: // "none" (or any unrecognised value): plain JSON on the caller's stream
+                    mimeTypeRef = "application/json";
+                    compressionOut = out;
+            }
+        } catch (CompressorException e) {
+            throw new IOException(e); // surface compressor setup failures through the declared IOException
+        }
+        return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), compressionOut, prettyPrint, nullSuppression, outputGrouping,
+                getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null), mimeTypeRef);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/a6f91a19/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index 5708f5e..4ce6ceb 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -60,9 +60,16 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
     private final Supplier<DateFormat> LAZY_DATE_FORMAT;
     private final Supplier<DateFormat> LAZY_TIME_FORMAT;
     private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
+    private String mimeType = "application/json";
 
     public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
-        final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
+            final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
+        this(logger, recordSchema, schemaAccess, out, prettyPrint, nullSuppression, outputGrouping, dateFormat, timeFormat, timestampFormat, "application/json"); // delegates with the plain-JSON MIME type
+    }
+
+    public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
+        final NullSuppression nullSuppression, final OutputGrouping outputGrouping, final String dateFormat, final String timeFormat, final String timestampFormat,
+        final String mimeType) throws IOException {
 
         super(out);
         this.logger = logger;
@@ -70,6 +77,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
         this.schemaAccess = schemaAccess;
         this.nullSuppression = nullSuppression;
         this.outputGrouping = outputGrouping;
+        this.mimeType = mimeType;
 
         final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
         final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
@@ -399,7 +407,7 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
 
     @Override
     public String getMimeType() {
-        return "application/json";
+        return this.mimeType; // value given at construction; "application/json" when the shorter constructor is used
     }
 
     private static interface GeneratorTask {