You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2022/12/23 01:30:47 UTC
[nifi] branch main updated: NIFI-10585 Added GenerateRecord Processor
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f38f14b363 NIFI-10585 Added GenerateRecord Processor
f38f14b363 is described below
commit f38f14b363a955261d640aa214ae152a361be7c5
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Mon Oct 3 18:48:00 2022 -0400
NIFI-10585 Added GenerateRecord Processor
This closes #6480
Signed-off-by: David Handermann <ex...@apache.org>
---
.../nifi-standard-processors/pom.xml | 7 +
.../nifi/processors/standard/GenerateRecord.java | 415 +++++++++++++++++++++
.../standard/faker/FakerMethodHolder.java | 44 +++
.../nifi/processors/standard/faker/FakerUtils.java | 188 ++++++++++
.../services/org.apache.nifi.processor.Processor | 1 +
.../processors/standard/TestGenerateRecord.java | 273 ++++++++++++++
.../TestGenerateRecord/nested_no_nullable.avsc | 52 +++
.../TestGenerateRecord/nested_nullable.avsc | 52 +++
8 files changed, 1032 insertions(+)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index c5ad98209a..ae3b0dcd7d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -322,6 +322,11 @@
<groupId>org.slf4j</groupId>
<artifactId>jcl-over-slf4j</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.github.javafaker</groupId>
+ <artifactId>javafaker</artifactId>
+ <version>1.0.2</version>
+ </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-mock</artifactId>
@@ -514,6 +519,8 @@
<exclude>src/test/resources/ScanAttribute/dictionary1</exclude>
<exclude>src/test/resources/TestEncryptContent/text.txt</exclude>
<exclude>src/test/resources/TestEncryptContent/text.txt.asc</exclude>
+ <exclude>src/test/resources/TestGenerateRecord/nested_no_nullable.avsc</exclude>
+ <exclude>src/test/resources/TestGenerateRecord/nested_nullable.avsc</exclude>
<exclude>src/test/resources/TestIdentifyMimeType/1.txt</exclude>
<exclude>src/test/resources/TestIdentifyMimeType/1.csv</exclude>
<exclude>src/test/resources/TestJson/json-sample.json</exclude>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java
new file mode 100644
index 0000000000..08b7d51b9c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import com.github.javafaker.Faker;
+import org.apache.avro.Schema;
+import org.apache.nifi.annotation.behavior.DynamicProperties;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.avro.AvroSchemaValidator;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.processors.standard.faker.FakerUtils;
+import org.apache.nifi.schema.access.SchemaNotFoundException;
+import org.apache.nifi.serialization.RecordSetWriter;
+import org.apache.nifi.serialization.RecordSetWriterFactory;
+import org.apache.nifi.serialization.SimpleRecordSchema;
+import org.apache.nifi.serialization.WriteResult;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.MapRecord;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.RecordField;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.serialization.record.type.ArrayDataType;
+import org.apache.nifi.serialization.record.type.ChoiceDataType;
+import org.apache.nifi.serialization.record.type.DecimalDataType;
+import org.apache.nifi.serialization.record.type.EnumDataType;
+import org.apache.nifi.serialization.record.type.MapDataType;
+import org.apache.nifi.serialization.record.type.RecordDataType;
+import org.apache.nifi.util.StringUtils;
+
+import java.math.BigInteger;
+import java.time.LocalDate;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.apache.nifi.processors.standard.faker.FakerUtils.DEFAULT_DATE_PROPERTY_NAME;
+
+/**
+ * Source processor that writes FlowFiles containing randomly generated records.
+ *
+ * <p>The record schema comes from one of two places: the Avro schema given in the Schema Text property, or,
+ * when that property is empty, the user-defined (dynamic) properties, where each property name is a field name
+ * and each property value names a Faker category understood by {@link FakerUtils}.</p>
+ *
+ * <p>Random integers are drawn with {@code faker.number().numberBetween(min, max)}, whose upper bound is
+ * exclusive; index and percentage computations below rely on that.</p>
+ */
+@SupportsBatching
+@Tags({"test", "random", "generate", "fake"})
+@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
+@WritesAttributes({
+        @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer"),
+        @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile"),
+})
+@CapabilityDescription("This processor creates FlowFiles with records having random value for the specified fields. GenerateRecord is useful " +
+        "for testing, configuration, and simulation. It uses either user-defined properties to define a record schema or a provided schema and generates the specified number of records using " +
+        "random data for the fields in the schema.")
+@DynamicProperties({
+        @DynamicProperty(
+                name = "Field name in generated record",
+                value = "Faker category for generated record values",
+                description = "Custom properties define the generated record schema using configured field names and value data types in absence of the Schema Text property"
+        )
+})
+public class GenerateRecord extends AbstractProcessor {
+
+    private static final AllowableValue[] fakerDatatypeValues = FakerUtils.createFakerPropertyList();
+
+    // Fake keys when generating a map
+    private static final String KEY1 = "key1";
+    private static final String KEY2 = "key2";
+    private static final String KEY3 = "key3";
+    private static final String KEY4 = "key4";
+
+    // Exclusive upper bound of the roll compared against the configured null percentage (roll is 0..99)
+    private static final int PERCENT_ROLL_BOUND = 100;
+
+    // Number of decimal places requested for DOUBLE and FLOAT values, which carry no scale of their own
+    private static final int DEFAULT_FRACTION_DIGITS = 6;
+
+    static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder()
+            .name("record-writer")
+            .displayName("Record Writer")
+            .description("Specifies the Controller Service to use for writing out the records")
+            .identifiesControllerService(RecordSetWriterFactory.class)
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor NUM_RECORDS = new PropertyDescriptor.Builder()
+            .name("number-of-records")
+            .displayName("Number of Records")
+            .description("Specifies how many records will be generated for each outgoing FlowFile.")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .defaultValue("100")
+            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor NULLABLE_FIELDS = new PropertyDescriptor.Builder()
+            .name("nullable-fields")
+            .displayName("Nullable Fields")
+            .description("Whether the generated fields will be nullable. Note that this property is ignored if Schema Text is set. Also it only affects the schema of the generated data, " +
+                    "not whether any values will be null. If this property is true, see 'Null Value Percentage' to set the probability that any generated field will be null.")
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .required(true)
+            .build();
+    static final PropertyDescriptor NULL_PERCENTAGE = new PropertyDescriptor.Builder()
+            .name("null-percentage")
+            .displayName("Null Value Percentage")
+            .description("The percent probability (0-100%) that a generated value for any nullable field will be null. Set this property to zero to have no null values, or 100 to have all " +
+                    "null values.")
+            .addValidator(StandardValidators.createLongValidator(0L, 100L, true))
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(true)
+            .defaultValue("0")
+            .dependsOn(NULLABLE_FIELDS, "true")
+            .build();
+
+    static final PropertyDescriptor SCHEMA_TEXT = new PropertyDescriptor.Builder()
+            .name("schema-text")
+            .displayName("Schema Text")
+            .description("The text of an Avro-formatted Schema used to generate record data. If this property is set, any user-defined properties are ignored.")
+            .addValidator(new AvroSchemaValidator())
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .required(false)
+            .build();
+
+
+    static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("FlowFiles that are successfully created will be routed to this relationship")
+            .build();
+
+    static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
+
+    private volatile Faker faker = new Faker();
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(RECORD_WRITER);
+        properties.add(NUM_RECORDS);
+        properties.add(NULLABLE_FIELDS);
+        properties.add(NULL_PERCENTAGE);
+        properties.add(SCHEMA_TEXT);
+        return properties;
+    }
+
+    /**
+     * Describes a user-defined property: its name becomes a record field name and its value must be one of the
+     * Faker categories produced by {@link FakerUtils#createFakerPropertyList()}.
+     */
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .name(propertyDescriptorName)
+                .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+                .allowableValues(fakerDatatypeValues)
+                .defaultValue("Address.fullAddress")
+                .required(false)
+                .dynamic(true)
+                .build();
+    }
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return RELATIONSHIPS;
+    }
+
+    @OnScheduled
+    public void onScheduled(final ProcessContext context) {
+        // Force the en-US Locale for more predictable results
+        faker = new Faker(Locale.US);
+    }
+
+    /**
+     * Creates one FlowFile, writes the configured number of generated records to it with the configured Record
+     * Writer, sets the {@code record.count} and {@code mime.type} attributes (plus any attributes reported by the
+     * writer) and routes the FlowFile to {@link #REL_SUCCESS}.
+     *
+     * @throws ProcessException if the writer cannot resolve its schema or if record generation or writing fails
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
+
+        final String schemaText = context.getProperty(SCHEMA_TEXT).evaluateAttributeExpressions().getValue();
+        final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
+        final int numRecords = context.getProperty(NUM_RECORDS).evaluateAttributeExpressions().asInteger();
+        FlowFile flowFile = session.create();
+        final Map<String, String> attributes = new HashMap<>();
+        final AtomicInteger recordCount = new AtomicInteger();
+
+        try {
+            flowFile = session.write(flowFile, out -> {
+                final RecordSchema recordSchema;
+                final boolean usingSchema;
+                // Only meaningful when the schema is built from user-defined properties; read once, not per field
+                final boolean propertyFieldsNullable;
+                final int nullPercentage = context.getProperty(NULL_PERCENTAGE).evaluateAttributeExpressions().asInteger();
+                if (StringUtils.isNotEmpty(schemaText)) {
+                    final Schema avroSchema = new Schema.Parser().parse(schemaText);
+                    recordSchema = AvroTypeUtil.createSchema(avroSchema);
+                    usingSchema = true;
+                    propertyFieldsNullable = false;
+                } else {
+                    // Generate RecordSchema from user-defined properties
+                    propertyFieldsNullable = context.getProperty(NULLABLE_FIELDS).asBoolean();
+                    final Map<String, String> fields = getFields(context);
+                    recordSchema = generateRecordSchema(fields, propertyFieldsNullable);
+                    usingSchema = false;
+                }
+                try {
+                    final RecordSchema writeSchema = writerFactory.getSchema(attributes, recordSchema);
+                    try (final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, attributes)) {
+                        writer.beginRecordSet();
+
+                        final List<RecordField> writeFields = writeSchema.getFields();
+                        for (int i = 0; i < numRecords; i++) {
+                            // Fresh map per record: MapRecord is handed this map, so records must not share one
+                            final Map<String, Object> recordEntries = new HashMap<>();
+                            for (final RecordField writeRecordField : writeFields) {
+                                final String writeFieldName = writeRecordField.getFieldName();
+                                final Object writeFieldValue;
+                                if (usingSchema) {
+                                    writeFieldValue = generateValueFromRecordField(writeRecordField, faker, nullPercentage);
+                                } else if (shouldGenerateNull(propertyFieldsNullable, nullPercentage, faker)) {
+                                    writeFieldValue = null;
+                                } else {
+                                    final String propertyValue = context.getProperty(writeFieldName).getValue();
+                                    writeFieldValue = FakerUtils.getFakeData(propertyValue, faker);
+                                }
+
+                                recordEntries.put(writeFieldName, writeFieldValue);
+                            }
+                            writer.write(new MapRecord(recordSchema, recordEntries));
+                        }
+
+                        final WriteResult writeResult = writer.finishRecordSet();
+                        attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
+                        attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                        attributes.putAll(writeResult.getAttributes());
+                        recordCount.set(writeResult.getRecordCount());
+                    }
+                } catch (final SchemaNotFoundException e) {
+                    throw new ProcessException("Schema not found while writing records", e);
+                }
+            });
+        } catch (final ProcessException e) {
+            // Already carries a meaningful message; do not wrap it again
+            throw e;
+        } catch (final Exception e) {
+            throw new ProcessException("Record generation failed", e);
+        }
+
+        flowFile = session.putAllAttributes(flowFile, attributes);
+        session.transfer(flowFile, REL_SUCCESS);
+
+        final int count = recordCount.get();
+        session.adjustCounter("Records Processed", count, false);
+
+        getLogger().info("Generated records [{}] for {}", count, flowFile);
+    }
+
+    /**
+     * Collects the user-defined properties that have a value.
+     *
+     * @param context the process context to read properties from
+     * @return map of field name (property name) to Faker category (property value)
+     */
+    protected Map<String, String> getFields(ProcessContext context) {
+        return context.getProperties().entrySet().stream()
+                // filter non-null dynamic properties
+                .filter(e -> e.getKey().isDynamic() && e.getValue() != null)
+                // convert to Map of user-defined field names and types
+                .collect(Collectors.toMap(
+                        e -> e.getKey().getName(),
+                        e -> context.getProperty(e.getKey()).getValue()
+                ));
+    }
+
+    /**
+     * Decides whether the next generated value should be null.
+     *
+     * <p>The roll is an integer in 0..99, so comparing with {@code <} gives exactly {@code nullPercentage}
+     * chances in 100: never for 0, always for 100.</p>
+     *
+     * @param nullable whether the field may hold null at all
+     * @param nullPercentage configured probability, 0 to 100
+     * @param faker source of random numbers
+     * @return true if null should be produced
+     */
+    private static boolean shouldGenerateNull(final boolean nullable, final int nullPercentage, final Faker faker) {
+        return nullable
+                && nullPercentage > 0
+                && faker.number().numberBetween(0, PERCENT_ROLL_BOUND) < nullPercentage;
+    }
+
+    /**
+     * Generates a random value suited to the data type of the given field, recursing into arrays, maps,
+     * nested records and choices.
+     *
+     * @param recordField field whose data type and nullability drive the generation
+     * @param faker source of random data
+     * @param nullPercentage probability (0 to 100) that a nullable field receives null
+     * @return the generated value, or null when the field is nullable and the null roll succeeds
+     */
+    private Object generateValueFromRecordField(RecordField recordField, Faker faker, int nullPercentage) {
+        if (shouldGenerateNull(recordField.isNullable(), nullPercentage, faker)) {
+            return null;
+        }
+        final DataType dataType = recordField.getDataType();
+        switch (dataType.getFieldType()) {
+            case BIGINT:
+                return new BigInteger(String.valueOf(faker.number().numberBetween(Long.MIN_VALUE, Long.MAX_VALUE)));
+            case BOOLEAN:
+                // Ask Faker directly: FakerUtils' reflective map only holds String-returning methods
+                return faker.bool().bool();
+            case BYTE:
+                return faker.number().numberBetween(Byte.MIN_VALUE, Byte.MAX_VALUE);
+            case CHAR:
+                return (char) faker.number().numberBetween(Character.MIN_VALUE, Character.MAX_VALUE);
+            case DATE:
+                return FakerUtils.getFakeData(DEFAULT_DATE_PROPERTY_NAME, faker);
+            case DECIMAL:
+                // Only DECIMAL fields carry a DecimalDataType (and therefore a scale)
+                return faker.number().randomDouble(((DecimalDataType) dataType).getScale(), Long.MIN_VALUE, Long.MAX_VALUE);
+            case DOUBLE:
+                return faker.number().randomDouble(DEFAULT_FRACTION_DIGITS, Long.MIN_VALUE, Long.MAX_VALUE);
+            case FLOAT:
+                return (float) faker.number().randomDouble(DEFAULT_FRACTION_DIGITS, Long.MIN_VALUE, Long.MAX_VALUE);
+            case INT:
+                return faker.number().numberBetween(Integer.MIN_VALUE, Integer.MAX_VALUE);
+            case LONG:
+                return faker.number().numberBetween(Long.MIN_VALUE, Long.MAX_VALUE);
+            case SHORT:
+                return faker.number().numberBetween(Short.MIN_VALUE, Short.MAX_VALUE);
+            case ENUM:
+                final List<String> enums = ((EnumDataType) dataType).getEnums();
+                // Upper bound is exclusive, so pass size() to make the last symbol reachable
+                return enums.get(faker.number().numberBetween(0, enums.size()));
+            case TIME:
+                final Date fakeDate = (Date) FakerUtils.getFakeData(DEFAULT_DATE_PROPERTY_NAME, faker);
+                // Format the time-of-day part; a date-only value cannot be rendered with ISO_LOCAL_TIME
+                return fakeDate.toInstant().atZone(ZoneId.systemDefault()).toLocalTime().format(DateTimeFormatter.ISO_LOCAL_TIME);
+            case TIMESTAMP:
+                return ((Date) FakerUtils.getFakeData(DEFAULT_DATE_PROPERTY_NAME, faker)).getTime();
+            case UUID:
+                return UUID.randomUUID();
+            case ARRAY:
+                final ArrayDataType arrayDataType = (ArrayDataType) dataType;
+                final DataType elementType = arrayDataType.getElementType();
+                final boolean elementsNullable = arrayDataType.isElementsNullable();
+                final int numElements = faker.number().numberBetween(0, 10);
+                final Object[] elements = new Object[numElements];
+                for (int i = 0; i < numElements; i++) {
+                    final RecordField elementField = new RecordField(recordField.getFieldName() + "[" + i + "]", elementType, elementsNullable);
+                    // If the array elements are non-nullable, use zero as the nullPercentage
+                    elements[i] = generateValueFromRecordField(elementField, faker, elementsNullable ? nullPercentage : 0);
+                }
+                return elements;
+            case MAP:
+                final DataType valueType = ((MapDataType) dataType).getValueType();
+                // Create 4-element fake map
+                final Map<String, Object> returnMap = new HashMap<>(4);
+                for (final String key : new String[]{KEY1, KEY2, KEY3, KEY4}) {
+                    returnMap.put(key, generateValueFromRecordField(new RecordField(key, valueType), faker, nullPercentage));
+                }
+                return returnMap;
+            case RECORD:
+                final RecordSchema childSchema = ((RecordDataType) dataType).getChildSchema();
+                final Map<String, Object> recordValues = new HashMap<>();
+                for (final RecordField childField : childSchema.getFields()) {
+                    recordValues.put(childField.getFieldName(), generateValueFromRecordField(childField, faker, nullPercentage));
+                }
+                return new MapRecord(childSchema, recordValues);
+            case CHOICE:
+                final List<DataType> subTypes = ((ChoiceDataType) dataType).getPossibleSubTypes();
+                // Pick one at random (exclusive upper bound, so every sub-type is reachable) and generate a value for it
+                final DataType chosenType = subTypes.get(faker.number().numberBetween(0, subTypes.size()));
+                // The null decision for this field was already made above; do not roll a second time
+                final RecordField chosenField = new RecordField(recordField.getFieldName(), chosenType, false);
+                return generateValueFromRecordField(chosenField, faker, nullPercentage);
+            case STRING:
+            default:
+                return generateRandomString();
+        }
+    }
+
+    /**
+     * Returns a random string drawn from one of ten Faker categories, chosen uniformly.
+     */
+    private String generateRandomString() {
+        final int categoryChoice = faker.number().numberBetween(0, 10);
+        switch (categoryChoice) {
+            case 0:
+                return faker.name().fullName();
+            case 1:
+                return faker.lorem().word();
+            case 2:
+                return faker.shakespeare().romeoAndJulietQuote();
+            case 3:
+                return faker.educator().university();
+            case 4:
+                return faker.zelda().game();
+            case 5:
+                return faker.company().name();
+            case 6:
+                return faker.chuckNorris().fact();
+            case 7:
+                return faker.book().title();
+            case 8:
+                return faker.dog().breed();
+            default:
+                return faker.animal().name();
+        }
+    }
+
+    /**
+     * Builds a record schema from user-defined fields.
+     *
+     * @param fields map of field name to Faker category
+     * @param nullable whether every generated field is declared nullable
+     * @return schema with one field per map entry, typed via {@link FakerUtils#getDataType(String)}
+     */
+    protected RecordSchema generateRecordSchema(final Map<String, String> fields, final boolean nullable) {
+        final List<RecordField> recordFields = new ArrayList<>(fields.size());
+        for (Map.Entry<String, String> field : fields.entrySet()) {
+            final String fieldName = field.getKey();
+            final String fieldType = field.getValue();
+            final DataType fieldDataType = FakerUtils.getDataType(fieldType);
+            RecordField recordField = new RecordField(fieldName, fieldDataType, nullable);
+            recordFields.add(recordField);
+        }
+        return new SimpleRecordSchema(recordFields);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/faker/FakerMethodHolder.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/faker/FakerMethodHolder.java
new file mode 100644
index 0000000000..475783f6c3
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/faker/FakerMethodHolder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.faker;
+
+import java.lang.reflect.Method;
+
+/**
+ * Immutable triple used to call a Faker method reflectively: the property name the method is published under,
+ * the object the method is invoked on, and the method itself.
+ */
+public class FakerMethodHolder {
+
+    /** Name under which the method is offered to users. */
+    private final String propertyName;
+
+    /** Receiver passed to {@link Method#invoke(Object, Object...)}. */
+    private final Object methodObject;
+
+    /** The reflective handle to call. */
+    private final Method method;
+
+    /**
+     * @param propertyName name under which the method is offered
+     * @param methodObject object the method is invoked on
+     * @param method method to invoke
+     */
+    public FakerMethodHolder(final String propertyName, final Object methodObject, final Method method) {
+        this.method = method;
+        this.methodObject = methodObject;
+        this.propertyName = propertyName;
+    }
+
+    /** @return the method to invoke */
+    public Method getMethod() {
+        return this.method;
+    }
+
+    /** @return the object the method is invoked on */
+    public Object getMethodObject() {
+        return this.methodObject;
+    }
+
+    /** @return the name under which the method is offered */
+    public String getPropertyName() {
+        return this.propertyName;
+    }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/faker/FakerUtils.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/faker/FakerUtils.java
new file mode 100644
index 0000000000..1019b497fc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/faker/FakerUtils.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard.faker;
+
+import com.github.javafaker.Faker;
+import com.github.javafaker.service.files.EnFile;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.serialization.record.DataType;
+import org.apache.nifi.serialization.record.RecordFieldType;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+public class FakerUtils {
+
+ public static final String DEFAULT_DATE_PROPERTY_NAME = "DateAndTime.pastDate";
+ private static final int RANDOM_DATE_DAYS = 365;
+ private static final Map<String, FakerMethodHolder> datatypeFunctionMap = new LinkedHashMap<>();
+
+
+ // Additional Faker datatypes that don't use predetermined data files (i.e. they generate data or have non-String types)
+ static final AllowableValue FT_BOOL = new AllowableValue("Boolean.bool", "Boolean - bool (true/false)", "A value of 'true' or 'false'");
+ static final AllowableValue FT_FUTURE_DATE = new AllowableValue("DateAndTime.futureDate", "Date And Time - Future Date", "Generates a date up to one year in the " +
+ "future from the time the processor is executed");
+ static final AllowableValue FT_PAST_DATE = new AllowableValue(DEFAULT_DATE_PROPERTY_NAME, "Date And Time - Past Date", "Generates a date up to one year in the past from the time the " +
+ "processor is executed");
+ static final AllowableValue FT_BIRTHDAY = new AllowableValue("DateAndTime.birthday", "Date And Time - Birthday", "Generates a random birthday between 65 and 18 years ago");
+ static final AllowableValue FT_NUMBER = new AllowableValue("Number.Integer", "Number - Integer", "A integer number");
+ static final AllowableValue FT_SHA256 = new AllowableValue("Crypto.SHA-256", "Crypto - SHA-256", "A SHA-256 hash");
+ static final AllowableValue FT_SHA512 = new AllowableValue("Crypto.SHA-512", "Crypto - SHA-512", "A SHA-512 hash");
+
+ private static final String PACKAGE_PREFIX = "com.github.javafaker";
+
+ public static AllowableValue[] createFakerPropertyList() {
+ final List<EnFile> fakerFiles = EnFile.getFiles();
+ final Map<String, Class<?>> possibleFakerTypeMap = new HashMap<>(fakerFiles.size());
+ for (EnFile fakerFile : fakerFiles) {
+ String className = normalizeClassName(fakerFile.getFile().substring(0, fakerFile.getFile().indexOf('.')));
+ try {
+ possibleFakerTypeMap.put(className, Class.forName(PACKAGE_PREFIX + '.' + className));
+ } catch (Exception e) {
+ // Ignore, these are the ones we want to filter out
+ }
+ }
+
+ // Filter on no-arg methods that return a String, these should be the methods the user can use to generate data
+ Faker faker = new Faker();
+ List<AllowableValue> supportedDataTypes = new ArrayList<>();
+ for (Map.Entry<String, Class<?>> entry : possibleFakerTypeMap.entrySet()) {
+ List<Method> fakerMethods = Arrays.stream(entry.getValue().getDeclaredMethods()).filter((method) ->
+ Modifier.isPublic(method.getModifiers())
+ && method.getParameterCount() == 0
+ && method.getReturnType() == String.class)
+ .collect(Collectors.toList());
+ try {
+ final Object methodObject = faker.getClass().getDeclaredMethod(normalizeMethodName(entry.getKey())).invoke(faker);
+ for (Method method : fakerMethods) {
+ final String allowableValueName = normalizeClassName(entry.getKey()) + "." + method.getName();
+ final String allowableValueDisplayName = normalizeDisplayName(entry.getKey()) + " - " + normalizeDisplayName(method.getName());
+ datatypeFunctionMap.put(allowableValueName, new FakerMethodHolder(allowableValueName, methodObject, method));
+ supportedDataTypes.add(new AllowableValue(allowableValueName, allowableValueDisplayName, allowableValueDisplayName));
+ }
+ } catch (Exception e) {
+ // Ignore, this should indicate a Faker method that we're not interested in
+ }
+ }
+
+ // Add types manually for those Faker methods that generate data rather than getting it from a resource file
+ supportedDataTypes.add(FT_FUTURE_DATE);
+ supportedDataTypes.add(FT_PAST_DATE);
+ supportedDataTypes.add(FT_BIRTHDAY);
+ supportedDataTypes.add(FT_NUMBER);
+ supportedDataTypes.add(FT_SHA256);
+ supportedDataTypes.add(FT_SHA512);
+ supportedDataTypes.sort(Comparator.comparing(AllowableValue::getDisplayName));
+
+ return supportedDataTypes.toArray(new AllowableValue[]{});
+ }
+
+ public static Object getFakeData(String type, Faker faker) {
+
+ // Handle Number method not discovered by programmatically getting methods from the Faker objects
+ if (FT_NUMBER.getValue().equals(type)) {
+ return faker.number().numberBetween(Integer.MIN_VALUE, Integer.MAX_VALUE);
+ }
+
+ // Handle DateAndTime methods not discovered by programmatically getting methods from the Faker objects
+ if (FT_FUTURE_DATE.getValue().equals(type)) {
+ return faker.date().future(RANDOM_DATE_DAYS, TimeUnit.DAYS);
+ }
+ if (FT_PAST_DATE.getValue().equals(type)) {
+ return faker.date().past(RANDOM_DATE_DAYS, TimeUnit.DAYS);
+ }
+ if (FT_BIRTHDAY.getValue().equals(type)) {
+ return faker.date().birthday();
+ }
+
+ // Handle Crypto methods not discovered by programmatically getting methods from the Faker objects
+ if (FT_SHA256.getValue().equals(type)) {
+ return faker.crypto().sha256();
+ }
+ if (FT_SHA512.getValue().equals(type)) {
+ return faker.crypto().sha512();
+ }
+
+ // If not a special circumstance, use the map to call the associated Faker method and return the value
+ try {
+ final FakerMethodHolder fakerMethodHolder = datatypeFunctionMap.get(type);
+ Object returnObject = fakerMethodHolder.getMethod().invoke(fakerMethodHolder.getMethodObject());
+ return returnObject;
+ } catch (InvocationTargetException | IllegalAccessException e) {
+ throw new ProcessException(type + " is not a valid value", e);
+ }
+ }
+
+ // This method overrides the default String type for certain Faker datatypes for more user-friendly values
+ public static DataType getDataType(final String type) {
+
+ if (FT_FUTURE_DATE.getValue().equals(type)
+ || FT_PAST_DATE.getValue().equals(type)
+ || FT_BIRTHDAY.getValue().equals(type)
+ ) {
+ return RecordFieldType.DATE.getDataType();
+ }
+ if (FT_NUMBER.getValue().equals(type)) {
+ return RecordFieldType.INT.getDataType();
+ }
+ if (FT_BOOL.getValue().equals(type)) {
+ return RecordFieldType.BOOLEAN.getDataType();
+ }
+ return RecordFieldType.STRING.getDataType();
+ }
+
+ public static Map<String, FakerMethodHolder> getDatatypeFunctionMap() {
+ return datatypeFunctionMap;
+ }
+
+ // This method identifies "segments" by splitting the given name on underscores, then capitalizes each segment and removes the underscores. Ex: 'game_of_thrones' = 'GameOfThrones'
+ private static String normalizeClassName(String name) {
+ String[] segments = name.split("_");
+ String newName = Arrays.stream(segments)
+ .map(s -> s.substring(0, 1).toUpperCase() + s.substring(1))
+ .collect(Collectors.joining());
+ return newName;
+ }
+
+ // This method lowercases the first letter of the given name in order to match the name to a Faker method
+ private static String normalizeMethodName(String name) {
+
+ String newName = name.substring(0, 1).toLowerCase() + name.substring(1);
+ return newName;
+ }
+
+ // This method splits the given name on uppercase letters, ensures the first letter is capitalized, then joins the segments using a space. Ex. 'gameOfThrones' = 'Game Of Thrones'
+ private static String normalizeDisplayName(String name) {
+ // Split when the next letter is uppercase
+ String[] upperCaseSegments = name.split("(?=\\p{Upper})");
+
+ return Arrays.stream(upperCaseSegments).map(
+ upperCaseSegment -> upperCaseSegment.substring(0, 1).toUpperCase() + upperCaseSegment.substring(1))
+ .collect(Collectors.joining(" "));
+ }
+}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 6a336d141a..e0db844bb5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -47,6 +47,7 @@ org.apache.nifi.processors.standard.FetchSFTP
org.apache.nifi.processors.standard.FlattenJson
org.apache.nifi.processors.standard.ForkRecord
org.apache.nifi.processors.standard.ForkEnrichment
+org.apache.nifi.processors.standard.GenerateRecord
org.apache.nifi.processors.standard.GenerateFlowFile
org.apache.nifi.processors.standard.GenerateTableFetch
org.apache.nifi.processors.standard.GetFile
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java
new file mode 100644
index 0000000000..6b8df67de2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonParser;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ArrayNode;
+import org.apache.avro.Schema;
+import org.apache.nifi.avro.AvroTypeUtil;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.json.JsonRecordSetWriter;
+import org.apache.nifi.processors.standard.faker.FakerMethodHolder;
+import org.apache.nifi.processors.standard.faker.FakerUtils;
+import org.apache.nifi.serialization.record.MockRecordWriter;
+import org.apache.nifi.serialization.record.RecordSchema;
+import org.apache.nifi.util.MockFlowFile;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.lang.reflect.Field;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+
+public class TestGenerateRecord {
+
+ private TestRunner testRunner;
+ private GenerateRecord processor;
+
+ @BeforeEach
+ public void setup() {
+     // Every test starts with a new GenerateRecord instance and a TestRunner wrapping that same instance
+     final GenerateRecord generateRecord = new GenerateRecord();
+     processor = generateRecord;
+     testRunner = TestRunners.newTestRunner(generateRecord);
+ }
+
+ @Test
+ public void testGenerateNoNullableFields() throws Exception {
+
+     // Set all Faker properties, using each datatype key as both the property name and the property value
+     for (Map.Entry<String, FakerMethodHolder> fakerProperty : FakerUtils.getDatatypeFunctionMap().entrySet()) {
+         testRunner.setProperty(fakerProperty.getKey(), fakerProperty.getKey());
+     }
+
+     final Map<String, String> recordFields = processor.getFields(testRunner.getProcessContext());
+     final RecordSchema outputSchema = processor.generateRecordSchema(recordFields, false);
+     final MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1, false, outputSchema);
+     testRunner.addControllerService("record-writer", recordWriter);
+     testRunner.enableControllerService(recordWriter);
+     testRunner.setProperty(GenerateRecord.RECORD_WRITER, "record-writer");
+     testRunner.setProperty(GenerateRecord.NULLABLE_FIELDS, "false");
+     testRunner.setProperty(GenerateRecord.NULL_PERCENTAGE, "100"); // This should be ignored
+     testRunner.setProperty(GenerateRecord.NUM_RECORDS, "3");
+
+     testRunner.run();
+     testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1);
+     MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).get(0);
+     final String output = flowFile.getContent();
+     // Other tests in this class expect the MockRecordWriter output to end with "\n" regardless of platform, so split on "\n".
+     // Splitting on System.lineSeparator() ("\r\n" on Windows) would leave the whole output as one line and skip the per-line check.
+     for (String line : output.split("\n")) {
+         // A null value would not be output so a comma would be the last character on the line
+         if (line.endsWith(",")) {
+             fail("Line should not end with an empty (null) value: " + line);
+         }
+     }
+ }
+
+ @Test
+ public void testGenerateNullableFieldsZeroNullPercentage() throws Exception {
+     // Set all Faker properties, using each datatype key as both the property name and the property value
+     for (Map.Entry<String, FakerMethodHolder> fakerProperty : FakerUtils.getDatatypeFunctionMap().entrySet()) {
+         testRunner.setProperty(fakerProperty.getKey(), fakerProperty.getKey());
+     }
+
+     final Map<String, String> recordFields = processor.getFields(testRunner.getProcessContext());
+     final RecordSchema outputSchema = processor.generateRecordSchema(recordFields, true);
+     final MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1, false, outputSchema);
+     testRunner.addControllerService("record-writer", recordWriter);
+     testRunner.enableControllerService(recordWriter);
+     testRunner.setProperty(GenerateRecord.RECORD_WRITER, "record-writer");
+     testRunner.setProperty(GenerateRecord.NULLABLE_FIELDS, "true");
+     // Fields are nullable, but a null percentage of 0 means no null value is expected in the output
+     testRunner.setProperty(GenerateRecord.NULL_PERCENTAGE, "0");
+     testRunner.setProperty(GenerateRecord.NUM_RECORDS, "3");
+
+     testRunner.run();
+     testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1);
+     MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).get(0);
+     final String output = flowFile.getContent();
+     // Other tests in this class expect the MockRecordWriter output to end with "\n" regardless of platform, so split on "\n".
+     // Splitting on System.lineSeparator() ("\r\n" on Windows) would leave the whole output as one line and skip the per-line check.
+     for (String line : output.split("\n")) {
+         // A null value would not be output so a comma would be the last character on the line
+         if (line.endsWith(",")) {
+             fail("Line should not end with an empty (null) value: " + line);
+         }
+     }
+ }
+
+ @Test
+ public void testGenerateNullableFieldsOneHundredNullPercentage() throws Exception {
+     // Register every Faker datatype, using its key as both the property name and the property value
+     for (Map.Entry<String, FakerMethodHolder> datatypeEntry : FakerUtils.getDatatypeFunctionMap().entrySet()) {
+         final String datatypeKey = datatypeEntry.getKey();
+         testRunner.setProperty(datatypeKey, datatypeKey);
+     }
+
+     final Map<String, String> fieldsByName = processor.getFields(testRunner.getProcessContext());
+     final RecordSchema nullableSchema = processor.generateRecordSchema(fieldsByName, true);
+     final MockRecordWriter writer = new MockRecordWriter(null, true, -1, false, nullableSchema);
+     testRunner.addControllerService("record-writer", writer);
+     testRunner.enableControllerService(writer);
+     testRunner.setProperty(GenerateRecord.RECORD_WRITER, "record-writer");
+     testRunner.setProperty(GenerateRecord.NULLABLE_FIELDS, "true");
+     testRunner.setProperty(GenerateRecord.NULL_PERCENTAGE, "100");
+     testRunner.setProperty(GenerateRecord.NUM_RECORDS, "1");
+
+     testRunner.run();
+     testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1);
+     final MockFlowFile generated = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).get(0);
+     // With every value null, the single record line holds only the separators between fields:
+     // one comma fewer than the number of entries in the datatype function map, followed by a newline
+     final int separatorCount = FakerUtils.getDatatypeFunctionMap().size() - 1;
+     final String expectedContent = String.join("", Collections.nCopies(separatorCount, ",")) + "\n";
+     generated.assertContentEquals(expectedContent);
+ }
+
+ // Verifies that the processor runs successfully when every FT_* field type constant of GenerateRecord is configured as a property.
+ @Test
+ public void testFieldsReturnValue() throws Exception {
+
+     // Look up the public fields of GenerateRecord whose names start with "FT_" via reflection
+     final List<Field> fieldTypeConstants = Arrays.stream(GenerateRecord.class.getFields())
+             .filter(candidate -> candidate.getName().startsWith("FT_"))
+             .collect(Collectors.toList());
+     // One property per constant: the lowercased constant name is the property name, the AllowableValue's value is the property value
+     for (final Field fieldTypeConstant : fieldTypeConstants) {
+         final String propertyName = fieldTypeConstant.getName().toLowerCase(Locale.ROOT);
+         final AllowableValue fieldType = (AllowableValue) fieldTypeConstant.get(processor);
+         testRunner.setProperty(propertyName, fieldType.getValue());
+     }
+
+     final Map<String, String> fieldsByName = processor.getFields(testRunner.getProcessContext());
+     final RecordSchema nullableSchema = processor.generateRecordSchema(fieldsByName, true);
+     final MockRecordWriter writer = new MockRecordWriter(null, true, -1, false, nullableSchema);
+     testRunner.addControllerService("record-writer", writer);
+     testRunner.enableControllerService(writer);
+     testRunner.setProperty(GenerateRecord.RECORD_WRITER, "record-writer");
+     testRunner.setProperty(GenerateRecord.NULLABLE_FIELDS, "true");
+     testRunner.setProperty(GenerateRecord.NULL_PERCENTAGE, "100");
+     testRunner.setProperty(GenerateRecord.NUM_RECORDS, "1");
+
+     // Only the transfer of a single FlowFile to success is asserted; the content is not inspected
+     testRunner.run();
+     testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1);
+ }
+
+ @Test
+ public void testGenerateNoNullableFieldsSchemaText() throws Exception {
+
+     // Decode the schema file explicitly as UTF-8 instead of relying on the platform default charset
+     String schemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestGenerateRecord/nested_no_nullable.avsc")), java.nio.charset.StandardCharsets.UTF_8);
+     final Schema avroSchema = new Schema.Parser().parse(schemaText);
+     final RecordSchema outputSchema = AvroTypeUtil.createSchema(avroSchema);
+
+     final MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1, false, outputSchema);
+     testRunner.addControllerService("record-writer", recordWriter);
+     testRunner.enableControllerService(recordWriter);
+     testRunner.setProperty(GenerateRecord.RECORD_WRITER, "record-writer");
+     testRunner.setProperty(GenerateRecord.SCHEMA_TEXT, schemaText);
+     testRunner.setProperty(GenerateRecord.NULLABLE_FIELDS, "true"); // Should be ignored
+     testRunner.setProperty(GenerateRecord.NULL_PERCENTAGE, "0");
+     testRunner.setProperty(GenerateRecord.NUM_RECORDS, "3");
+
+     testRunner.run();
+     testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1);
+     MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).get(0);
+     final String output = flowFile.getContent();
+     // Other tests in this class expect the MockRecordWriter output to end with "\n" regardless of platform, so split on "\n".
+     // Splitting on System.lineSeparator() ("\r\n" on Windows) would leave the whole output as one line.
+     for (String line : output.split("\n")) {
+         // A null value would not be output, so two adjacent commas indicate an empty (null) field between them
+         if (line.contains(",,")) {
+             fail("Line should not contain null values: " + line);
+         }
+     }
+ }
+
+ @Test
+ public void testGenerateNullableFieldsZeroNullPercentageSchemaText() throws Exception {
+     // Decode the schema file explicitly as UTF-8 instead of relying on the platform default charset
+     String schemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestGenerateRecord/nested_nullable.avsc")), java.nio.charset.StandardCharsets.UTF_8);
+     final JsonRecordSetWriter recordWriter = new JsonRecordSetWriter();
+     testRunner.addControllerService("record-writer", recordWriter);
+     testRunner.enableControllerService(recordWriter);
+     testRunner.setProperty(GenerateRecord.RECORD_WRITER, "record-writer");
+     testRunner.setProperty(GenerateRecord.SCHEMA_TEXT, schemaText);
+     testRunner.setProperty(GenerateRecord.NULLABLE_FIELDS, "false"); // Should be ignored
+     testRunner.setProperty(GenerateRecord.NULL_PERCENTAGE, "0");
+     testRunner.setProperty(GenerateRecord.NUM_RECORDS, "3");
+
+     testRunner.run();
+     testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1);
+     MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).get(0);
+     final String output = flowFile.getContent();
+     final JsonFactory jsonFactory = new JsonFactory();
+     try (JsonParser jsonParser = jsonFactory.createParser(output)) {
+         jsonParser.setCodec(new ObjectMapper());
+         JsonNode recordArray = jsonParser.readValueAsTree();
+         assertTrue(recordArray instanceof ArrayNode);
+         // get(0) returns null for an empty array, so check it before navigating into the first record
+         JsonNode recordNode = recordArray.get(0);
+         assertNotNull(recordNode);
+         JsonNode systemNode = recordNode.get("System");
+         assertNotNull(systemNode);
+         JsonNode providerNode = systemNode.get("Provider");
+         assertNotNull(providerNode);
+         // A JSON null is parsed into a non-null node whose asText() is never null, so the node type is
+         // checked instead: isTextual() is only true for an actual JSON string value
+         JsonNode guidNode = providerNode.get("Guid");
+         assertNotNull(guidNode);
+         assertTrue(guidNode.isTextual());
+         JsonNode nameNode = providerNode.get("Name");
+         assertNotNull(nameNode);
+         assertTrue(nameNode.isTextual());
+         // asInt() returns a default for a JSON null rather than throwing, so assert the node is an integral number
+         JsonNode eventIdNode = systemNode.get("EventID");
+         assertNotNull(eventIdNode);
+         assertTrue(eventIdNode.isIntegralNumber());
+         JsonNode eventDataNode = recordNode.get("EventData");
+         assertNotNull(eventDataNode);
+         JsonNode dataNode = eventDataNode.get("Data");
+         assertNotNull(dataNode);
+         assertTrue(dataNode instanceof ArrayNode);
+         // Only the upper bound is meaningful; a size can never be negative
+         assertTrue(dataNode.size() <= 10);
+         for (int i = 0; i < dataNode.size(); i++) {
+             JsonNode dataElementNode = dataNode.get(i);
+             assertNotNull(dataElementNode);
+             JsonNode dataElementNameNode = dataElementNode.get("Name");
+             assertNotNull(dataElementNameNode);
+             assertTrue(dataElementNameNode.isTextual());
+             JsonNode dataElementDataNode = dataElementNode.get("DataElement");
+             assertNotNull(dataElementDataNode);
+             assertTrue(dataElementDataNode.isTextual());
+         }
+     }
+ }
+
+ @Test
+ public void testGenerateNullableFieldsOneHundredNullPercentageSchemaText() throws Exception {
+     // Decode the schema file explicitly as UTF-8 instead of relying on the platform default charset
+     String schemaText = new String(Files.readAllBytes(Paths.get("src/test/resources/TestGenerateRecord/nested_nullable.avsc")), java.nio.charset.StandardCharsets.UTF_8);
+     final Schema avroSchema = new Schema.Parser().parse(schemaText);
+     final RecordSchema outputSchema = AvroTypeUtil.createSchema(avroSchema);
+
+     final MockRecordWriter recordWriter = new MockRecordWriter(null, true, -1, false, outputSchema);
+     testRunner.addControllerService("record-writer", recordWriter);
+     testRunner.enableControllerService(recordWriter);
+     testRunner.setProperty(GenerateRecord.RECORD_WRITER, "record-writer");
+     testRunner.setProperty(GenerateRecord.SCHEMA_TEXT, schemaText);
+     testRunner.setProperty(GenerateRecord.NULLABLE_FIELDS, "false"); // Should be ignored
+     testRunner.setProperty(GenerateRecord.NULL_PERCENTAGE, "100");
+     testRunner.setProperty(GenerateRecord.NUM_RECORDS, "1");
+
+     testRunner.run();
+     testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1);
+     MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).get(0);
+     // null values should cause all fields to be empty in the output (2 top-level record fields in this case,
+     // so the single record line consists of just the one comma separating them)
+     flowFile.assertContentEquals(",\n");
+ }
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestGenerateRecord/nested_no_nullable.avsc b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestGenerateRecord/nested_no_nullable.avsc
new file mode 100644
index 0000000000..d39c736fdf
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestGenerateRecord/nested_no_nullable.avsc
@@ -0,0 +1,52 @@
+{
+ "type": "record",
+ "name": "nifiRecord",
+ "namespace": "org.apache.nifi",
+ "fields": [{
+ "name": "System",
+ "type": {
+ "type": "record",
+ "name": "SystemType",
+ "fields": [{
+ "name": "Provider",
+ "type": {
+ "type": "record",
+ "name": "ProviderType",
+ "fields": [{
+ "name": "Guid",
+ "type": "string"
+ }, {
+ "name": "Name",
+ "type": "string"
+ }]
+ }
+ }, {
+ "name": "EventID",
+ "type": "int"
+ }]
+ }
+ },{
+ "name": "EventData",
+ "type": {
+ "type": "record",
+ "name": "EventDataType",
+ "fields": [{
+ "name": "Data",
+ "type": {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "DataElementRecord",
+ "fields": [{
+ "name": "Name",
+ "type": "string"
+ }, {
+ "name": "DataElement",
+ "type": "string"
+ }]
+ }
+ }
+ }]
+ }
+ }]
+}
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestGenerateRecord/nested_nullable.avsc b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestGenerateRecord/nested_nullable.avsc
new file mode 100644
index 0000000000..f7a07ad1a0
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestGenerateRecord/nested_nullable.avsc
@@ -0,0 +1,52 @@
+{
+ "type": "record",
+ "name": "nifiRecord",
+ "namespace": "org.apache.nifi",
+ "fields": [{
+ "name": "System",
+ "type": ["null", {
+ "type": "record",
+ "name": "SystemType",
+ "fields": [{
+ "name": "Provider",
+ "type": ["null", {
+ "type": "record",
+ "name": "ProviderType",
+ "fields": [{
+ "name": "Guid",
+ "type": ["null", "string"]
+ }, {
+ "name": "Name",
+ "type": ["null", "string"]
+ }]
+ }]
+ }, {
+ "name": "EventID",
+ "type": ["null", "int"]
+ }]
+ }]
+ },{
+ "name": "EventData",
+ "type": ["null", {
+ "type": "record",
+ "name": "EventDataType",
+ "fields": [{
+ "name": "Data",
+ "type": [{
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "DataElementRecord",
+ "fields": [{
+ "name": "Name",
+ "type": ["null", "string"]
+ }, {
+ "name": "DataElement",
+ "type": ["null", "string"]
+ }]
+ }
+ }, "null"]
+ }]
+ }]
+ }]
+}
\ No newline at end of file