You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@nifi.apache.org by ex...@apache.org on 2023/10/09 16:05:18 UTC
[nifi] branch support/nifi-1.x updated: NIFI-12153 Added Allow Comments and Max String Length to JSON Readers
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch support/nifi-1.x
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/support/nifi-1.x by this push:
new acddeb1cf4 NIFI-12153 Added Allow Comments and Max String Length to JSON Readers
acddeb1cf4 is described below
commit acddeb1cf463b2c43e4b0f0968445ac6f5365528
Author: Chris Sampson <ch...@gmail.com>
AuthorDate: Sat Sep 30 18:32:28 2023 +0100
NIFI-12153 Added Allow Comments and Max String Length to JSON Readers
This closes #7823
Signed-off-by: David Handermann <ex...@apache.org>
(cherry picked from commit 099ceec7ede8d0427f588bf71aec7b3e006a2a4b)
---
.../nifi/json/AbstractJsonRowRecordReader.java | 83 +++++++++++++++++-----
.../apache/nifi/json/JsonPathRowRecordReader.java | 15 +++-
.../apache/nifi/json/JsonTreeRowRecordReader.java | 31 +++++++-
.../standard/AbstractJsonPathProcessor.java | 2 +-
.../processors/standard/JoltTransformJSON.java | 2 +-
.../nifi-record-serialization-services/pom.xml | 6 ++
.../java/org/apache/nifi/json/JsonPathReader.java | 16 +++--
.../java/org/apache/nifi/json/JsonTreeReader.java | 12 +++-
.../nifi/json/TestJsonTreeRowRecordReader.java | 30 +++++++-
.../resources/json/bank-account-comments.jsonc | 24 +++++++
10 files changed, 186 insertions(+), 35 deletions(-)
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
index 59f12770e9..d718e7e4bc 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
@@ -17,14 +17,17 @@
package org.apache.nifi.json;
-import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.DataUnit;
+import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.SimpleRecordSchema;
@@ -51,16 +54,37 @@ import java.util.function.BiPredicate;
import java.util.function.Supplier;
public abstract class AbstractJsonRowRecordReader implements RecordReader {
+ public static final String DEFAULT_MAX_STRING_LENGTH = "20 MB";
+
+ static final PropertyDescriptor MAX_STRING_LENGTH = new PropertyDescriptor.Builder()
+ .name("Max String Length")
+ .displayName("Max String Length")
+ .description("The maximum allowed length of a string value when parsing the JSON document")
+ .required(true)
+ .defaultValue(DEFAULT_MAX_STRING_LENGTH)
+ .addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor ALLOW_COMMENTS = new PropertyDescriptor.Builder()
+ .name("Allow Comments")
+ .displayName("Allow Comments")
+ .description("Whether to allow comments when parsing the JSON document")
+ .required(true)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+ .build();
+
+ private static final StreamReadConstraints DEFAULT_STREAM_READ_CONSTRAINTS = StreamReadConstraints.builder()
+ .maxStringLength(DataUnit.parseDataSize(DEFAULT_MAX_STRING_LENGTH, DataUnit.B).intValue())
+ .build();
private final ComponentLog logger;
- private final Supplier<DateFormat> LAZY_DATE_FORMAT;
- private final Supplier<DateFormat> LAZY_TIME_FORMAT;
- private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
+ private final Supplier<DateFormat> lazyDateFormat;
+ private final Supplier<DateFormat> lazyTimeFormat;
+ private final Supplier<DateFormat> lazyTimestampFormat;
private boolean firstObjectConsumed = false;
-
- private static final JsonFactory jsonFactory = new JsonFactory();
- private static final ObjectMapper codec = new ObjectMapper();
private JsonParser jsonParser;
private JsonNode firstJsonNode;
private StartingFieldStrategy strategy;
@@ -75,9 +99,9 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
final DateFormat tf = timeFormat == null ? null : DataTypeUtils.getDateFormat(timeFormat);
final DateFormat tsf = timestampFormat == null ? null : DataTypeUtils.getDateFormat(timestampFormat);
- LAZY_DATE_FORMAT = () -> df;
- LAZY_TIME_FORMAT = () -> tf;
- LAZY_TIMESTAMP_FORMAT = () -> tsf;
+ lazyDateFormat = () -> df;
+ lazyTimeFormat = () -> tf;
+ lazyTimestampFormat = () -> tsf;
}
protected AbstractJsonRowRecordReader(final InputStream in,
@@ -87,7 +111,19 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
final String timestampFormat)
throws IOException, MalformedRecordException {
- this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null);
+ this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null, false, null);
+ }
+
+ protected AbstractJsonRowRecordReader(final InputStream in,
+ final ComponentLog logger,
+ final String dateFormat,
+ final String timeFormat,
+ final String timestampFormat,
+ final boolean allowComments,
+ final StreamReadConstraints streamReadConstraints)
+ throws IOException, MalformedRecordException {
+
+ this(in, logger, dateFormat, timeFormat, timestampFormat, null, null, null, allowComments, streamReadConstraints);
}
/**
@@ -100,8 +136,11 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
* @param timestampFormat format for parsing timestamp fields
* @param strategy whether to start processing from a specific field
* @param nestedFieldName the name of the field to start the processing from
- * @param captureFieldPredicate predicate that takes a JSON fieldName and fieldValue to capture top-level non-processed fields which can
+ * @param captureFieldPredicate predicate that takes a JSON fieldName and fieldValue to capture top-level non-processed fields which can
* be accessed by calling {@link #getCapturedFields()}
+ * @param allowComments whether to allow comments within the JSON stream
+ * @param streamReadConstraints configuration for the JsonFactory stream reader {@link StreamReadConstraints}
+ *
* @throws IOException in case of JSON stream processing failure
* @throws MalformedRecordException in case of malformed JSON input
*/
@@ -112,7 +151,9 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
final String timestampFormat,
final StartingFieldStrategy strategy,
final String nestedFieldName,
- final BiPredicate<String, String> captureFieldPredicate)
+ final BiPredicate<String, String> captureFieldPredicate,
+ final boolean allowComments,
+ final StreamReadConstraints streamReadConstraints)
throws IOException, MalformedRecordException {
this(logger, dateFormat, timeFormat, timestampFormat);
@@ -122,7 +163,13 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
capturedFields = new LinkedHashMap<>();
try {
- jsonParser = jsonFactory.createParser(in);
+ final ObjectMapper codec = new ObjectMapper();
+ if (allowComments) {
+ codec.enable(JsonParser.Feature.ALLOW_COMMENTS);
+ }
+ codec.getFactory().setStreamReadConstraints(streamReadConstraints != null ? streamReadConstraints : DEFAULT_STREAM_READ_CONSTRAINTS);
+
+ jsonParser = codec.getFactory().createParser(in);
jsonParser.setCodec(codec);
if (strategy == StartingFieldStrategy.NESTED_FIELD) {
@@ -152,15 +199,15 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
}
protected Supplier<DateFormat> getLazyDateFormat() {
- return LAZY_DATE_FORMAT;
+ return lazyDateFormat;
}
protected Supplier<DateFormat> getLazyTimeFormat() {
- return LAZY_TIME_FORMAT;
+ return lazyTimeFormat;
}
protected Supplier<DateFormat> getLazyTimestampFormat() {
- return LAZY_TIMESTAMP_FORMAT;
+ return lazyTimestampFormat;
}
@@ -219,7 +266,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
case TIME:
case TIMESTAMP:
try {
- return DataTypeUtils.convertType(textValue, dataType, LAZY_DATE_FORMAT, LAZY_TIME_FORMAT, LAZY_TIMESTAMP_FORMAT, fieldName);
+ return DataTypeUtils.convertType(textValue, dataType, lazyDateFormat, lazyTimeFormat, lazyTimestampFormat, fieldName);
} catch (final Exception e) {
return textValue;
}
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
index 21ff1c3920..328cf0c868 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonPathRowRecordReader.java
@@ -17,6 +17,7 @@
package org.apache.nifi.json;
+import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.DocumentContext;
@@ -52,12 +53,20 @@ public class JsonPathRowRecordReader extends AbstractJsonRowRecordReader {
private final ComponentLog logger;
private final LinkedHashMap<String, JsonPath> jsonPaths;
private final InputStream in;
- private RecordSchema schema;
+ private final RecordSchema schema;
public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger,
- final String dateFormat, final String timeFormat, final String timestampFormat)
+ final String dateFormat, final String timeFormat, final String timestampFormat)
+ throws MalformedRecordException, IOException {
+ this(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat, false, null);
+ }
+
+ public JsonPathRowRecordReader(final LinkedHashMap<String, JsonPath> jsonPaths, final RecordSchema schema, final InputStream in, final ComponentLog logger,
+ final String dateFormat, final String timeFormat, final String timestampFormat,
+ final boolean allowComments, final StreamReadConstraints streamReadConstraints)
throws MalformedRecordException, IOException {
- super(in, logger, dateFormat, timeFormat, timestampFormat);
+
+ super(in, logger, dateFormat, timeFormat, timestampFormat, allowComments, streamReadConstraints);
this.schema = schema;
this.jsonPaths = jsonPaths;
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index fce50e35c9..758cd96a48 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-record-utils/nifi-json-record-utils/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -17,6 +17,7 @@
package org.apache.nifi.json;
+import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ArrayNode;
import org.apache.nifi.logging.ComponentLog;
@@ -52,8 +53,19 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
private final RecordSchema schema;
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
- final String dateFormat, final String timeFormat, final String timestampFormat) throws IOException, MalformedRecordException {
- this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null);
+ final String dateFormat, final String timeFormat, final String timestampFormat)
+ throws IOException, MalformedRecordException {
+
+ this(in, logger, schema, dateFormat, timeFormat, timestampFormat, false, null);
+ }
+
+ public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
+ final String dateFormat, final String timeFormat, final String timestampFormat,
+ final boolean allowComments, final StreamReadConstraints streamReadConstraints)
+ throws IOException, MalformedRecordException {
+
+ this(in, logger, schema, dateFormat, timeFormat, timestampFormat, null, null, null, null,
+ allowComments, streamReadConstraints);
}
public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
@@ -62,7 +74,20 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
final SchemaApplicationStrategy schemaApplicationStrategy, final BiPredicate<String, String> captureFieldPredicate)
throws IOException, MalformedRecordException {
- super(in, logger, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, captureFieldPredicate);
+ this(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, schemaApplicationStrategy,
+ captureFieldPredicate, false, null);
+ }
+
+ public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
+ final String dateFormat, final String timeFormat, final String timestampFormat,
+ final StartingFieldStrategy startingFieldStrategy, final String startingFieldName,
+ final SchemaApplicationStrategy schemaApplicationStrategy, final BiPredicate<String, String> captureFieldPredicate,
+ final boolean allowComments, final StreamReadConstraints streamReadConstraints)
+ throws IOException, MalformedRecordException {
+
+ super(in, logger, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName, captureFieldPredicate,
+ allowComments, streamReadConstraints);
+
if (startingFieldStrategy == StartingFieldStrategy.NESTED_FIELD && schemaApplicationStrategy == SchemaApplicationStrategy.WHOLE_JSON) {
this.schema = getSelectedSchema(schema, startingFieldName);
} else {
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
index aa72654b99..cc89fe98d3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/AbstractJsonPathProcessor.java
@@ -72,7 +72,7 @@ public abstract class AbstractJsonPathProcessor extends AbstractProcessor {
.build();
public static final PropertyDescriptor MAX_STRING_LENGTH = new PropertyDescriptor.Builder()
- .name("max-string-length")
+ .name("Max String Length")
.displayName("Max String Length")
.description("The maximum allowed length of a string value when parsing the JSON document")
.required(true)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
index bfeb2556db..feac10a5ff 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/JoltTransformJSON.java
@@ -158,7 +158,7 @@ public class JoltTransformJSON extends AbstractProcessor {
.build();
public static final PropertyDescriptor MAX_STRING_LENGTH = new PropertyDescriptor.Builder()
- .name("max-string-length")
+ .name("Max String Length")
.displayName("Max String Length")
.description("The maximum allowed length of a string value when parsing the JSON document")
.required(true)
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index b174c2df65..e5064f1bfb 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -187,14 +187,17 @@
<exclude>src/test/resources/csv/multi-bank-account_escapechar.csv</exclude>
<exclude>src/test/resources/csv/multi-bank-account_spec_delimiter.csv</exclude>
<exclude>src/test/resources/csv/prov-events.csv</exclude>
+
<exclude>src/test/resources/grok/error-with-stack-trace.log</exclude>
<exclude>src/test/resources/grok/nifi-log-sample-multiline-with-stacktrace.log</exclude>
<exclude>src/test/resources/grok/nifi-log-sample.log</exclude>
<exclude>src/test/resources/grok/single-line-log-messages.txt</exclude>
<exclude>src/test/resources/grok/grok_patterns.txt</exclude>
+
<exclude>src/test/resources/json/bank-account-array-different-schemas.json</exclude>
<exclude>src/test/resources/json/bank-account-array-optional-balance.json</exclude>
<exclude>src/test/resources/json/bank-account-array.json</exclude>
+ <exclude>src/test/resources/json/bank-account-comments.jsonc</exclude>
<exclude>src/test/resources/json/bank-account-mixed.json</exclude>
<exclude>src/test/resources/json/bank-account-multiarray.json</exclude>
<exclude>src/test/resources/json/bank-account-multiline.json</exclude>
@@ -227,11 +230,14 @@
<exclude>src/test/resources/json/choice-of-string-or-array-record.avsc</exclude>
<exclude>src/test/resources/json/nested-choice-of-empty-array-or-string.json</exclude>
<exclude>src/test/resources/json/nested-choice-of-record-array-or-string.json</exclude>
+
<exclude>src/test/resources/syslog/syslog5424/log.txt</exclude>
<exclude>src/test/resources/syslog/syslog5424/log_all.txt</exclude>
<exclude>src/test/resources/syslog/syslog5424/log_mix.txt</exclude>
<exclude>src/test/resources/syslog/syslog5424/log_mix_in_error.txt</exclude>
+
<exclude>src/test/resources/text/testschema</exclude>
+
<exclude>src/test/resources/xml/field_with_sub-element.xml</exclude>
<exclude>src/test/resources/xml/people.xml</exclude>
<exclude>src/test/resources/xml/people2.xml</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
index caca7c2d72..297400c3d4 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonPathReader.java
@@ -17,6 +17,7 @@
package org.apache.nifi.json;
+import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
import com.jayway.jsonpath.JsonPath;
import org.apache.nifi.annotation.behavior.DynamicProperty;
@@ -32,6 +33,7 @@ import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.schema.inference.RecordSourceFactory;
@@ -69,23 +71,24 @@ import java.util.function.Supplier;
description="User-defined properties identify how to extract specific fields from a JSON object in order to create a Record",
expressionLanguageScope=ExpressionLanguageScope.NONE)
public class JsonPathReader extends SchemaRegistryService implements RecordReaderFactory {
-
-
private volatile String dateFormat;
private volatile String timeFormat;
private volatile String timestampFormat;
private volatile LinkedHashMap<String, JsonPath> jsonPaths;
+ private volatile boolean allowComments;
+ private volatile StreamReadConstraints streamReadConstraints;
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
+ properties.add(AbstractJsonRowRecordReader.MAX_STRING_LENGTH);
+ properties.add(AbstractJsonRowRecordReader.ALLOW_COMMENTS);
properties.add(DateTimeUtils.DATE_FORMAT);
properties.add(DateTimeUtils.TIME_FORMAT);
properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
return properties;
}
-
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
return new PropertyDescriptor.Builder()
@@ -103,6 +106,10 @@ public class JsonPathReader extends SchemaRegistryService implements RecordReade
this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
+ final int maxStringLength = context.getProperty(AbstractJsonRowRecordReader.MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue();
+ this.streamReadConstraints = StreamReadConstraints.builder().maxStringLength(maxStringLength).build();
+ this.allowComments = context.getProperty(AbstractJsonRowRecordReader.ALLOW_COMMENTS).asBoolean();
+
final LinkedHashMap<String, JsonPath> compiled = new LinkedHashMap<>();
for (final PropertyDescriptor descriptor : context.getProperties().keySet()) {
if (!descriptor.isDynamic()) {
@@ -164,7 +171,6 @@ public class JsonPathReader extends SchemaRegistryService implements RecordReade
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger)
throws IOException, MalformedRecordException, SchemaNotFoundException {
final RecordSchema schema = getSchema(variables, in, null);
- return new JsonPathRowRecordReader(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat);
+ return new JsonPathRowRecordReader(jsonPaths, schema, in, logger, dateFormat, timeFormat, timestampFormat, allowComments, streamReadConstraints);
}
-
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
index 1d4caa8dbc..bcf7d66c95 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
@@ -17,6 +17,7 @@
package org.apache.nifi.json;
+import com.fasterxml.jackson.core.StreamReadConstraints;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
@@ -27,6 +28,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaNotFoundException;
@@ -68,13 +70,14 @@ import static org.apache.nifi.schema.inference.SchemaInferenceUtil.SCHEMA_CACHE;
+ "See the Usage of the Controller Service for more information and examples.")
@SeeAlso(JsonPathReader.class)
public class JsonTreeReader extends SchemaRegistryService implements RecordReaderFactory {
-
private volatile String dateFormat;
private volatile String timeFormat;
private volatile String timestampFormat;
private volatile String startingFieldName;
private volatile StartingFieldStrategy startingFieldStrategy;
private volatile SchemaApplicationStrategy schemaApplicationStrategy;
+ private volatile boolean allowComments;
+ private volatile StreamReadConstraints streamReadConstraints;
public static final PropertyDescriptor STARTING_FIELD_STRATEGY = new PropertyDescriptor.Builder()
.name("starting-field-strategy")
@@ -119,6 +122,8 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
properties.add(STARTING_FIELD_STRATEGY);
properties.add(STARTING_FIELD_NAME);
properties.add(SCHEMA_APPLICATION_STRATEGY);
+ properties.add(AbstractJsonRowRecordReader.MAX_STRING_LENGTH);
+ properties.add(AbstractJsonRowRecordReader.ALLOW_COMMENTS);
properties.add(DateTimeUtils.DATE_FORMAT);
properties.add(DateTimeUtils.TIME_FORMAT);
properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
@@ -133,6 +138,9 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
this.startingFieldStrategy = StartingFieldStrategy.valueOf(context.getProperty(STARTING_FIELD_STRATEGY).getValue());
this.startingFieldName = context.getProperty(STARTING_FIELD_NAME).getValue();
this.schemaApplicationStrategy = SchemaApplicationStrategy.valueOf(context.getProperty(SCHEMA_APPLICATION_STRATEGY).getValue());
+ final int maxStringLength = context.getProperty(AbstractJsonRowRecordReader.MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue();
+ this.streamReadConstraints = StreamReadConstraints.builder().maxStringLength(maxStringLength).build();
+ this.allowComments = context.getProperty(AbstractJsonRowRecordReader.ALLOW_COMMENTS).asBoolean();
}
@Override
@@ -165,6 +173,6 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
throws IOException, MalformedRecordException, SchemaNotFoundException {
final RecordSchema schema = getSchema(variables, in, null);
return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName,
- schemaApplicationStrategy, null);
+ schemaApplicationStrategy, null, allowComments, streamReadConstraints);
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index 1f77aa6318..7c7f715eff 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -17,6 +17,8 @@
package org.apache.nifi.json;
+import com.fasterxml.jackson.core.StreamReadConstraints;
+import com.fasterxml.jackson.core.exc.StreamConstraintsException;
import org.apache.avro.Schema;
import org.apache.commons.io.FileUtils;
import org.apache.nifi.avro.AvroTypeUtil;
@@ -296,11 +298,35 @@ class TestJsonTreeRowRecordReader {
@Test
void testReadMultilineJSON() throws IOException, MalformedRecordException {
+ testReadAccountJson("src/test/resources/json/bank-account-multiline.json", false, null);
+ }
+
+ @Test
+ void testReadJSONStringTooLong() {
+ final StreamConstraintsException mre = assertThrows(StreamConstraintsException.class, () ->
+ testReadAccountJson("src/test/resources/json/bank-account-multiline.json", false, StreamReadConstraints.builder().maxStringLength(2).build()));
+ assertTrue(mre.getMessage().contains("maximum length"));
+ assertTrue(mre.getMessage().contains("(2)"));
+ }
+
+ @Test
+ void testReadJSONComments() throws IOException, MalformedRecordException {
+ testReadAccountJson("src/test/resources/json/bank-account-comments.jsonc", true, StreamReadConstraints.builder().maxStringLength(20_000).build());
+ }
+
+ @Test
+ void testReadJSONDisallowComments() {
+ final MalformedRecordException mre = assertThrows(MalformedRecordException.class, () ->
+ testReadAccountJson("src/test/resources/json/bank-account-comments.jsonc", false, StreamReadConstraints.builder().maxStringLength(20_000).build()));
+ assertTrue(mre.getMessage().contains("not parse"));
+ }
+
+ private void testReadAccountJson(final String inputFile, final boolean allowComments, final StreamReadConstraints streamReadConstraints) throws IOException, MalformedRecordException {
final List<RecordField> fields = getFields(RecordFieldType.DECIMAL.getDecimalDataType(30, 10));
final RecordSchema schema = new SimpleRecordSchema(fields);
- try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-multiline.json");
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+ try (final InputStream in = new FileInputStream(inputFile);
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat, allowComments, streamReadConstraints)) {
final List<String> fieldNames = schema.getFieldNames();
final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-comments.jsonc b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-comments.jsonc
new file mode 100644
index 0000000000..326a925400
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/bank-account-comments.jsonc
@@ -0,0 +1,24 @@
+[
+ {
+ // comment in object
+ "id": 1,
+ "name": "John Doe",
+ "balance": 4750.89,
+ "address": "123 My Street",
+ "city": "My City",
+ "state": "MS",
+ "zipCode": "11111",
+ "country": "USA"
+ },
+ // comment between objects
+ {
+ "id": 2,
+ "name": "Jane Doe",
+ "balance": 4820.09,
+ "address": "321 Your Street",
+ "city": "Your City",
+ "state": "NY",
+ "zipCode": "33333",
+ "country": "USA"
+ }
+]
\ No newline at end of file