You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/04/14 21:04:34 UTC
[nifi] branch main updated: NIFI-9862 Updated JsonTreeReader to read Records from Nested Field
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 7f95de27fb NIFI-9862 Updated JsonTreeReader to read Records from Nested Field
7f95de27fb is described below
commit 7f95de27fb5f8ead755133ef1f3d74777c6c8578
Author: Lehel <le...@hotmail.com>
AuthorDate: Wed Apr 6 21:50:22 2022 +0200
NIFI-9862 Updated JsonTreeReader to read Records from Nested Field
This closes #5937
Signed-off-by: David Handermann <ex...@apache.org>
---
.../nifi-record-serialization-services/pom.xml | 3 +
.../nifi/json/AbstractJsonRowRecordReader.java | 98 ++++-
.../org/apache/nifi/json/JsonRecordSource.java | 26 +-
.../java/org/apache/nifi/json/JsonTreeReader.java | 59 ++-
.../apache/nifi/json/JsonTreeRowRecordReader.java | 12 +-
.../apache/nifi/json/StartingFieldStrategy.java | 38 ++
.../additionalDetails.html | 398 ++++++++++-------
.../json/TestInferJsonSchemaAccessStrategy.java | 107 +++--
.../nifi/json/TestJsonPathRowRecordReader.java | 74 ++--
.../apache/nifi/json/TestJsonSchemaInference.java | 4 +-
.../nifi/json/TestJsonTreeRowRecordReader.java | 490 +++++++++++++--------
.../org/apache/nifi/json/TestWriteJsonResult.java | 38 +-
.../test/resources/json/multiple-nested-field.json | 22 +
.../json/nested-array-then-start-object.json | 20 +
.../json/single-element-nested-array-middle.json | 16 +
15 files changed, 932 insertions(+), 473 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
index 8bbe871734..cf43edc51c 100755
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/pom.xml
@@ -206,11 +206,14 @@
<exclude>src/test/resources/json/data-types.json</exclude>
<exclude>src/test/resources/json/timestamp.json</exclude>
<exclude>src/test/resources/json/json-with-unicode.json</exclude>
+ <exclude>src/test/resources/json/multiple-nested-field.json</exclude>
<exclude>src/test/resources/json/primitive-type-array.json</exclude>
<exclude>src/test/resources/json/single-bank-account.json</exclude>
<exclude>src/test/resources/json/single-bank-account-wrong-field-type.json</exclude>
<exclude>src/test/resources/json/single-element-nested-array.json</exclude>
<exclude>src/test/resources/json/single-element-nested.json</exclude>
+ <exclude>src/test/resources/json/single-element-nested-array-middle.json</exclude>
+ <exclude>src/test/resources/json/nested-array-then-start-object.json</exclude>
<exclude>src/test/resources/json/output/dataTypes.json</exclude>
<exclude>src/test/resources/json/elements-for-record-choice.json</exclude>
<exclude>src/test/resources/json/record-choice.avsc</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
index 51d9533992..e248bfd518 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/AbstractJsonRowRecordReader.java
@@ -21,6 +21,7 @@ import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.io.SerializedString;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
@@ -51,8 +52,6 @@ import java.util.function.Supplier;
public abstract class AbstractJsonRowRecordReader implements RecordReader {
private final ComponentLog logger;
- private final JsonParser jsonParser;
- private final JsonNode firstJsonNode;
private final Supplier<DateFormat> LAZY_DATE_FORMAT;
private final Supplier<DateFormat> LAZY_TIME_FORMAT;
private final Supplier<DateFormat> LAZY_TIMESTAMP_FORMAT;
@@ -61,11 +60,12 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
private static final JsonFactory jsonFactory = new JsonFactory();
private static final ObjectMapper codec = new ObjectMapper();
+ private JsonParser jsonParser;
+ private JsonNode firstJsonNode;
+ private StartingFieldStrategy strategy;
- public AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat)
- throws IOException, MalformedRecordException {
-
+ private AbstractJsonRowRecordReader(final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat) {
this.logger = logger;
final DateFormat df = dateFormat == null ? null : DataTypeUtils.getDateFormat(dateFormat);
@@ -75,9 +75,15 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
LAZY_DATE_FORMAT = () -> df;
LAZY_TIME_FORMAT = () -> tf;
LAZY_TIMESTAMP_FORMAT = () -> tsf;
+ }
+
+ protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat)
+ throws IOException, MalformedRecordException {
+
+ this(logger, dateFormat, timeFormat, timestampFormat);
try {
- jsonParser = jsonFactory.createJsonParser(in);
+ jsonParser = jsonFactory.createParser(in);
jsonParser.setCodec(codec);
JsonToken token = jsonParser.nextToken();
@@ -95,6 +101,37 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
}
}
+ protected AbstractJsonRowRecordReader(final InputStream in, final ComponentLog logger, final String dateFormat, final String timeFormat, final String timestampFormat,
+ final StartingFieldStrategy strategy, final String nestedFieldName) throws IOException, MalformedRecordException {
+
+ this(logger, dateFormat, timeFormat, timestampFormat);
+
+ this.strategy = strategy;
+
+ try {
+ jsonParser = jsonFactory.createParser(in);
+ jsonParser.setCodec(codec);
+
+ if (strategy == StartingFieldStrategy.NESTED_FIELD) {
+ final SerializedString serializedStartingFieldName = new SerializedString(nestedFieldName);
+ while (!jsonParser.nextFieldName(serializedStartingFieldName) && jsonParser.hasCurrentToken());
+ logger.debug("Parsing starting at nested field [{}]", nestedFieldName);
+ }
+
+ JsonToken token = jsonParser.nextToken();
+ if (token == JsonToken.START_ARRAY) {
+ token = jsonParser.nextToken(); // advance to START_OBJECT token
+ }
+ if (token == JsonToken.START_OBJECT) { // could be END_ARRAY also
+ firstJsonNode = jsonParser.readValueAsTree();
+ } else {
+ firstJsonNode = null;
+ }
+ } catch (final JsonParseException e) {
+ throw new MalformedRecordException("Could not parse data as JSON", e);
+ }
+ }
+
protected Supplier<DateFormat> getLazyDateFormat() {
return LAZY_DATE_FORMAT;
}
@@ -121,7 +158,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
} catch (final MalformedRecordException mre) {
throw mre;
} catch (final Exception e) {
- logger.debug("Failed to convert JSON Element {} into a Record object using schema {} due to {}", new Object[] {nextNode, schema, e.toString(), e});
+ logger.debug("Failed to convert JSON Element {} into a Record object using schema {} due to {}", nextNode, schema, e.toString(), e);
throw new MalformedRecordException("Successfully parsed a JSON object from input but failed to convert into a Record object with the given schema", e);
}
}
@@ -178,11 +215,11 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
final ArrayDataType arrayDataType = (ArrayDataType) dataType;
elementDataType = arrayDataType.getElementType();
} else if (dataType != null && dataType.getFieldType() == RecordFieldType.CHOICE) {
- List<DataType> possibleSubTypes = ((ChoiceDataType)dataType).getPossibleSubTypes();
+ List<DataType> possibleSubTypes = ((ChoiceDataType) dataType).getPossibleSubTypes();
for (DataType possibleSubType : possibleSubTypes) {
if (possibleSubType.getFieldType() == RecordFieldType.ARRAY) {
- ArrayDataType possibleArrayDataType = (ArrayDataType)possibleSubType;
+ ArrayDataType possibleArrayDataType = (ArrayDataType) possibleSubType;
DataType possibleElementType = possibleArrayDataType.getElementType();
final Object[] possibleArrayElements = new Object[numElements];
@@ -214,7 +251,6 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
}
if (fieldNode.isObject()) {
- RecordSchema childSchema = null;
if (dataType != null && RecordFieldType.MAP == dataType.getFieldType()) {
return getMapFromRawValue(fieldNode, dataType, fieldName);
}
@@ -291,8 +327,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
} else if (dataType.getFieldType() == RecordFieldType.ARRAY) {
final ArrayDataType arrayDataType = (ArrayDataType) dataType;
final DataType elementType = arrayDataType.getElementType();
- final Record record = createOptionalRecord(fieldNode, elementType, strict);
- return record;
+ return createOptionalRecord(fieldNode, elementType, strict);
}
return null;
@@ -309,8 +344,7 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
childValues.put(childFieldName, childValue);
}
- final MapRecord record = new MapRecord(childSchema, childValues);
- return record;
+ return new MapRecord(childSchema, childValues);
}
protected JsonNode getNextJsonNode() throws IOException, MalformedRecordException {
@@ -318,7 +352,15 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
firstObjectConsumed = true;
return firstJsonNode;
}
+ if (strategy == StartingFieldStrategy.NESTED_FIELD) {
+ return getJsonNodeWithNestedNodeStrategy();
+ } else {
+ return getJsonNode();
+ }
+ }
+
+ private JsonNode getJsonNodeWithNestedNodeStrategy() throws IOException, MalformedRecordException {
while (true) {
final JsonToken token = jsonParser.nextToken();
if (token == null) {
@@ -326,14 +368,34 @@ public abstract class AbstractJsonRowRecordReader implements RecordReader {
}
switch (token) {
+ case START_ARRAY:
+ break;
+ case END_ARRAY:
case END_OBJECT:
- continue;
+ case FIELD_NAME:
+ return null;
case START_OBJECT:
return jsonParser.readValueAsTree();
- case END_ARRAY:
- case START_ARRAY:
- continue;
+ default:
+ throw new MalformedRecordException("Expected to get a JSON Object but got a token of type " + token.name());
+ }
+ }
+ }
+
+ private JsonNode getJsonNode() throws IOException, MalformedRecordException {
+ while (true) {
+ final JsonToken token = jsonParser.nextToken();
+ if (token == null) {
+ return null;
+ }
+ switch (token) {
+ case START_ARRAY:
+ case END_ARRAY:
+ case END_OBJECT:
+ break;
+ case START_OBJECT:
+ return jsonParser.readValueAsTree();
default:
throw new MalformedRecordException("Expected to get a JSON Object but got a token of type " + token.name());
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java
index 1c6afce9d4..348c2ef02f 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonRecordSource.java
@@ -19,16 +19,22 @@ package org.apache.nifi.json;
import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonToken;
+import com.fasterxml.jackson.core.io.SerializedString;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.schema.inference.RecordSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InputStream;
public class JsonRecordSource implements RecordSource<JsonNode> {
+ private static final Logger logger = LoggerFactory.getLogger(JsonRecordSource.class);
private static final JsonFactory jsonFactory;
private final JsonParser jsonParser;
+ private final StartingFieldStrategy strategy;
+ private final String startingFieldName;
static {
jsonFactory = new JsonFactory();
@@ -36,7 +42,21 @@ public class JsonRecordSource implements RecordSource<JsonNode> {
}
public JsonRecordSource(final InputStream in) throws IOException {
- jsonParser = jsonFactory.createJsonParser(in);
+ jsonParser = jsonFactory.createParser(in);
+ strategy = null;
+ startingFieldName = null;
+ }
+
+ public JsonRecordSource(final InputStream in, final StartingFieldStrategy strategy, final String startingFieldName) throws IOException {
+ jsonParser = jsonFactory.createParser(in);
+ this.strategy = strategy;
+ this.startingFieldName = startingFieldName;
+
+ if (strategy == StartingFieldStrategy.NESTED_FIELD) {
+ final SerializedString serializedNestedField = new SerializedString(this.startingFieldName);
+ while (!jsonParser.nextFieldName(serializedNestedField) && jsonParser.hasCurrentToken());
+ logger.debug("Parsing starting at nested field [{}]", startingFieldName);
+ }
}
@Override
@@ -50,6 +70,10 @@ public class JsonRecordSource implements RecordSource<JsonNode> {
if (token == JsonToken.START_OBJECT) {
return jsonParser.readValueAsTree();
}
+
+ if (strategy == StartingFieldStrategy.NESTED_FIELD && (token == JsonToken.END_ARRAY || token == JsonToken.END_OBJECT)) {
+ return null;
+ }
}
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
index b0ecad1c80..e5a2ed20fd 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeReader.java
@@ -27,10 +27,11 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.schema.access.SchemaAccessStrategy;
import org.apache.nifi.schema.access.SchemaNotFoundException;
-import org.apache.nifi.schema.inference.SchemaInferenceEngine;
import org.apache.nifi.schema.inference.RecordSourceFactory;
+import org.apache.nifi.schema.inference.SchemaInferenceEngine;
import org.apache.nifi.schema.inference.SchemaInferenceUtil;
import org.apache.nifi.schema.inference.TimeValueInference;
import org.apache.nifi.schemaregistry.services.SchemaRegistry;
@@ -44,6 +45,7 @@ import org.apache.nifi.serialization.record.RecordSchema;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
@@ -59,21 +61,49 @@ import static org.apache.nifi.schema.inference.SchemaInferenceUtil.SCHEMA_CACHE;
+ "If an array is encountered, each element in that array will be treated as a separate record. "
+ "If the schema that is configured contains a field that is not present in the JSON, a null value will be used. If the JSON contains "
+ "a field that is not present in the schema, that field will be skipped. "
- + "See the Usage of the Controller Service for more information and examples.")
+ + "See the Usage of the Controller Service for more information and examples.")
@SeeAlso(JsonPathReader.class)
public class JsonTreeReader extends SchemaRegistryService implements RecordReaderFactory {
private volatile String dateFormat;
private volatile String timeFormat;
private volatile String timestampFormat;
+ private volatile String startingFieldName;
+ private volatile StartingFieldStrategy startingFieldStrategy;
+
+ public static final PropertyDescriptor STARTING_FIELD_STRATEGY = new PropertyDescriptor.Builder()
+ .name("starting-field-strategy")
+ .displayName("Starting Field Strategy")
+ .description("Start processing from the root node or from a specified nested node.")
+ .required(true)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .defaultValue(StartingFieldStrategy.ROOT_NODE.name())
+ .allowableValues(
+ Arrays.stream(StartingFieldStrategy.values()).map(startingStrategy ->
+ new AllowableValue(startingStrategy.name(), startingStrategy.getDisplayName(), startingStrategy.getDescription())
+ ).toArray(AllowableValue[]::new))
+ .build();
+
+
+ public static final PropertyDescriptor STARTING_FIELD_NAME = new PropertyDescriptor.Builder()
+ .name("starting-field-name")
+ .displayName("Starting Field Name")
+ .description("Skips forward to the given nested JSON field (array or object) to begin processing.")
+ .required(false)
+ .addValidator(StandardValidators.NON_BLANK_VALIDATOR)
+ .defaultValue(null)
+ .dependsOn(STARTING_FIELD_STRATEGY, StartingFieldStrategy.NESTED_FIELD.name())
+ .build();
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(new PropertyDescriptor.Builder()
- .fromPropertyDescriptor(SCHEMA_CACHE)
- .dependsOn(SCHEMA_ACCESS_STRATEGY, INFER_SCHEMA)
- .build());
+ .fromPropertyDescriptor(SCHEMA_CACHE)
+ .dependsOn(SCHEMA_ACCESS_STRATEGY, INFER_SCHEMA)
+ .build());
+ properties.add(STARTING_FIELD_STRATEGY);
+ properties.add(STARTING_FIELD_NAME);
properties.add(DateTimeUtils.DATE_FORMAT);
properties.add(DateTimeUtils.TIME_FORMAT);
properties.add(DateTimeUtils.TIMESTAMP_FORMAT);
@@ -81,10 +111,12 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
}
@OnEnabled
- public void storeFormats(final ConfigurationContext context) {
+ public void storePropertyValues(final ConfigurationContext context) {
this.dateFormat = context.getProperty(DateTimeUtils.DATE_FORMAT).getValue();
this.timeFormat = context.getProperty(DateTimeUtils.TIME_FORMAT).getValue();
this.timestampFormat = context.getProperty(DateTimeUtils.TIMESTAMP_FORMAT).getValue();
+ this.startingFieldStrategy = StartingFieldStrategy.valueOf(context.getProperty(STARTING_FIELD_STRATEGY).getValue());
+ this.startingFieldName = context.getProperty(STARTING_FIELD_NAME).getValue();
}
@Override
@@ -96,12 +128,15 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
}
@Override
- protected SchemaAccessStrategy getSchemaAccessStrategy(final String strategy, final SchemaRegistry schemaRegistry, final PropertyContext context) {
- final RecordSourceFactory<JsonNode> jsonSourceFactory = (var, in) -> new JsonRecordSource(in);
- final Supplier<SchemaInferenceEngine<JsonNode>> inferenceSupplier = () -> new JsonSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat));
+ protected SchemaAccessStrategy getSchemaAccessStrategy(final String schemaAccessStrategy, final SchemaRegistry schemaRegistry, final PropertyContext context) {
+ final RecordSourceFactory<JsonNode> jsonSourceFactory =
+ (var, in) -> new JsonRecordSource(in, startingFieldStrategy, startingFieldName);
+
+ final Supplier<SchemaInferenceEngine<JsonNode>> inferenceSupplier =
+ () -> new JsonSchemaInference(new TimeValueInference(dateFormat, timeFormat, timestampFormat));
- return SchemaInferenceUtil.getSchemaAccessStrategy(strategy, context, getLogger(), jsonSourceFactory, inferenceSupplier,
- () -> super.getSchemaAccessStrategy(strategy, schemaRegistry, context));
+ return SchemaInferenceUtil.getSchemaAccessStrategy(schemaAccessStrategy, context, getLogger(), jsonSourceFactory, inferenceSupplier,
+ () -> super.getSchemaAccessStrategy(schemaAccessStrategy, schemaRegistry, context));
}
@Override
@@ -113,6 +148,6 @@ public class JsonTreeReader extends SchemaRegistryService implements RecordReade
public RecordReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger)
throws IOException, MalformedRecordException, SchemaNotFoundException {
final RecordSchema schema = getSchema(variables, in, null);
- return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat);
+ return new JsonTreeRowRecordReader(in, logger, schema, dateFormat, timeFormat, timestampFormat, startingFieldStrategy, startingFieldName);
}
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
index 1bb34549c8..eee487c026 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/JsonTreeRowRecordReader.java
@@ -53,7 +53,12 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
this.schema = schema;
}
-
+ public JsonTreeRowRecordReader(final InputStream in, final ComponentLog logger, final RecordSchema schema,
+ final String dateFormat, final String timeFormat, final String timestampFormat,
+ final StartingFieldStrategy strategy, final String startingFieldName) throws IOException, MalformedRecordException {
+ super(in, logger, dateFormat, timeFormat, timestampFormat, strategy, startingFieldName);
+ this.schema = schema;
+ }
@Override
protected Record convertJsonNodeToRecord(final JsonNode jsonNode, final RecordSchema schema, final boolean coerceTypes, final boolean dropUnknownFields)
@@ -104,7 +109,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
final String fullFieldName = fieldNamePrefix == null ? fieldName : fieldNamePrefix + fieldName;
value = convertField(childNode, fullFieldName, desiredType, dropUnknown);
} else {
- value = getRawNodeValue(childNode, recordField == null ? null : recordField.getDataType(), fieldName);
+ value = getRawNodeValue(childNode, recordField.getDataType(), fieldName);
}
values.put(fieldName, value);
@@ -157,8 +162,7 @@ public class JsonTreeRowRecordReader extends AbstractJsonRowRecordReader {
case TIME:
case TIMESTAMP: {
final Object rawValue = getRawNodeValue(fieldNode, fieldName);
- final Object converted = DataTypeUtils.convertType(rawValue, desiredType, getLazyDateFormat(), getLazyTimeFormat(), getLazyTimestampFormat(), fieldName);
- return converted;
+ return DataTypeUtils.convertType(rawValue, desiredType, getLazyDateFormat(), getLazyTimeFormat(), getLazyTimestampFormat(), fieldName);
}
case MAP: {
final DataType valueType = ((MapDataType) desiredType).getValueType();
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/StartingFieldStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/StartingFieldStrategy.java
new file mode 100644
index 0000000000..44727e72fd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/java/org/apache/nifi/json/StartingFieldStrategy.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.json;
+
+public enum StartingFieldStrategy {
+ ROOT_NODE("Root Node", "Begins processing from the root node."),
+ NESTED_FIELD("Nested Field", "Skips forward to the given nested JSON field (array or object) to begin processing.");
+
+ private final String displayName;
+ private final String description;
+
+ StartingFieldStrategy(final String displayName, final String description) {
+ this.displayName = displayName;
+ this.description = description;
+ }
+
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
index b5415c3b74..9a563d4f65 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/main/resources/docs/org.apache.nifi.json.JsonTreeReader/additionalDetails.html
@@ -22,59 +22,60 @@
<body>
<p>
- The JsonTreeReader Controller Service reads a JSON Object and creates a Record object for the entire
- JSON Object tree. The Controller Service must be configured with a Schema that describes the structure
- of the JSON data. If any field exists in the JSON that is not in the schema, that field will be skipped.
- If the schema contains a field for which no JSON field exists, a null value will be used in the Record
- (or the default value defined in the schema, if applicable).
+ The JsonTreeReader Controller Service reads a JSON Object and creates a Record object either for the
+ entire JSON Object tree or a subpart (see "Starting Field Strategies" section). The Controller Service
+ must be configured with a Schema that describes the structure of the JSON data. If any field exists in
+ the JSON that is not in the schema, that field will be skipped. If the schema contains a field for which
+ no JSON field exists, a null value will be used in the Record (or the default value defined in the schema,
+ if applicable).
</p>
<p>
- If the root element of the JSON is a JSON Array, each JSON Object within that array will be treated as
- its own separate Record. If the root element is a JSON Object, the JSON will all be treated as a single
- Record.
+ If the root element of the JSON is a JSON Array, each JSON Object within that array will be treated as
+ its own separate Record. If the root element is a JSON Object, the JSON will all be treated as a single
+ Record.
</p>
- <h2>Schemas and Type Coercion</h2>
-
- <p>
- When a record is parsed from incoming data, it is separated into fields. Each of these fields is then looked up against the
- configured schema (by field name) in order to determine what the type of the data should be. If the field is not present in
- the schema, that field is omitted from the Record. If the field is found in the schema, the data type of the received data
- is compared against the data type specified in the schema. If the types match, the value of that field is used as-is. If the
- schema indicates that the field should be of a different type, then the Controller Service will attempt to coerce the data
- into the type specified by the schema. If the field cannot be coerced into the specified type, an Exception will be thrown.
- </p>
-
- <p>
- The following rules apply when attempting to coerce a field value from one data type to another:
- </p>
-
- <ul>
- <li>Any data type can be coerced into a String type.</li>
- <li>Any numeric data type (Byte, Short, Int, Long, Float, Double) can be coerced into any other numeric data type.</li>
- <li>Any numeric value can be coerced into a Date, Time, or Timestamp type, by assuming that the Long value is the number of
- milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
- <li>A String value can be coerced into a Date, Time, or Timestamp type, if its format matches the configured "Date Format," "Time Format,"
- or "Timestamp Format."</li>
- <li>A String value can be coerced into a numeric value if the value is of the appropriate type. For example, the String value
- <code>8</code> can be coerced into any numeric type. However, the String value <code>8.2</code> can be coerced into a Double or Float
- type but not an Integer.</li>
- <li>A String value of "true" or "false" (regardless of case) can be coerced into a Boolean value.</li>
- <li>A String value that is not empty can be coerced into a Char type. If the String contains more than 1 character, the first character is used
- and the rest of the characters are ignored.</li>
- <li>Any "date/time" type (Date, Time, Timestamp) can be coerced into any other "date/time" type.</li>
- <li>Any "date/time" type can be coerced into a Long type, representing the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
- <li>Any "date/time" type can be coerced into a String. The format of the String is whatever DateFormat is configured for the corresponding
- property (Date Format, Time Format, Timestamp Format property). If no value is specified, then the value will be converted into a String
- representation of the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
- </ul>
-
- <p>
- If none of the above rules apply when attempting to coerce a value from one data type to another, the coercion will fail and an Exception
- will be thrown.
- </p>
+ <h2>Schemas and Type Coercion</h2>
+
+ <p>
+ When a record is parsed from incoming data, it is separated into fields. Each of these fields is then looked up against the
+ configured schema (by field name) in order to determine what the type of the data should be. If the field is not present in
+ the schema, that field is omitted from the Record. If the field is found in the schema, the data type of the received data
+ is compared against the data type specified in the schema. If the types match, the value of that field is used as-is. If the
+ schema indicates that the field should be of a different type, then the Controller Service will attempt to coerce the data
+ into the type specified by the schema. If the field cannot be coerced into the specified type, an Exception will be thrown.
+ </p>
+
+ <p>
+ The following rules apply when attempting to coerce a field value from one data type to another:
+ </p>
+
+ <ul>
+ <li>Any data type can be coerced into a String type.</li>
+ <li>Any numeric data type (Byte, Short, Int, Long, Float, Double) can be coerced into any other numeric data type.</li>
+ <li>Any numeric value can be coerced into a Date, Time, or Timestamp type, by assuming that the Long value is the number of
+ milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
+ <li>A String value can be coerced into a Date, Time, or Timestamp type, if its format matches the configured "Date Format," "Time Format,"
+ or "Timestamp Format."</li>
+ <li>A String value can be coerced into a numeric value if the value is of the appropriate type. For example, the String value
+ <code>8</code> can be coerced into any numeric type. However, the String value <code>8.2</code> can be coerced into a Double or Float
+ type but not an Integer.</li>
+ <li>A String value of "true" or "false" (regardless of case) can be coerced into a Boolean value.</li>
+ <li>A String value that is not empty can be coerced into a Char type. If the String contains more than 1 character, the first character is used
+ and the rest of the characters are ignored.</li>
+ <li>Any "date/time" type (Date, Time, Timestamp) can be coerced into any other "date/time" type.</li>
+ <li>Any "date/time" type can be coerced into a Long type, representing the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
+ <li>Any "date/time" type can be coerced into a String. The format of the String is whatever DateFormat is configured for the corresponding
+ property (Date Format, Time Format, Timestamp Format property). If no value is specified, then the value will be converted into a String
+ representation of the number of milliseconds since epoch (Midnight GMT, January 1, 1970).</li>
+ </ul>
+
+ <p>
+ If none of the above rules apply when attempting to coerce a value from one data type to another, the coercion will fail and an Exception
+ will be thrown.
+ </p>
@@ -168,165 +169,242 @@
</p>
+ <h2>Starting Field Strategies</h2>
- <h2>Examples</h2>
+ <p>
+ When using JsonTreeReader, two different starting field strategies can be selected. With the default Root Node strategy, the JsonTreeReader begins processing from the root element
+ of the JSON and creates a Record object for the entire JSON Object tree, while the Nested Field strategy defines a nested field from which to begin processing.
+ </p>
+ <p>
+ When the Nested Field strategy is used, the configured schema should describe the nested JSON element rather than the entire document. When the schema is inferred, the JsonTreeReader will automatically
+ infer it from the nested records.
+ </p>
+
+ <h3>Root Node Strategy</h3>
<p>
- As an example, consider the following JSON is read:
+ Consider the following JSON, read with the default Root Node strategy:
</p>
<code>
<pre>
-[{
+[
+ {
"id": 17,
"name": "John",
"child": {
- "id": "1"
+ "id": "1"
},
- "dob": "10-29-1982"
+ "dob": "10-29-1982",
"siblings": [
- { "name": "Jeremy", "id": 4 },
- { "name": "Julia", "id": 8}
+ {
+ "name": "Jeremy",
+ "id": 4
+ },
+ {
+ "name": "Julia",
+ "id": 8
+ }
]
},
{
"id": 98,
"name": "Jane",
"child": {
- "id": 2
+ "id": 2
},
- "dob": "08-30-1984"
+ "dob": "08-30-1984",
"gender": "F",
"siblingIds": [],
"siblings": []
- }]
+ }
+]
</pre>
</code>
<p>
- Also, consider that the schema that is configured for this JSON is as follows (assuming that the AvroSchemaRegistry
- Controller Service is chosen to denote the Schema):
+ Also, consider that the schema that is configured for this JSON is as follows (assuming that the AvroSchemaRegistry
+ Controller Service is chosen to denote the Schema):
</p>
<code>
<pre>
{
- "namespace": "nifi",
- "name": "person",
- "type": "record",
- "fields": [
- { "name": "id", "type": "int" },
- { "name": "name", "type": "string" },
- { "name": "gender", "type": "string" },
- { "name": "dob", "type": {
- "type": "int",
- "logicalType": "date"
- }},
- { "name": "siblings", "type": {
- "type": "array",
- "items": {
- "type": "record",
- "fields": [
- { "name": "name", "type": "string" }
- ]
- }
- }}
- ]
+ "namespace": "nifi",
+ "name": "person",
+ "type": "record",
+ "fields": [
+ { "name": "id", "type": "int" },
+ { "name": "name", "type": "string" },
+ { "name": "gender", "type": "string" },
+ { "name": "dob", "type": {
+ "type": "int",
+ "logicalType": "date"
+ }},
+ { "name": "siblings", "type": {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "fields": [
+ { "name": "name", "type": "string" }
+ ]
+ }
+ }}
+ ]
}
</pre>
</code>
<p>
- Let us also assume that this Controller Service is configured with the "Date Format" property set to "MM-dd-yyyy", as this
- matches the date format used for our JSON data. This will result in the JSON creating two separate records, because the root
- element is a JSON array with two elements.
+ Let us also assume that this Controller Service is configured with the "Date Format" property set to "MM-dd-yyyy", as this
+ matches the date format used for our JSON data. This will result in the JSON creating two separate records, because the root
+ element is a JSON array with two elements.
</p>
<p>
- The first Record will consist of the following values:
+ The first Record will consist of the following values:
</p>
<table>
- <tr>
- <th>Field Name</th>
- <th>Field Value</th>
- </tr>
- <tr>
- <td>id</td>
- <td>17</td>
- </tr>
- <tr>
- <td>name</td>
- <td>John</td>
- </tr>
- <tr>
- <td>gender</td>
- <td><i>null</i></td>
- </tr>
- <tr>
- <td>dob</td>
- <td>11-30-1983</td>
- </tr>
- <tr>
- <td>siblings</td>
- <td>
- <i>array with two elements, each of which is itself a Record:</i>
- <br />
- <table>
- <tr>
- <th>Field Name</th>
- <th>Field Value</th>
- </tr>
- <tr>
- <td>name</td>
- <td>Jeremy</td>
- </tr>
- </table>
- <br />
- <i>and:</i>
- <br />
- <table>
- <tr>
- <th>Field Name</th>
- <th>Field Value</th>
- </tr>
- <tr>
- <td>name</td>
- <td>Julia</td>
- </tr>
- </table>
- </td>
- </tr>
+ <tr>
+ <th>Field Name</th>
+ <th>Field Value</th>
+ </tr>
+ <tr>
+ <td>id</td>
+ <td>17</td>
+ </tr>
+ <tr>
+ <td>name</td>
+ <td>John</td>
+ </tr>
+ <tr>
+ <td>gender</td>
+ <td><i>null</i></td>
+ </tr>
+ <tr>
+ <td>dob</td>
+ <td>10-29-1982</td>
+ </tr>
+ <tr>
+ <td>siblings</td>
+ <td>
+ <i>array with two elements, each of which is itself a Record:</i>
+ <br />
+ <table>
+ <tr>
+ <th>Field Name</th>
+ <th>Field Value</th>
+ </tr>
+ <tr>
+ <td>name</td>
+ <td>Jeremy</td>
+ </tr>
+ </table>
+ <br />
+ <i>and:</i>
+ <br />
+ <table>
+ <tr>
+ <th>Field Name</th>
+ <th>Field Value</th>
+ </tr>
+ <tr>
+ <td>name</td>
+ <td>Julia</td>
+ </tr>
+ </table>
+ </td>
+ </tr>
</table>
<p>
- The second Record will consist of the following values:
+ The second Record will consist of the following values:
</p>
- <table>
- <tr>
- <th>Field Name</th>
- <th>Field Value</th>
- </tr>
- <tr>
- <td>id</td>
- <td>98</td>
- </tr>
- <tr>
- <td>name</td>
- <td>Jane</td>
- </tr>
- <tr>
- <td>gender</td>
- <td>F</td>
- </tr>
- <tr>
- <td>dob</td>
- <td>08-30-1984</td>
- </tr>
- <tr>
- <td>siblings</td>
- <td><i>empty array</i></td>
- </tr>
+ <table>
+ <tr>
+ <th>Field Name</th>
+ <th>Field Value</th>
+ </tr>
+ <tr>
+ <td>id</td>
+ <td>98</td>
+ </tr>
+ <tr>
+ <td>name</td>
+ <td>Jane</td>
+ </tr>
+ <tr>
+ <td>gender</td>
+ <td>F</td>
+ </tr>
+ <tr>
+ <td>dob</td>
+ <td>08-30-1984</td>
+ </tr>
+ <tr>
+ <td>siblings</td>
+ <td><i>empty array</i></td>
+ </tr>
+ </table>
+
+ <h3>Nested Field Strategy</h3>
+
+ <p>
+ Using the Nested Field strategy, consider the same JSON where the specified Starting Field Name is
+ "siblings". The schema that is configured for this JSON is as follows:
+ </p>
+
+<code>
+<pre>
+{
+ "namespace": "nifi",
+ "name": "siblings",
+ "type": "record",
+ "fields": [
+ { "name": "name", "type": "string" },
+ { "name": "id", "type": "int" }
+ ]
+}
+</pre>
+</code>
+
+ <p>
+ The first Record will consist of the following values:
+ </p>
+
+ <table>
+ <tr>
+ <th>Field Name</th>
+ <th>Field Value</th>
+ </tr>
+ <tr>
+ <td>name</td>
+ <td>Jeremy</td>
+ </tr>
+ <tr>
+ <td>id</td>
+ <td>4</td>
+ </tr>
+ </table>
+
+ <p>
+ The second Record will consist of the following values:
+ </p>
+
+ <table>
+ <tr>
+ <th>Field Name</th>
+ <th>Field Value</th>
+ </tr>
+ <tr>
+ <td>name</td>
+ <td>Julia</td>
+ </tr>
+ <tr>
+ <td>id</td>
+ <td>8</td>
+ </tr>
</table>
</body>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java
index 960c7b6127..7ef2089fe2 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestInferJsonSchemaAccessStrategy.java
@@ -29,6 +29,9 @@ import org.apache.nifi.serialization.record.type.ChoiceDataType;
import org.apache.nifi.serialization.record.type.RecordDataType;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.Mockito;
import java.io.BufferedInputStream;
@@ -41,32 +44,31 @@ import java.nio.file.Files;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Stream;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class TestInferJsonSchemaAccessStrategy {
- private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
- private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
- private final String timestampFormat = "yyyy-MM-DD'T'HH:mm:ss.SSS'Z'";
+class TestInferJsonSchemaAccessStrategy {
private final SchemaInferenceEngine<JsonNode> timestampInference = new JsonSchemaInference(new TimeValueInference("yyyy-MM-dd", "HH:mm:ss", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"));
private final SchemaInferenceEngine<JsonNode> noTimestampInference = new JsonSchemaInference(new TimeValueInference("yyyy-MM-dd", "HH:mm:ss", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"));
@Test
- @Disabled
- public void testPerformanceOfSchemaInferenceWithTimestamp() throws IOException {
+ @Disabled("Intended only for manual testing to determine performance before/after modifications")
+ void testPerformanceOfSchemaInferenceWithTimestamp() throws IOException {
final File file = new File("src/test/resources/json/prov-events.json");
final byte[] data = Files.readAllBytes(file.toPath());
- final ComponentLog logger = Mockito.mock(ComponentLog.class);
final byte[] manyCopies = new byte[data.length * 20];
- for (int i=0; i < 20; i++) {
+ for (int i = 0; i < 20; i++) {
System.arraycopy(data, 0, manyCopies, data.length * i, data.length);
}
- final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>((var,content) -> new JsonRecordSource(content), timestampInference, Mockito.mock(ComponentLog.class));
+ final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>(
+ (var, content) -> new JsonRecordSource(content), timestampInference, Mockito.mock(ComponentLog.class)
+ );
for (int j = 0; j < 10; j++) {
final long start = System.nanoTime();
@@ -83,14 +85,13 @@ public class TestInferJsonSchemaAccessStrategy {
}
@Test
- @Disabled
- public void testPerformanceOfSchemaInferenceWithoutTimestamp() throws IOException {
+ @Disabled("Intended only for manual testing to determine performance before/after modifications")
+ void testPerformanceOfSchemaInferenceWithoutTimestamp() throws IOException {
final File file = new File("src/test/resources/json/prov-events.json");
final byte[] data = Files.readAllBytes(file.toPath());
- final ComponentLog logger = Mockito.mock(ComponentLog.class);
final byte[] manyCopies = new byte[data.length * 20];
- for (int i=0; i < 20; i++) {
+ for (int i = 0; i < 20; i++) {
System.arraycopy(data, 0, manyCopies, data.length * i, data.length);
}
@@ -99,8 +100,8 @@ public class TestInferJsonSchemaAccessStrategy {
for (int i = 0; i < 10_000; i++) {
try (final InputStream in = new ByteArrayInputStream(manyCopies)) {
- final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>((var,content) -> new JsonRecordSource(content),
- noTimestampInference, Mockito.mock(ComponentLog.class));
+ final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>((var, content) -> new JsonRecordSource(content),
+ noTimestampInference, Mockito.mock(ComponentLog.class));
final RecordSchema schema = accessStrategy.getSchema(null, in, null);
}
@@ -112,9 +113,9 @@ public class TestInferJsonSchemaAccessStrategy {
}
@Test
- public void testInferenceIncludesAllRecords() throws IOException {
+ void testInferenceIncludesAllRecords() throws IOException {
final File file = new File("src/test/resources/json/prov-events.json");
- final RecordSchema schema = inferSchema(file);
+ final RecordSchema schema = inferSchema(file, StartingFieldStrategy.ROOT_NODE, null);
final RecordField extraField1 = schema.getField("extra field 1").get();
assertSame(RecordFieldType.STRING, extraField1.getDataType().getFieldType());
@@ -127,7 +128,7 @@ public class TestInferJsonSchemaAccessStrategy {
assertSame(RecordFieldType.RECORD, updatedAttributesDataType.getFieldType());
final List<String> expectedAttributeNames = Arrays.asList("path", "filename", "drop reason", "uuid", "reporting.task.type", "s2s.address", "schema.cache.identifier", "reporting.task.uuid",
- "record.count", "s2s.host", "reporting.task.transaction.id", "reporting.task.name", "mime.type");
+ "record.count", "s2s.host", "reporting.task.transaction.id", "reporting.task.name", "mime.type");
final RecordSchema updatedAttributesSchema = ((RecordDataType) updatedAttributesDataType).getChildSchema();
assertEquals(expectedAttributeNames.size(), updatedAttributesSchema.getFieldCount());
@@ -138,9 +139,9 @@ public class TestInferJsonSchemaAccessStrategy {
}
@Test
- public void testDateAndTimestampsInferred() throws IOException {
+ void testDateAndTimestampsInferred() throws IOException {
final File file = new File("src/test/resources/json/prov-events.json");
- final RecordSchema schema = inferSchema(file);
+ final RecordSchema schema = inferSchema(file, StartingFieldStrategy.ROOT_NODE, null);
final RecordField timestampField = schema.getField("timestamp").get();
assertEquals(RecordFieldType.TIMESTAMP.getDataType("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"), timestampField.getDataType());
@@ -167,9 +168,9 @@ public class TestInferJsonSchemaAccessStrategy {
* Test is intended to ensure that all inference rules that are explained in the readers' additionalDetails.html are correct
*/
@Test
- public void testDocsExample() throws IOException {
+ void testDocsExample() throws IOException {
final File file = new File("src/test/resources/json/docs-example.json");
- final RecordSchema schema = inferSchema(file);
+ final RecordSchema schema = inferSchema(file, StartingFieldStrategy.ROOT_NODE, null);
assertSame(RecordFieldType.STRING, schema.getDataType("name").get().getFieldType());
assertSame(RecordFieldType.CHOICE, schema.getDataType("age").get().getFieldType());
@@ -189,14 +190,70 @@ public class TestInferJsonSchemaAccessStrategy {
assertSame(RecordFieldType.STRING, schema.getDataType("nullValue").get().getFieldType());
}
- private RecordSchema inferSchema(final File file) throws IOException {
+ @ParameterizedTest(name = "{index} {3}") // {3} is testName, the descriptive case label from the argument provider
+ @MethodSource("startingFieldNameArgumentProvider")
+ void testInferenceStartsFromArray(final String jsonPath, final StartingFieldStrategy strategy, final String startingFieldName, String testName) throws IOException {
+ final File file = new File(jsonPath);
+ final RecordSchema schema = inferSchema(file, strategy, startingFieldName);
+
+ assertEquals(2, schema.getFieldCount()); // only the fields of the nested element are expected in the inferred schema
+
+ final RecordField field1 = schema.getField("id").get();
+ assertSame(RecordFieldType.INT, field1.getDataType().getFieldType());
+
+ final RecordField field2 = schema.getField("balance").get();
+ assertSame(RecordFieldType.DOUBLE, field2.getDataType().getFieldType());
+ }
+
+ @Test
+ void testInferenceStartsFromSimpleFieldAndNoNestedObjectOrArrayFound() throws IOException {
+ final File file = new File("src/test/resources/json/single-element-nested-array-middle.json");
+ final RecordSchema schema = inferSchema(file, StartingFieldStrategy.NESTED_FIELD, "name");
+
+ assertEquals(0, schema.getFieldCount());
+ }
+
+ @Test
+ void testInferenceStartFromNonExistentField() throws IOException {
+ final File file = new File("src/test/resources/json/single-element-nested-array.json");
+ final RecordSchema recordSchema = inferSchema(file, StartingFieldStrategy.NESTED_FIELD, "notfound");
+ assertEquals(0, recordSchema.getFieldCount());
+ }
+
+ @Test
+ void testInferenceStartFromMultipleNestedField() throws IOException {
+ final File file = new File("src/test/resources/json/multiple-nested-field.json");
+ final RecordSchema schema = inferSchema(file, StartingFieldStrategy.NESTED_FIELD, "accountIds");
+
+ final RecordField field1 = schema.getField("id").get();
+ assertSame(RecordFieldType.STRING, field1.getDataType().getFieldType());
+
+ final RecordField field2 = schema.getField("type").get();
+ assertSame(RecordFieldType.STRING, field2.getDataType().getFieldType());
+ }
+
+ private RecordSchema inferSchema(final File file, final StartingFieldStrategy strategy, final String startingFieldName) throws IOException {
try (final InputStream in = new FileInputStream(file);
final InputStream bufferedIn = new BufferedInputStream(in)) {
- final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>((var,content) -> new JsonRecordSource(content),
- timestampInference, Mockito.mock(ComponentLog.class));
+ final InferSchemaAccessStrategy<?> accessStrategy = new InferSchemaAccessStrategy<>(
+ (var, content) -> new JsonRecordSource(content, strategy, startingFieldName),
+ timestampInference, Mockito.mock(ComponentLog.class)
+ );
return accessStrategy.getSchema(null, bufferedIn, null);
}
}
+
+ private static Stream<Arguments> startingFieldNameArgumentProvider() {
+ final StartingFieldStrategy strategy = StartingFieldStrategy.NESTED_FIELD;
+ return Stream.of(
+ Arguments.of("src/test/resources/json/single-element-nested-array.json", strategy, "accounts", "testInferenceSkipsToNestedArray"),
+ Arguments.of("src/test/resources/json/single-element-nested.json", strategy, "account", "testInferenceSkipsToNestedObject"),
+ Arguments.of("src/test/resources/json/single-element-nested-array.json", strategy, "name", "testInferenceSkipsToSimpleFieldFindsNextNestedArray"),
+ Arguments.of("src/test/resources/json/single-element-nested.json", strategy, "name", "testInferenceSkipsToSimpleFieldFindsNextNestedObject"),
+ Arguments.of("src/test/resources/json/single-element-nested-array-middle.json", strategy, "accounts", "testInferenceSkipsToNestedArrayInMiddle"),
+ Arguments.of("src/test/resources/json/nested-array-then-start-object.json", strategy, "accounts", "testInferenceSkipsToNestedThenStartObject")
+ );
+ }
}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
index 904e7cd8d0..ed29e5cc5b 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonPathRowRecordReader.java
@@ -47,7 +47,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class TestJsonPathRowRecordReader {
+class TestJsonPathRowRecordReader {
private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
@@ -87,25 +87,24 @@ public class TestJsonPathRowRecordReader {
accountFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
accountFields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
- final RecordSchema accountSchema = new SimpleRecordSchema(accountFields);
- return accountSchema;
+ return new SimpleRecordSchema(accountFields);
}
@Test
- public void testReadArray() throws IOException, MalformedRecordException {
+ void testReadArray() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+ final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
assertEquals(expectedFieldNames, fieldNames);
- final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
- RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+ final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
assertEquals(expectedTypes, dataTypes);
final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -119,19 +118,19 @@ public class TestJsonPathRowRecordReader {
}
@Test
- public void testReadOneLine() throws IOException, MalformedRecordException {
+ void testReadOneLine() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-oneline.json"));
final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+ final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
assertEquals(expectedFieldNames, fieldNames);
- final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
- RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+ final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
assertEquals(expectedTypes, dataTypes);
final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -145,19 +144,19 @@ public class TestJsonPathRowRecordReader {
}
@Test
- public void testSingleJsonElement() throws IOException, MalformedRecordException {
+ void testSingleJsonElement() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json"));
final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+ final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
assertEquals(expectedFieldNames, fieldNames);
- final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
- RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+ final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
assertEquals(expectedTypes, dataTypes);
final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -168,7 +167,7 @@ public class TestJsonPathRowRecordReader {
}
@Test
- public void testTimestampCoercedFromString() throws IOException, MalformedRecordException {
+ void testTimestampCoercedFromString() throws IOException, MalformedRecordException {
final List<RecordField> recordFields = Collections.singletonList(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(recordFields);
@@ -195,7 +194,7 @@ public class TestJsonPathRowRecordReader {
@Test
- public void testElementWithNestedData() throws IOException, MalformedRecordException {
+ void testElementWithNestedData() throws IOException, MalformedRecordException {
final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths);
jsonPaths.put("account", JsonPath.compile("$.account"));
@@ -208,12 +207,12 @@ public class TestJsonPathRowRecordReader {
final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "account"});
+ final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country", "account");
assertEquals(expectedFieldNames, fieldNames);
- final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE,
- RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.RECORD});
+ final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE,
+ RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.RECORD);
assertEquals(expectedTypes, dataTypes);
final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -231,7 +230,7 @@ public class TestJsonPathRowRecordReader {
}
@Test
- public void testElementWithNestedArray() throws IOException, MalformedRecordException {
+ void testElementWithNestedArray() throws IOException, MalformedRecordException {
final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths);
jsonPaths.put("accounts", JsonPath.compile("$.accounts"));
@@ -246,13 +245,12 @@ public class TestJsonPathRowRecordReader {
final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList(new String[] {
- "id", "name", "balance", "address", "city", "state", "zipCode", "country", "accounts"});
+ final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country", "accounts");
assertEquals(expectedFieldNames, fieldNames);
final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE,
- RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.ARRAY});
+ final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE,
+ RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.ARRAY);
assertEquals(expectedTypes, dataTypes);
final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -282,19 +280,19 @@ public class TestJsonPathRowRecordReader {
}
@Test
- public void testReadArrayDifferentSchemas() throws IOException, MalformedRecordException {
+ void testReadArrayDifferentSchemas() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(allJsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+ final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
assertEquals(expectedFieldNames, fieldNames);
final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
- RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+ final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
assertEquals(expectedTypes, dataTypes);
final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -311,7 +309,7 @@ public class TestJsonPathRowRecordReader {
}
@Test
- public void testReadArrayDifferentSchemasWithOverride() throws IOException, MalformedRecordException {
+ void testReadArrayDifferentSchemasWithOverride() throws IOException, MalformedRecordException {
final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths);
jsonPaths.put("address2", JsonPath.compile("$.address2"));
@@ -324,12 +322,12 @@ public class TestJsonPathRowRecordReader {
final JsonPathRowRecordReader reader = new JsonPathRowRecordReader(jsonPaths, schema, in, Mockito.mock(ComponentLog.class), dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "address2"});
+ final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country", "address2");
assertEquals(expectedFieldNames, fieldNames);
final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE, RecordFieldType.STRING,
- RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+ final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE, RecordFieldType.STRING,
+ RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
assertEquals(expectedTypes, dataTypes);
final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -346,7 +344,7 @@ public class TestJsonPathRowRecordReader {
}
@Test
- public void testPrimitiveTypeArrays() throws IOException, MalformedRecordException {
+ void testPrimitiveTypeArrays() throws IOException, MalformedRecordException {
final LinkedHashMap<String, JsonPath> jsonPaths = new LinkedHashMap<>(allJsonPaths);
jsonPaths.put("accountIds", JsonPath.compile("$.accountIds"));
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
index 683e884237..1eb655c117 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonSchemaInference.java
@@ -35,12 +35,12 @@ import java.util.List;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
-public class TestJsonSchemaInference {
+class TestJsonSchemaInference {
private final TimeValueInference timestampInference = new TimeValueInference("yyyy-MM-dd", "HH:mm:ss", "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'");
@Test
- public void testInferenceIncludesAllRecords() throws IOException {
+ void testInferenceIncludesAllRecords() throws IOException {
final File file = new File("src/test/resources/json/data-types.json");
final RecordSchema schema;
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
index f65c0752fe..752d1a6676 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestJsonTreeRowRecordReader.java
@@ -50,7 +50,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
@@ -59,11 +58,12 @@ import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
-public class TestJsonTreeRowRecordReader {
+class TestJsonTreeRowRecordReader {
private final String dateFormat = RecordFieldType.DATE.getDefaultFormat();
private final String timeFormat = RecordFieldType.TIME.getDefaultFormat();
private final String timestampFormat = RecordFieldType.TIMESTAMP.getDefaultFormat();
@@ -90,13 +90,12 @@ public class TestJsonTreeRowRecordReader {
accountFields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
accountFields.add(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
- final RecordSchema accountSchema = new SimpleRecordSchema(accountFields);
- return accountSchema;
+ return new SimpleRecordSchema(accountFields);
}
@Test
- public void testReadChoiceOfStringOrArrayOfRecords() throws IOException, MalformedRecordException {
+ void testReadChoiceOfStringOrArrayOfRecords() throws IOException, MalformedRecordException {
final File schemaFile = new File("src/test/resources/json/choice-of-string-or-array-record.avsc");
final File jsonFile = new File("src/test/resources/json/choice-of-string-or-array-record.json");
@@ -129,7 +128,7 @@ public class TestJsonTreeRowRecordReader {
@Test
@Disabled("Intended only for manual testing to determine performance before/after modifications")
- public void testPerformanceOnLocalFile() throws IOException, MalformedRecordException {
+ void testPerformanceOnLocalFile() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList());
final File file = new File("/devel/nifi/nifi-assembly/target/nifi-1.2.0-SNAPSHOT-bin/nifi-1.2.0-SNAPSHOT/prov/16812193969219289");
@@ -158,7 +157,7 @@ public class TestJsonTreeRowRecordReader {
@Test
@Disabled("Intended only for manual testing to determine performance before/after modifications")
- public void testPerformanceOnIndividualMessages() throws IOException, MalformedRecordException {
+ void testPerformanceOnIndividualMessages() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(Collections.emptyList());
final File file = new File("/devel/nifi/nifi-assembly/target/nifi-1.2.0-SNAPSHOT-bin/nifi-1.2.0-SNAPSHOT/1.prov.json");
@@ -186,12 +185,12 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testChoiceOfRecordTypes() throws IOException, MalformedRecordException {
+ void testChoiceOfRecordTypes() throws IOException, MalformedRecordException {
final Schema avroSchema = new Schema.Parser().parse(new File("src/test/resources/json/record-choice.avsc"));
final RecordSchema recordSchema = AvroTypeUtil.createSchema(avroSchema);
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/elements-for-record-choice.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), recordSchema, dateFormat, timeFormat, timestampFormat)) {
+ try (final InputStream in = new FileInputStream("src/test/resources/json/elements-for-record-choice.json");
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), recordSchema, dateFormat, timeFormat, timestampFormat)) {
// evaluate first record
final Record firstRecord = reader.nextRecord();
@@ -201,7 +200,7 @@ public class TestJsonTreeRowRecordReader {
assertEquals("1234", firstRecord.getValue("id"));
// record should have a schema that indicates that the 'child' is a CHOICE of 2 different record types
- assertTrue(firstOuterSchema.getDataType("child").get().getFieldType() == RecordFieldType.CHOICE);
+ assertSame(RecordFieldType.CHOICE, firstOuterSchema.getDataType("child").get().getFieldType());
final List<DataType> firstSubTypes = ((ChoiceDataType) firstOuterSchema.getDataType("child").get()).getPossibleSubTypes();
assertEquals(2, firstSubTypes.size());
assertEquals(2L, firstSubTypes.stream().filter(type -> type.getFieldType() == RecordFieldType.RECORD).count());
@@ -212,7 +211,7 @@ public class TestJsonTreeRowRecordReader {
final Record firstChildRecord = (Record) childObject;
final RecordSchema firstChildSchema = firstChildRecord.getSchema();
- assertEquals(Arrays.asList("id"), firstChildSchema.getFieldNames());
+ assertEquals(Collections.singletonList("id"), firstChildSchema.getFieldNames());
// evaluate second record
final Record secondRecord = reader.nextRecord();
@@ -223,7 +222,7 @@ public class TestJsonTreeRowRecordReader {
assertEquals("1234", secondRecord.getValue("id"));
// record should have a schema that indicates that the 'child' is a CHOICE of 2 different record types
- assertTrue(secondOuterSchema.getDataType("child").get().getFieldType() == RecordFieldType.CHOICE);
+ assertSame(RecordFieldType.CHOICE, secondOuterSchema.getDataType("child").get().getFieldType());
final List<DataType> secondSubTypes = ((ChoiceDataType) secondOuterSchema.getDataType("child").get()).getPossibleSubTypes();
assertEquals(2, secondSubTypes.size());
assertEquals(2L, secondSubTypes.stream().filter(type -> type.getFieldType() == RecordFieldType.RECORD).count());
@@ -234,7 +233,7 @@ public class TestJsonTreeRowRecordReader {
final Record secondChildRecord = (Record) secondChildObject;
final RecordSchema secondChildSchema = secondChildRecord.getSchema();
- assertEquals(Arrays.asList("name"), secondChildSchema.getFieldNames());
+ assertEquals(Collections.singletonList("name"), secondChildSchema.getFieldNames());
assertNull(reader.nextRecord());
}
@@ -242,19 +241,19 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testReadArray() throws IOException, MalformedRecordException {
+ void testReadArray() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+ try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json");
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+ final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
assertEquals(expectedFieldNames, fieldNames);
- final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
- RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+ final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
assertEquals(expectedTypes, dataTypes);
final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -268,19 +267,19 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testReadOneLinePerJSON() throws IOException, MalformedRecordException {
+ void testReadOneLinePerJSON() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-oneline.json"));
+ try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-oneline.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+ final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
assertEquals(expectedFieldNames, fieldNames);
- final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
- RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+ final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
assertEquals(expectedTypes, dataTypes);
final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -294,20 +293,20 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testReadMultilineJSON() throws IOException, MalformedRecordException {
+ void testReadMultilineJSON() throws IOException, MalformedRecordException {
final List<RecordField> fields = getFields(RecordFieldType.DECIMAL.getDecimalDataType(30, 10));
final RecordSchema schema = new SimpleRecordSchema(fields);
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-multiline.json"));
+ try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-multiline.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+ final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
assertEquals(expectedFieldNames, fieldNames);
- final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
- RecordFieldType.DECIMAL, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+ final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DECIMAL, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
assertEquals(expectedTypes, dataTypes);
final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -321,19 +320,19 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testReadMultilineArrays() throws IOException, MalformedRecordException {
+ void testReadMultilineArrays() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-multiarray.json"));
+ try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-multiarray.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+ final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
assertEquals(expectedFieldNames, fieldNames);
- final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
- RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+ final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
assertEquals(expectedTypes, dataTypes);
final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -353,19 +352,19 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testReadMixedJSON() throws IOException, MalformedRecordException {
+ void testReadMixedJSON() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-mixed.json"));
+ try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-mixed.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+ final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
assertEquals(expectedFieldNames, fieldNames);
- final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
- RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+ final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
assertEquals(expectedTypes, dataTypes);
final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -386,14 +385,14 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testReadRawRecordIncludesFieldsNotInSchema() throws IOException, MalformedRecordException {
+ void testReadRawRecordIncludesFieldsNotInSchema() throws IOException, MalformedRecordException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.INT.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+ try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json");
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final Record schemaValidatedRecord = reader.nextRecord(true, true);
assertEquals(1, schemaValidatedRecord.getValue("id"));
@@ -401,8 +400,8 @@ public class TestJsonTreeRowRecordReader {
assertNull(schemaValidatedRecord.getValue("balance"));
}
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+ try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json");
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final Record rawRecord = reader.nextRecord(false, false);
assertEquals(1, rawRecord.getValue("id"));
@@ -418,14 +417,14 @@ public class TestJsonTreeRowRecordReader {
@Test
- public void testReadRawRecordTypeCoercion() throws IOException, MalformedRecordException {
+ void testReadRawRecordTypeCoercion() throws IOException, MalformedRecordException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+ try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json");
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final Record schemaValidatedRecord = reader.nextRecord(true, true);
assertEquals("1", schemaValidatedRecord.getValue("id")); // will be coerced into a STRING as per the schema
@@ -435,8 +434,8 @@ public class TestJsonTreeRowRecordReader {
assertEquals(2, schemaValidatedRecord.getRawFieldNames().size());
}
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+ try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array.json");
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final Record rawRecord = reader.nextRecord(false, false);
assertEquals(1, rawRecord.getValue("id")); // will return raw value of (int) 1
@@ -453,7 +452,7 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testDateCoercedFromString() throws IOException, MalformedRecordException {
+ void testDateCoercedFromString() throws IOException, MalformedRecordException {
final String dateField = "date";
final List<RecordField> recordFields = Collections.singletonList(new RecordField(dateField, RecordFieldType.DATE.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(recordFields);
@@ -474,12 +473,12 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testTimestampCoercedFromString() throws IOException, MalformedRecordException {
+ void testTimestampCoercedFromString() throws IOException, MalformedRecordException {
final List<RecordField> recordFields = Collections.singletonList(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(recordFields);
for (final boolean coerceTypes : new boolean[] {true, false}) {
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/timestamp.json"));
+ try (final InputStream in = new FileInputStream("src/test/resources/json/timestamp.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, "yyyy/MM/dd HH:mm:ss")) {
final Record record = reader.nextRecord(coerceTypes, false);
@@ -490,19 +489,19 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testSingleJsonElement() throws IOException, MalformedRecordException {
+ void testSingleJsonElement() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+ try (final InputStream in = new FileInputStream("src/test/resources/json/single-bank-account.json");
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+ final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
assertEquals(expectedFieldNames, fieldNames);
- final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
- RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+ final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
assertEquals(expectedTypes, dataTypes);
final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -513,21 +512,21 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testSingleJsonElementWithChoiceFields() throws IOException, MalformedRecordException {
+ void testSingleJsonElementWithChoiceFields() throws IOException, MalformedRecordException {
// Wraps default fields by Choice data type to test mapping to a Choice type.
final List<RecordField> choiceFields = getDefaultFields().stream()
.map(f -> new RecordField(f.getFieldName(), RecordFieldType.CHOICE.getChoiceDataType(f.getDataType()))).collect(Collectors.toList());
final RecordSchema schema = new SimpleRecordSchema(choiceFields);
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-bank-account.json"));
+ try (final InputStream in = new FileInputStream("src/test/resources/json/single-bank-account.json");
final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+ final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
assertEquals(expectedFieldNames, fieldNames);
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
- RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+ final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
final List<RecordField> fields = schema.getFields();
for (int i = 0; i < schema.getFields().size(); i++) {
assertTrue(fields.get(i).getDataType() instanceof ChoiceDataType);
@@ -543,19 +542,19 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testElementWithNestedData() throws IOException, MalformedRecordException {
+ void testElementWithNestedData() throws IOException, MalformedRecordException {
final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
final List<RecordField> fields = getDefaultFields();
fields.add(new RecordField("account", accountType));
fields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+ try (final InputStream in = new FileInputStream("src/test/resources/json/single-element-nested.json");
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
- final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
- RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.RECORD});
+ final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.RECORD);
assertEquals(expectedTypes, dataTypes);
final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -573,7 +572,7 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testElementWithNestedArray() throws IOException, MalformedRecordException {
+ void testElementWithNestedArray() throws IOException, MalformedRecordException {
final DataType accountRecordType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
final DataType accountsType = RecordFieldType.ARRAY.getArrayDataType(accountRecordType);
@@ -582,17 +581,16 @@ public class TestJsonTreeRowRecordReader {
fields.remove(new RecordField("balance", RecordFieldType.DOUBLE.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/single-element-nested-array.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+ try (final InputStream in = new FileInputStream("src/test/resources/json/single-element-nested-array.json");
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList(new String[] {
- "id", "name", "address", "city", "state", "zipCode", "country", "accounts"});
+ final List<String> expectedFieldNames = Arrays.asList("id", "name", "address", "city", "state", "zipCode", "country", "accounts");
assertEquals(expectedFieldNames, fieldNames);
- final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
- RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.ARRAY});
+ final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.ARRAY);
assertEquals(expectedTypes, dataTypes);
final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -607,19 +605,19 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testReadArrayDifferentSchemas() throws IOException, MalformedRecordException {
+ void testReadArrayDifferentSchemas() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+ try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array-different-schemas.json");
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+ final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
assertEquals(expectedFieldNames, fieldNames);
- final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING,
- RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+ final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING,
+ RecordFieldType.DOUBLE, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
assertEquals(expectedTypes, dataTypes);
final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -636,53 +634,19 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testReadArrayDifferentSchemasWithOverride() throws IOException, MalformedRecordException {
- final Map<String, DataType> overrides = new HashMap<>();
- overrides.put("address2", RecordFieldType.STRING.getDataType());
-
- final List<RecordField> fields = getDefaultFields();
- fields.add(new RecordField("address2", RecordFieldType.STRING.getDataType()));
- final RecordSchema schema = new SimpleRecordSchema(fields);
-
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-different-schemas.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
-
- final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country", "address2"});
- assertEquals(expectedFieldNames, fieldNames);
-
- final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE, RecordFieldType.STRING,
- RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
- assertEquals(expectedTypes, dataTypes);
-
- final Object[] firstRecordValues = reader.nextRecord().getValues();
- assertArrayEquals(new Object[] {1, "John Doe", 4750.89, "123 My Street", "My City", "MS", "11111", "USA", null}, firstRecordValues);
-
- final Object[] secondRecordValues = reader.nextRecord().getValues();
- assertArrayEquals(new Object[] {2, "Jane Doe", 4820.09, "321 Your Street", "Your City", "NY", "33333", null, null}, secondRecordValues);
-
- final Object[] thirdRecordValues = reader.nextRecord().getValues();
- assertArrayEquals(new Object[] {3, "Jake Doe", 4751.89, "124 My Street", "My City", "MS", "11111", "USA", "Apt. #12"}, thirdRecordValues);
-
- assertNull(reader.nextRecord());
- }
- }
-
- @Test
- public void testReadArrayDifferentSchemasWithOptionalElementOverridden() throws IOException, MalformedRecordException {
+ void testReadArrayDifferentSchemasWithOptionalElementOverridden() throws IOException, MalformedRecordException {
final RecordSchema schema = new SimpleRecordSchema(getDefaultFields());
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/bank-account-array-optional-balance.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+ try (final InputStream in = new FileInputStream("src/test/resources/json/bank-account-array-optional-balance.json");
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final List<String> fieldNames = schema.getFieldNames();
- final List<String> expectedFieldNames = Arrays.asList(new String[] {"id", "name", "balance", "address", "city", "state", "zipCode", "country"});
+ final List<String> expectedFieldNames = Arrays.asList("id", "name", "balance", "address", "city", "state", "zipCode", "country");
assertEquals(expectedFieldNames, fieldNames);
- final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(dt -> dt.getFieldType()).collect(Collectors.toList());
- final List<RecordFieldType> expectedTypes = Arrays.asList(new RecordFieldType[] {RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE, RecordFieldType.STRING,
- RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING});
+ final List<RecordFieldType> dataTypes = schema.getDataTypes().stream().map(DataType::getFieldType).collect(Collectors.toList());
+ final List<RecordFieldType> expectedTypes = Arrays.asList(RecordFieldType.INT, RecordFieldType.STRING, RecordFieldType.DOUBLE, RecordFieldType.STRING,
+ RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING, RecordFieldType.STRING);
assertEquals(expectedTypes, dataTypes);
final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -700,7 +664,7 @@ public class TestJsonTreeRowRecordReader {
@Test
- public void testReadUnicodeCharacters() throws IOException, MalformedRecordException {
+ void testReadUnicodeCharacters() throws IOException, MalformedRecordException {
final List<RecordField> fromFields = new ArrayList<>();
fromFields.add(new RecordField("id", RecordFieldType.LONG.getDataType()));
@@ -715,8 +679,8 @@ public class TestJsonTreeRowRecordReader {
fields.add(new RecordField("from", fromType));
final RecordSchema schema = new SimpleRecordSchema(fields);
- try (final InputStream in = new FileInputStream(new File("src/test/resources/json/json-with-unicode.json"));
- final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
+ try (final InputStream in = new FileInputStream("src/test/resources/json/json-with-unicode.json");
+ final JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(in, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
final Object[] firstRecordValues = reader.nextRecord().getValues();
@@ -732,7 +696,7 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testIncorrectSchema() {
+ void testIncorrectSchema() {
final DataType accountType = RecordFieldType.RECORD.getRecordDataType(getAccountSchema());
final List<RecordField> fields = getDefaultFields();
fields.add(new RecordField("account", accountType));
@@ -756,7 +720,7 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testMergeOfSimilarRecords() throws Exception {
+ void testMergeOfSimilarRecords() throws Exception {
// GIVEN
String jsonPath = "src/test/resources/json/similar-records.json";
@@ -789,7 +753,7 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testChoiceOfEmbeddedSimilarRecords() throws Exception {
+ void testChoiceOfEmbeddedSimilarRecords() throws Exception {
// GIVEN
String jsonPath = "src/test/resources/json/choice-of-embedded-similar-records.json";
@@ -801,11 +765,11 @@ public class TestJsonTreeRowRecordReader {
new RecordField("integer", RecordFieldType.INT.getDataType()),
new RecordField("string", RecordFieldType.STRING.getDataType())
));
- RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
- new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
- RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
- RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)
- ))
+ RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Collections.singletonList(
+ new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
+ RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
+ RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)
+ ))
));
List<Object> expected = Arrays.asList(
@@ -829,12 +793,12 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testChoiceOfEmbeddedArraysAndSingleRecords() throws Exception {
+ void testChoiceOfEmbeddedArraysAndSingleRecords() throws Exception {
// GIVEN
String jsonPath = "src/test/resources/json/choice-of-embedded-arrays-and-single-records.json";
- SimpleRecordSchema expectedRecordSchema1 = new SimpleRecordSchema(Arrays.asList(
- new RecordField("integer", RecordFieldType.INT.getDataType())
+ SimpleRecordSchema expectedRecordSchema1 = new SimpleRecordSchema(Collections.singletonList(
+ new RecordField("integer", RecordFieldType.INT.getDataType())
));
SimpleRecordSchema expectedRecordSchema2 = new SimpleRecordSchema(Arrays.asList(
new RecordField("integer", RecordFieldType.INT.getDataType()),
@@ -848,13 +812,13 @@ public class TestJsonTreeRowRecordReader {
new RecordField("integer", RecordFieldType.INT.getDataType()),
new RecordField("string", RecordFieldType.STRING.getDataType())
));
- RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
- new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
- RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
- RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3),
- RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)),
- RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4))
- ))
+ RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Collections.singletonList(
+ new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
+ RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
+ RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3),
+ RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)),
+ RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4))
+ ))
));
List<Object> expected = Arrays.asList(
@@ -901,7 +865,7 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testChoiceOfMergedEmbeddedArraysAndSingleRecords() throws Exception {
+ void testChoiceOfMergedEmbeddedArraysAndSingleRecords() throws Exception {
// GIVEN
String jsonPath = "src/test/resources/json/choice-of-merged-embedded-arrays-and-single-records.json";
@@ -922,13 +886,13 @@ public class TestJsonTreeRowRecordReader {
new RecordField("string", RecordFieldType.STRING.getDataType()),
new RecordField("boolean", RecordFieldType.BOOLEAN.getDataType())
));
- RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
- new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
- RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
- RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3),
- RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)),
- RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4))
- ))
+ RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Collections.singletonList(
+ new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
+ RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema1),
+ RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema3),
+ RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema2)),
+ RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedRecordSchema4))
+ ))
));
List<Object> expected = Arrays.asList(
@@ -980,7 +944,7 @@ public class TestJsonTreeRowRecordReader {
}
@Test
- public void testChoseSuboptimalSchemaWhenDataHasExtraFields() throws Exception {
+ void testChoseSuboptimalSchemaWhenDataHasExtraFields() throws Exception {
// GIVEN
String jsonPath = "src/test/resources/json/choice-of-different-arrays-with-extra-fields.json";
@@ -993,18 +957,18 @@ public class TestJsonTreeRowRecordReader {
new RecordField("string", RecordFieldType.STRING.getDataType())
));
- RecordSchema recordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
- new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
- RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(recordSchema1)),
- RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(recordSchema2))
- ))
+ RecordSchema recordChoiceSchema = new SimpleRecordSchema(Collections.singletonList(
+ new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
+ RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(recordSchema1)),
+ RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(recordSchema2))
+ ))
));
- RecordSchema schema = new SimpleRecordSchema(Arrays.asList(
- new RecordField("dataCollection", RecordFieldType.ARRAY.getArrayDataType(
- RecordFieldType.RECORD.getRecordDataType(recordChoiceSchema)
- )
- )));
+ RecordSchema schema = new SimpleRecordSchema(Collections.singletonList(
+ new RecordField("dataCollection", RecordFieldType.ARRAY.getArrayDataType(
+ RecordFieldType.RECORD.getRecordDataType(recordChoiceSchema)
+ )
+ )));
SimpleRecordSchema expectedChildSchema1 = new SimpleRecordSchema(Arrays.asList(
new RecordField("integer", RecordFieldType.INT.getDataType()),
@@ -1014,11 +978,11 @@ public class TestJsonTreeRowRecordReader {
new RecordField("integer", RecordFieldType.INT.getDataType()),
new RecordField("string", RecordFieldType.STRING.getDataType())
));
- RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Arrays.asList(
- new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
- RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedChildSchema1)),
- RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedChildSchema2))
- ))
+ RecordSchema expectedRecordChoiceSchema = new SimpleRecordSchema(Collections.singletonList(
+ new RecordField("record", RecordFieldType.CHOICE.getChoiceDataType(
+ RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedChildSchema1)),
+ RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.RECORD.getRecordDataType(expectedChildSchema2))
+ ))
));
// Since the actual arrays have records with either (INT, BOOLEAN, STRING) or (INT, STRING, STRING)
@@ -1062,14 +1026,119 @@ public class TestJsonTreeRowRecordReader {
testReadRecords(jsonPath, schema, expected);
}
+    /**
+     * Reads the fixture with the NESTED_FIELD strategy and the starting field "accounts" and expects
+     * two id/balance records; the schema is inferred because no schema is passed to the helper.
+     */
+    @Test
+    void testStartFromNestedArray() throws IOException, MalformedRecordException {
+        final RecordField idField = new RecordField("id", RecordFieldType.INT.getDataType());
+        final RecordField balanceField = new RecordField("balance", RecordFieldType.DOUBLE.getDataType());
+        final SimpleRecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList(idField, balanceField));
+
+        final Record firstAccount = new MapRecord(accountSchema, new HashMap<String, Object>() {{
+            put("id", 42);
+            put("balance", 4750.89);
+        }});
+        final Record secondAccount = new MapRecord(accountSchema, new HashMap<String, Object>() {{
+            put("id", 43);
+            put("balance", 48212.38);
+        }});
+        final List<Object> expectedRecords = Arrays.asList(firstAccount, secondAccount);
+
+        testReadRecords("src/test/resources/json/single-element-nested-array.json", expectedRecords,
+                StartingFieldStrategy.NESTED_FIELD, "accounts");
+    }
+
+    /**
+     * Reads the fixture with the NESTED_FIELD strategy and the starting field "account" and expects
+     * exactly one id/balance record; the schema is inferred because no schema is passed to the helper.
+     */
+    @Test
+    void testStartsFromNestedObject() throws IOException, MalformedRecordException {
+        final RecordField idField = new RecordField("id", RecordFieldType.INT.getDataType());
+        final RecordField balanceField = new RecordField("balance", RecordFieldType.DOUBLE.getDataType());
+        final SimpleRecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList(idField, balanceField));
+
+        final Record onlyAccount = new MapRecord(accountSchema, new HashMap<String, Object>() {{
+            put("id", 42);
+            put("balance", 4750.89);
+        }});
+        final List<Object> expectedRecords = Collections.singletonList(onlyAccount);
+
+        testReadRecords("src/test/resources/json/single-element-nested.json", expectedRecords,
+                StartingFieldStrategy.NESTED_FIELD, "account");
+    }
+
+    /**
+     * Reads the fixture with the NESTED_FIELD strategy and the starting field "accountIds" and expects
+     * the two id/type records; the schema is inferred because no schema is passed to the helper.
+     */
+    @Test
+    void testStartsFromMultipleNestedField() throws IOException, MalformedRecordException {
+        final RecordField idField = new RecordField("id", RecordFieldType.STRING.getDataType());
+        final RecordField typeField = new RecordField("type", RecordFieldType.STRING.getDataType());
+        final SimpleRecordSchema accountIdSchema = new SimpleRecordSchema(Arrays.asList(idField, typeField));
+
+        final Record employeeId = new MapRecord(accountIdSchema, new HashMap<String, Object>() {{
+            put("id", "n312kj3");
+            put("type", "employee");
+        }});
+        final Record securityId = new MapRecord(accountIdSchema, new HashMap<String, Object>() {{
+            put("id", "dl2kdff");
+            put("type", "security");
+        }});
+        final List<Object> expectedRecords = Arrays.asList(employeeId, securityId);
+
+        testReadRecords("src/test/resources/json/multiple-nested-field.json", expectedRecords,
+                StartingFieldStrategy.NESTED_FIELD, "accountIds");
+    }
+
+    /** Expects no records when the starting field "name" is used with the NESTED_FIELD strategy. */
+    @Test
+    void testStartFromSimpleFieldReturnsEmptyJson() throws IOException, MalformedRecordException {
+        final List<Object> noRecords = Collections.emptyList();
+
+        testReadRecords("src/test/resources/json/single-element-nested.json", noRecords,
+                StartingFieldStrategy.NESTED_FIELD, "name");
+    }
+
+    /**
+     * Expects no records when the starting field "notfound" is used with the NESTED_FIELD strategy
+     * and an explicit schema built from the default fields.
+     */
+    @Test
+    void testStartFromNonExistentFieldWithDefinedSchema() throws IOException, MalformedRecordException {
+        final SimpleRecordSchema definedSchema = new SimpleRecordSchema(getDefaultFields());
+        final List<Object> noRecords = Collections.emptyList();
+
+        testReadRecords("src/test/resources/json/single-element-nested.json", definedSchema, noRecords,
+                StartingFieldStrategy.NESTED_FIELD, "notfound");
+    }
+
+    /**
+     * Reads the fixture with the NESTED_FIELD strategy, the starting field "accounts" and an explicit
+     * id/balance schema, and expects only the two records with ids 42 and 43.
+     */
+    @Test
+    void testStartFromNestedFieldThenStartObject() throws IOException, MalformedRecordException {
+        final RecordField idField = new RecordField("id", RecordFieldType.INT.getDataType());
+        final RecordField balanceField = new RecordField("balance", RecordFieldType.DOUBLE.getDataType());
+        final SimpleRecordSchema accountSchema = new SimpleRecordSchema(Arrays.asList(idField, balanceField));
+
+        final Record firstAccount = new MapRecord(accountSchema, new HashMap<String, Object>() {{
+            put("id", 42);
+            put("balance", 4750.89);
+        }});
+        final Record secondAccount = new MapRecord(accountSchema, new HashMap<String, Object>() {{
+            put("id", 43);
+            put("balance", 48212.38);
+        }});
+        final List<Object> expectedRecords = Arrays.asList(firstAccount, secondAccount);
+
+        testReadRecords("src/test/resources/json/nested-array-then-start-object.json", accountSchema, expectedRecords,
+                StartingFieldStrategy.NESTED_FIELD, "accounts");
+    }
+
private void testReadRecords(String jsonPath, List<Object> expected) throws IOException, MalformedRecordException {
// GIVEN
final File jsonFile = new File(jsonPath);
try (
- InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile));
+ InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))
) {
- RecordSchema schema = inferSchema(jsonStream);
+ RecordSchema schema = inferSchema(jsonStream, StartingFieldStrategy.ROOT_NODE, null);
// WHEN
// THEN
@@ -1077,25 +1146,30 @@ public class TestJsonTreeRowRecordReader {
}
}
- private void testReadRecords(String jsonPath, RecordSchema schema, List<Object> expected) throws IOException, MalformedRecordException {
- // GIVEN
+ /**
+ * Loads the JSON file at {@code jsonPath} into an in-memory stream, infers a schema from it using
+ * the given starting field strategy and field name, and delegates to the stream-based overload
+ * to check the records that are read against {@code expected}.
+ */
+ private void testReadRecords(String jsonPath, List<Object> expected, StartingFieldStrategy strategy, String startingFieldName) throws IOException, MalformedRecordException {
final File jsonFile = new File(jsonPath);
+ try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
+ // NOTE(review): the same stream instance is handed to inferSchema and then to the reader; this
+ // presumably relies on schema inference leaving the stream readable from the start - confirm.
+ RecordSchema schema = inferSchema(jsonStream, strategy, startingFieldName);
+ testReadRecords(jsonStream, schema, expected, strategy, startingFieldName);
+ }
+ }
- try (
- InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile));
- ) {
- // WHEN
- // THEN
+ /**
+ * Loads the JSON file at {@code jsonPath} into an in-memory stream and delegates to the
+ * stream-based overload, passing {@code schema} and {@code expected} through unchanged.
+ */
+ private void testReadRecords(String jsonPath, RecordSchema schema, List<Object> expected) throws IOException, MalformedRecordException {
+ final File jsonFile = new File(jsonPath);
+ try (InputStream jsonStream = new ByteArrayInputStream(FileUtils.readFileToByteArray(jsonFile))) {
testReadRecords(jsonStream, schema, expected);
}
}
+    /**
+     * Loads the JSON file at {@code jsonPath} into memory and delegates to the stream-based overload,
+     * passing the schema, expected records, starting field strategy and field name through unchanged.
+     */
+    private void testReadRecords(String jsonPath, RecordSchema schema, List<Object> expected, StartingFieldStrategy strategy, String startingFieldName) throws IOException, MalformedRecordException {
+        final byte[] jsonBytes = FileUtils.readFileToByteArray(new File(jsonPath));
+        try (InputStream inMemoryJson = new ByteArrayInputStream(jsonBytes)) {
+            testReadRecords(inMemoryJson, schema, expected, strategy, startingFieldName);
+        }
+    }
+
private void testReadRecords(InputStream jsonStream, RecordSchema schema, List<Object> expected) throws IOException, MalformedRecordException {
- // GIVEN
- try (
- JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat);
- ) {
- // WHEN
+ try (JsonTreeRowRecordReader reader = new JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat)) {
List<Object> actual = new ArrayList<>();
Record record;
while ((record = reader.nextRecord()) != null) {
@@ -1103,7 +1177,6 @@ public class TestJsonTreeRowRecordReader {
actual.addAll(dataCollection);
}
- // THEN
List<Function<Object, Object>> propertyProviders = Arrays.asList(
_object -> ((Record)_object).getSchema(),
_object -> Arrays.stream(((Record)_object).getValues()).map(value -> {
@@ -1122,9 +1195,38 @@ public class TestJsonTreeRowRecordReader {
}
}
- private RecordSchema inferSchema(InputStream jsonStream) throws IOException {
+    /**
+     * Reads every record from {@code jsonStream} with a reader configured with the given starting field
+     * strategy and field name, wraps both the records read and {@code expected} in EqualsWrapper using
+     * two property providers (the record schema, and the record values with array values turned into
+     * lists), and asserts that the wrapped lists are equal.
+     */
+    private void testReadRecords(InputStream jsonStream, RecordSchema schema, List<Object> expected, StartingFieldStrategy strategy, String startingFieldName)
+            throws IOException, MalformedRecordException {
+        final Function<Object, Object> schemaOf = candidate -> ((Record) candidate).getSchema();
+        final Function<Object, Object> valuesOf = candidate -> Arrays.stream(((Record) candidate).getValues())
+                .map(value -> {
+                    // Non-array values (including null) are passed through as-is.
+                    if (value == null || !value.getClass().isArray()) {
+                        return value;
+                    }
+                    return Arrays.asList((Object[]) value);
+                })
+                .collect(Collectors.toList());
+        final List<Function<Object, Object>> comparedProperties = Arrays.asList(schemaOf, valuesOf);
+
+        try (JsonTreeRowRecordReader recordReader = new JsonTreeRowRecordReader(jsonStream, mock(ComponentLog.class), schema, dateFormat, timeFormat, timestampFormat,
+                strategy, startingFieldName)) {
+            final List<Object> actual = new ArrayList<>();
+            for (Record next = recordReader.nextRecord(); next != null; next = recordReader.nextRecord()) {
+                actual.add(next);
+            }
+
+            assertEquals(EqualsWrapper.wrapList(expected, comparedProperties), EqualsWrapper.wrapList(actual, comparedProperties));
+        }
+    }
+
+ private RecordSchema inferSchema(InputStream jsonStream, StartingFieldStrategy strategy, String startingFieldName) throws IOException {
RecordSchema schema = new InferSchemaAccessStrategy<>(
- (__, inputStream) -> new JsonRecordSource(inputStream),
+ (__, inputStream) -> new JsonRecordSource(inputStream, strategy, startingFieldName),
new JsonSchemaInference(new TimeValueInference(null, null, null)),
mock(ComponentLog.class)
).getSchema(Collections.emptyMap(), jsonStream, null);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
index 465164b18a..84e06523cd 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/java/org/apache/nifi/json/TestWriteJsonResult.java
@@ -54,10 +54,10 @@ import java.util.Map;
import static org.junit.jupiter.api.Assertions.assertEquals;
-public class TestWriteJsonResult {
+class TestWriteJsonResult {
@Test
- public void testDataTypes() throws IOException, ParseException {
+ void testDataTypes() throws IOException, ParseException {
final List<RecordField> fields = new ArrayList<>();
for (final RecordFieldType fieldType : RecordFieldType.values()) {
if (fieldType == RecordFieldType.CHOICE) {
@@ -121,7 +121,7 @@ public class TestWriteJsonResult {
@Test
- public void testWriteSerializedForm() throws IOException {
+ void testWriteSerializedForm() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("age", RecordFieldType.INT.getDataType()));
@@ -160,7 +160,7 @@ public class TestWriteJsonResult {
}
@Test
- public void testTimestampWithNullFormat() throws IOException {
+ void testTimestampWithNullFormat() throws IOException {
final Map<String, Object> values = new HashMap<>();
values.put("timestamp", new java.sql.Timestamp(37293723L));
values.put("time", new java.sql.Time(37293723L));
@@ -192,7 +192,7 @@ public class TestWriteJsonResult {
}
@Test
- public void testExtraFieldInWriteRecord() throws IOException {
+ void testExtraFieldInWriteRecord() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
@@ -219,7 +219,7 @@ public class TestWriteJsonResult {
}
@Test
- public void testExtraFieldInWriteRawRecord() throws IOException {
+ void testExtraFieldInWriteRawRecord() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
final RecordSchema schema = new SimpleRecordSchema(fields);
@@ -246,7 +246,7 @@ public class TestWriteJsonResult {
}
@Test
- public void testMissingFieldInWriteRecord() throws IOException {
+ void testMissingFieldInWriteRecord() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
@@ -273,7 +273,7 @@ public class TestWriteJsonResult {
}
@Test
- public void testMissingFieldInWriteRawRecord() throws IOException {
+ void testMissingFieldInWriteRawRecord() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
@@ -300,7 +300,7 @@ public class TestWriteJsonResult {
}
@Test
- public void testMissingAndExtraFieldInWriteRecord() throws IOException {
+ void testMissingAndExtraFieldInWriteRecord() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
@@ -328,7 +328,7 @@ public class TestWriteJsonResult {
}
@Test
- public void testMissingAndExtraFieldInWriteRawRecord() throws IOException {
+ void testMissingAndExtraFieldInWriteRawRecord() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
@@ -356,7 +356,7 @@ public class TestWriteJsonResult {
}
@Test
- public void testNullSuppression() throws IOException {
+ void testNullSuppression() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("id", RecordFieldType.STRING.getDataType()));
fields.add(new RecordField("name", RecordFieldType.STRING.getDataType()));
@@ -378,8 +378,8 @@ public class TestWriteJsonResult {
baos.reset();
try (
- final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
- NullSuppression.ALWAYS_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
+ final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
+ NullSuppression.ALWAYS_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
writer.beginRecordSet();
writer.write(recordWithMissingName);
writer.finishRecordSet();
@@ -390,7 +390,7 @@ public class TestWriteJsonResult {
baos.reset();
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
NullSuppression.SUPPRESS_MISSING, OutputGrouping.OUTPUT_ARRAY, null, null,
- null)) {
+ null)) {
writer.beginRecordSet();
writer.write(recordWithMissingName);
writer.finishRecordSet();
@@ -414,8 +414,8 @@ public class TestWriteJsonResult {
baos.reset();
try (
- final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
- NullSuppression.ALWAYS_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
+ final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
+ NullSuppression.ALWAYS_SUPPRESS, OutputGrouping.OUTPUT_ARRAY, null, null, null)) {
writer.beginRecordSet();
writer.write(recordWithNullValue);
writer.finishRecordSet();
@@ -426,7 +426,7 @@ public class TestWriteJsonResult {
baos.reset();
try (final WriteJsonResult writer = new WriteJsonResult(Mockito.mock(ComponentLog.class), schema, new SchemaNameAsAttribute(), baos, false,
NullSuppression.SUPPRESS_MISSING, OutputGrouping.OUTPUT_ARRAY, null, null,
- null)) {
+ null)) {
writer.beginRecordSet();
writer.write(recordWithNullValue);
writer.finishRecordSet();
@@ -437,7 +437,7 @@ public class TestWriteJsonResult {
}
@Test
- public void testOnelineOutput() throws IOException {
+ void testOnelineOutput() throws IOException {
final Map<String, Object> values1 = new HashMap<>();
values1.put("timestamp", new java.sql.Timestamp(37293723L));
values1.put("time", new java.sql.Time(37293723L));
@@ -480,7 +480,7 @@ public class TestWriteJsonResult {
}
@Test
- public void testChoiceArray() throws IOException {
+ void testChoiceArray() throws IOException {
final List<RecordField> fields = new ArrayList<>();
fields.add(new RecordField("path", RecordFieldType.CHOICE.getChoiceDataType(RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.STRING.getDataType()))));
final RecordSchema schema = new SimpleRecordSchema(fields);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/multiple-nested-field.json b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/multiple-nested-field.json
new file mode 100644
index 0000000000..7b2f47d28a
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/multiple-nested-field.json
@@ -0,0 +1,22 @@
+[
+ {
+ "id": 1,
+ "name": "John Doe",
+ "balance": 4750.89,
+ "address": "123 My Street",
+ "city": "My City",
+ "state": "MS",
+ "zipCode": "11111",
+ "country": "USA",
+ "accountIds": [
+ {
+ "id": "n312kj3",
+ "type": "employee"
+ },
+ {
+ "id": "dl2kdff",
+ "type": "security"
+ }
+ ]
+ }
+]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/nested-array-then-start-object.json b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/nested-array-then-start-object.json
new file mode 100644
index 0000000000..662ebb8dc8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/nested-array-then-start-object.json
@@ -0,0 +1,20 @@
+[
+ {
+ "id": 17,
+ "name": "John",
+ "accounts": [
+ {
+ "id": 42,
+ "balance": 4750.89
+ },
+ {
+ "id": 43,
+ "balance": 48212.38
+ }
+ ]
+ },
+ {
+ "id": 98,
+ "balance": 67829.12
+ }
+]
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/single-element-nested-array-middle.json b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/single-element-nested-array-middle.json
new file mode 100644
index 0000000000..ccc2121ef1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-record-serialization-services-bundle/nifi-record-serialization-services/src/test/resources/json/single-element-nested-array-middle.json
@@ -0,0 +1,16 @@
+{
+ "id": 1,
+ "accounts": [{
+ "id": 42,
+ "balance": 4750.89
+ }, {
+ "id": 43,
+ "balance": 48212.38
+ }],
+ "name": "John Doe",
+ "address": "123 My Street",
+ "city": "My City",
+ "state": "MS",
+ "zipCode": "11111",
+ "country": "USA"
+}
\ No newline at end of file