You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2017/09/12 19:50:28 UTC
nifi git commit: NIFI-4378: Allow suppression of null values in JSON
Repository: nifi
Updated Branches:
refs/heads/master 7423b41e3 -> b452f8c25
NIFI-4378: Allow suppression of null values in JSON
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #2148.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/b452f8c2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/b452f8c2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/b452f8c2
Branch: refs/heads/master
Commit: b452f8c251a01b52fbab25d05ff9208f46c2b8b5
Parents: 7423b41
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Sep 12 10:47:50 2017 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Tue Sep 12 21:41:59 2017 +0200
----------------------------------------------------------------------
.../apache/nifi/json/JsonRecordSetWriter.java | 32 ++++++-
.../org/apache/nifi/json/NullSuppression.java | 24 +++++
.../org/apache/nifi/json/WriteJsonResult.java | 30 +++++-
.../apache/nifi/json/TestWriteJsonResult.java | 97 +++++++++++++++++---
4 files changed, 167 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/b452f8c2/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
index cbe2f59..2ec9b26 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSetWriter.java
@@ -25,6 +25,7 @@ import java.util.List;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
@@ -39,6 +40,21 @@ import org.apache.nifi.serialization.record.RecordSchema;
+ "consists of a single row, it will be written as an array with a single element.")
public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements RecordSetWriterFactory {
+ static final AllowableValue ALWAYS_SUPPRESS = new AllowableValue("always-suppress", "Always Suppress",
+ "Fields that are missing (present in the schema but not in the record), or that have a value of null, will not be written out");
+ static final AllowableValue NEVER_SUPPRESS = new AllowableValue("never-suppress", "Never Suppress",
+ "Fields that are missing (present in the schema but not in the record), or that have a value of null, will be written out as a null value");
+ static final AllowableValue SUPPRESS_MISSING = new AllowableValue("suppress-missing", "Suppress Missing Values",
+ "When a field has a value of null, it will be written out. However, if a field is defined in the schema and not present in the record, the field will not be written out.");
+
+ static final PropertyDescriptor SUPPRESS_NULLS = new PropertyDescriptor.Builder()
+ .name("suppress-nulls")
+ .displayName("Suppress Null Values")
+ .description("Specifies how the writer should handle a null field")
+ .allowableValues(NEVER_SUPPRESS, ALWAYS_SUPPRESS, SUPPRESS_MISSING)
+ .defaultValue(NEVER_SUPPRESS.getValue())
+ .required(true)
+ .build();
static final PropertyDescriptor PRETTY_PRINT_JSON = new PropertyDescriptor.Builder()
.name("Pretty Print JSON")
.description("Specifies whether or not the JSON should be pretty printed")
@@ -48,7 +64,8 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
.required(true)
.build();
- private boolean prettyPrint;
+ private volatile boolean prettyPrint;
+ private volatile NullSuppression nullSuppression;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -60,11 +77,22 @@ public class JsonRecordSetWriter extends DateTimeTextRecordSetWriter implements
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
prettyPrint = context.getProperty(PRETTY_PRINT_JSON).asBoolean();
+
+ final NullSuppression suppression;
+ final String suppressNullValue = context.getProperty(SUPPRESS_NULLS).getValue();
+ if (ALWAYS_SUPPRESS.getValue().equals(suppressNullValue)) {
+ suppression = NullSuppression.ALWAYS_SUPPRESS;
+ } else if (SUPPRESS_MISSING.getValue().equals(suppressNullValue)) {
+ suppression = NullSuppression.SUPPRESS_MISSING;
+ } else {
+ suppression = NullSuppression.NEVER_SUPPRESS;
+ }
+ this.nullSuppression = suppression;
}
@Override
public RecordSetWriter createWriter(final ComponentLog logger, final RecordSchema schema, final OutputStream out) throws SchemaNotFoundException, IOException {
- return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), out, prettyPrint,
+ return new WriteJsonResult(logger, schema, getSchemaAccessWriter(schema), out, prettyPrint, nullSuppression,
getDateFormat().orElse(null), getTimeFormat().orElse(null), getTimestampFormat().orElse(null));
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b452f8c2/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/NullSuppression.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/NullSuppression.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/NullSuppression.java
new file mode 100644
index 0000000..494c72f
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/NullSuppression.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.nifi.json;
+
+public enum NullSuppression {
+ ALWAYS_SUPPRESS,
+ NEVER_SUPPRESS,
+ SUPPRESS_MISSING
+}
http://git-wip-us.apache.org/repos/asf/nifi/blob/b452f8c2/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
index 8f3bf22..5cfd3ac 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/WriteJsonResult.java
@@ -23,6 +23,7 @@ import java.math.BigInteger;
import java.text.DateFormat;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.function.Supplier;
import org.apache.nifi.logging.ComponentLog;
@@ -52,17 +53,19 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
private final RecordSchema recordSchema;
private final JsonFactory factory = new JsonFactory();
private final JsonGenerator generator;
+ private final NullSuppression nullSuppression;
private final Supplier<DateFormat> LAZY_DATE_FORMAT;
private final Supplier<DateFormat> LAZY_TIME_FORMAT;
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
public WriteJsonResult(final ComponentLog logger, final RecordSchema recordSchema, final SchemaAccessWriter schemaAccess, final OutputStream out, final boolean prettyPrint,
- final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
+ final NullSuppression nullSuppression, final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException {
super(out);
this.logger = logger;
this.recordSchema = recordSchema;
this.schemaAccess = schemaAccess;
+ this.nullSuppression = nullSuppression;
final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
@@ -159,7 +162,10 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
final String fieldName = field.getFieldName();
final Object value = record.getValue(field);
if (value == null) {
- generator.writeNullField(fieldName);
+ if (nullSuppression == NullSuppression.NEVER_SUPPRESS || (nullSuppression == NullSuppression.SUPPRESS_MISSING) && isFieldPresent(field, record)) {
+ generator.writeNullField(fieldName);
+ }
+
continue;
}
@@ -172,7 +178,10 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
for (final String fieldName : record.getRawFieldNames()) {
final Object value = record.getValue(fieldName);
if (value == null) {
- generator.writeNullField(fieldName);
+ if (nullSuppression == NullSuppression.NEVER_SUPPRESS || (nullSuppression == NullSuppression.SUPPRESS_MISSING) && record.getRawFieldNames().contains(fieldName)) {
+ generator.writeNullField(fieldName);
+ }
+
continue;
}
@@ -190,6 +199,21 @@ public class WriteJsonResult extends AbstractRecordSetWriter implements RecordSe
}
}
+ private boolean isFieldPresent(final RecordField field, final Record record) {
+ final Set<String> rawFieldNames = record.getRawFieldNames();
+ if (rawFieldNames.contains(field.getFieldName())) {
+ return true;
+ }
+
+ for (final String alias : field.getAliases()) {
+ if (rawFieldNames.contains(alias)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
@SuppressWarnings("unchecked")
private void writeRawValue(final JsonGenerator generator, final Object value, final String fieldName)
throws JsonGenerationException, IOException {
http://git-wip-us.apache.org/repos/asf/nifi/blob/b452f8c2/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
index 2dbf146..af19394 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
@@ -103,8 +103,8 @@ public class TestWriteJsonResult {
final Record record = new MapRecord(schema, valueMap);
final RecordSet rs = RecordSet.of(schema, record);
- try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, true, RecordFieldType.DATE.getDefaultFormat(),
- RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) {
+ try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, true,
+ NullSuppression.NEVER_SUPPRESS, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) {
writer.write(rs);
}
@@ -140,8 +140,8 @@ public class TestWriteJsonResult {
final RecordSet rs = RecordSet.of(schema, record1, record2);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, true, RecordFieldType.DATE.getDefaultFormat(),
- RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) {
+ try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, true,
+ NullSuppression.NEVER_SUPPRESS, RecordFieldType.DATE.getDefaultFormat(), RecordFieldType.TIME.getDefaultFormat(), RecordFieldType.TIMESTAMP.getDefaultFormat())) {
writer.write(rs);
}
@@ -172,7 +172,7 @@ public class TestWriteJsonResult {
final RecordSet rs = RecordSet.of(schema, record);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
+ try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, null, null)) {
writer.write(rs);
}
@@ -196,7 +196,7 @@ public class TestWriteJsonResult {
final Record record = new MapRecord(schema, values);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
+ try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, null, null)) {
writer.beginRecordSet();
writer.writeRecord(record);
writer.finishRecordSet();
@@ -222,7 +222,7 @@ public class TestWriteJsonResult {
final Record record = new MapRecord(schema, values);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
+ try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, null, null)) {
writer.beginRecordSet();
writer.writeRawRecord(record);
writer.finishRecordSet();
@@ -248,7 +248,7 @@ public class TestWriteJsonResult {
final Record record = new MapRecord(schema, values);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
+ try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, null, null)) {
writer.beginRecordSet();
writer.writeRecord(record);
writer.finishRecordSet();
@@ -274,7 +274,7 @@ public class TestWriteJsonResult {
final Record record = new MapRecord(schema, values);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
+ try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, null, null)) {
writer.beginRecordSet();
writer.writeRawRecord(record);
writer.finishRecordSet();
@@ -301,7 +301,7 @@ public class TestWriteJsonResult {
final Record record = new MapRecord(schema, values);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
+ try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, null, null)) {
writer.beginRecordSet();
writer.writeRecord(record);
writer.finishRecordSet();
@@ -328,7 +328,7 @@ public class TestWriteJsonResult {
final Record record = new MapRecord(schema, values);
final ByteArrayOutputStream baos = new ByteArrayOutputStream();
- try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, null, null, null)) {
+ try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, null, null)) {
writer.beginRecordSet();
writer.writeRawRecord(record);
writer.finishRecordSet();
@@ -341,4 +341,79 @@ public class TestWriteJsonResult {
final String output = new String(data, StandardCharsets.UTF_8);
assertEquals(expected, output);
}
+
+ @Test
+ public void testNullSuppression() throws IOException {
+ final List<RecordField> fields = new ArrayList<>();
+ fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
+ fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
+ final RecordSchema schema = new SimpleRecordSchema(fields);
+
+ final Map<String, Object> values = new LinkedHashMap<>();
+ values.put("id", "1");
+ final Record recordWithMissingName = new MapRecord(schema, values);
+
+ final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, null, null)) {
+ writer.beginRecordSet();
+ writer.write(recordWithMissingName);
+ writer.finishRecordSet();
+ }
+
+ assertEquals("[{\"id\":\"1\",\"name\":null}]", new String(baos.toByteArray(), StandardCharsets.UTF_8));
+
+ baos.reset();
+ try (
+ final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.ALWAYS_SUPPRESS, null, null, null)) {
+ writer.beginRecordSet();
+ writer.write(recordWithMissingName);
+ writer.finishRecordSet();
+ }
+
+ assertEquals("[{\"id\":\"1\"}]", new String(baos.toByteArray(), StandardCharsets.UTF_8));
+
+ baos.reset();
+ try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.SUPPRESS_MISSING, null, null,
+ null)) {
+ writer.beginRecordSet();
+ writer.write(recordWithMissingName);
+ writer.finishRecordSet();
+ }
+
+ assertEquals("[{\"id\":\"1\"}]", new String(baos.toByteArray(), StandardCharsets.UTF_8));
+
+ // set an explicit null value
+ values.put("name", null);
+ final Record recordWithNullValue = new MapRecord(schema, values);
+
+ baos.reset();
+ try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.NEVER_SUPPRESS, null, null, null)) {
+ writer.beginRecordSet();
+ writer.write(recordWithNullValue);
+ writer.finishRecordSet();
+ }
+
+ assertEquals("[{\"id\":\"1\",\"name\":null}]", new String(baos.toByteArray(), StandardCharsets.UTF_8));
+
+ baos.reset();
+ try (
+ final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.ALWAYS_SUPPRESS, null, null, null)) {
+ writer.beginRecordSet();
+ writer.write(recordWithNullValue);
+ writer.finishRecordSet();
+ }
+
+ assertEquals("[{\"id\":\"1\"}]", new String(baos.toByteArray(), StandardCharsets.UTF_8));
+
+ baos.reset();
+ try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false, NullSuppression.SUPPRESS_MISSING, null, null,
+ null)) {
+ writer.beginRecordSet();
+ writer.write(recordWithNullValue);
+ writer.finishRecordSet();
+ }
+
+ assertEquals("[{\"id\":\"1\",\"name\":null}]", new String(baos.toByteArray(), StandardCharsets.UTF_8));
+
+ }
}