You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2016/11/18 02:06:42 UTC
nifi git commit: NIFI-3033 GenerateFlowFile Dynamic Properties
Repository: nifi
Updated Branches:
refs/heads/master 06d7ecd32 -> 7206318ec
NIFI-3033 GenerateFlowFile Dynamic Properties
This closes #1222.
Signed-off-by: Koji Kawamura <ij...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7206318e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7206318e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7206318e
Branch: refs/heads/master
Commit: 7206318ecfb3ae973059bb850a2a7d87134bd0c4
Parents: 06d7ecd
Author: James Wing <jv...@gmail.com>
Authored: Mon Nov 14 13:32:31 2016 -0800
Committer: Koji Kawamura <ij...@apache.org>
Committed: Fri Nov 18 11:06:20 2016 +0900
----------------------------------------------------------------------
.../processors/standard/GenerateFlowFile.java | 33 +++++++++++++++++++-
.../standard/TestGenerateFlowFile.java | 18 +++++++++++
2 files changed, 50 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/7206318e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java
index f3a51ef..b6f06cd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateFlowFile.java
@@ -21,12 +21,15 @@ import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -36,6 +39,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
@@ -49,7 +53,11 @@ import org.apache.nifi.processor.util.StandardValidators;
@SupportsBatching
@Tags({"test", "random", "generate"})
@InputRequirement(Requirement.INPUT_FORBIDDEN)
-@CapabilityDescription("This processor creates FlowFiles of random data and is used for load testing")
+@CapabilityDescription("This processor creates FlowFiles with random data or custom content. GenerateFlowFile is useful " +
+ "for load testing, configuration, and simulation.")
+@DynamicProperty(name = "Generated FlowFile attribute name", value = "Generated FlowFile attribute value", supportsExpressionLanguage = true,
+ description = "Specifies an attribute on generated FlowFiles defined by the Dynamic Property's key and value." +
+ " If Expression Language is used, evaluation will be performed only once per batch of generated FlowFiles.")
public class GenerateFlowFile extends AbstractProcessor {
private final AtomicReference<byte[]> data = new AtomicReference<>();
@@ -127,6 +135,18 @@ public class GenerateFlowFile extends AbstractProcessor {
}
@Override
+ protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) { // Builds the descriptor for a user-added (dynamic) property with the given name.
+ return new PropertyDescriptor.Builder()
+ .name(propertyDescriptorName)
+ .required(false) // dynamic properties are optional
+ .addValidator(StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.STRING, true)) // value validator built for a STRING result type
+ .addValidator(StandardValidators.ATTRIBUTE_KEY_PROPERTY_NAME_VALIDATOR) // presumably checks that the property name is usable as an attribute key -- confirm in StandardValidators
+ .expressionLanguageSupported(true) // onTrigger only copies dynamic properties that report Expression Language support
+ .dynamic(true)
+ .build();
+ }
+
+ @Override
public Set<Relationship> getRelationships() {
return relationships;
}
@@ -183,6 +203,16 @@ public class GenerateFlowFile extends AbstractProcessor {
data = this.data.get();
}
+ Map<PropertyDescriptor, String> processorProperties = context.getProperties();
+ Map<String, String> generatedAttributes = new HashMap<String, String>();
+ for (final Map.Entry<PropertyDescriptor, String> entry : processorProperties.entrySet()) {
+ PropertyDescriptor property = entry.getKey();
+ if (property.isDynamic() && property.isExpressionLanguageSupported()) {
+ String dynamicValue = context.getProperty(property).evaluateAttributeExpressions().getValue();
+ generatedAttributes.put(property.getName(), dynamicValue);
+ }
+ }
+
for (int i = 0; i < context.getProperty(BATCH_SIZE).asInteger(); i++) {
FlowFile flowFile = session.create();
if (data.length > 0) {
@@ -193,6 +223,7 @@ public class GenerateFlowFile extends AbstractProcessor {
}
});
}
+ flowFile = session.putAllAttributes(flowFile, generatedAttributes);
session.getProvenanceReporter().create(flowFile);
session.transfer(flowFile, SUCCESS);
http://git-wip-us.apache.org/repos/asf/nifi/blob/7206318e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateFlowFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateFlowFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateFlowFile.java
index d930270..ca7de0c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateFlowFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateFlowFile.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.standard;
import java.io.IOException;
+import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Test;
@@ -53,4 +54,21 @@ public class TestGenerateFlowFile {
runner.assertNotValid();
}
+ @Test
+ public void testDynamicPropertiesToAttributes() throws IOException { // Verifies that dynamic properties show up as attributes on the generated FlowFile.
+ TestRunner runner = TestRunners.newTestRunner(new GenerateFlowFile());
+ runner.setProperty(GenerateFlowFile.FILE_SIZE, "1B");
+ runner.setProperty(GenerateFlowFile.DATA_FORMAT, GenerateFlowFile.DATA_FORMAT_TEXT);
+ runner.setProperty("plain.dynamic.property", "Plain Value"); // dynamic property with a literal value
+ runner.setProperty("expression.dynamic.property", "${literal('Expression Value')}"); // dynamic property whose value is an Expression Language expression
+ runner.assertValid(); // the configuration, including both dynamic properties, must validate
+
+ runner.run();
+
+ runner.assertTransferCount(GenerateFlowFile.SUCCESS, 1); // exactly one FlowFile is expected on SUCCESS
+ MockFlowFile generatedFlowFile = runner.getFlowFilesForRelationship(GenerateFlowFile.SUCCESS).get(0);
+ generatedFlowFile.assertAttributeEquals("plain.dynamic.property", "Plain Value"); // literal value is expected unchanged
+ generatedFlowFile.assertAttributeEquals("expression.dynamic.property", "Expression Value"); // the expression's evaluated result is expected, not its source text
+ }
+
}
\ No newline at end of file