You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/01/14 21:15:14 UTC
[nifi] branch master updated: NIFI-6919: Added relationship
attribute to DistributeLoad
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 421bfdd NIFI-6919: Added relationship attribute to DistributeLoad
421bfdd is described below
commit 421bfdd5fff87e477a91450f8414d7a989bd083f
Author: Michael Hogue <ho...@gmail.com>
AuthorDate: Thu Dec 12 21:06:15 2019 -0500
NIFI-6919: Added relationship attribute to DistributeLoad
NIFI-6919: Cleaned up docs
NIFI-6919: Cleanup
NIFI-6919: Cleanup
NIFI-6919: added negative unit test
NIFI-6919: Removed unnecesary feature flag
Updated attribute description
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #3939
---
.../nifi/processors/standard/DistributeLoad.java | 42 ++++++++++++----------
.../processors/standard/TestDistributeLoad.java | 29 ++++++++++++++-
2 files changed, 51 insertions(+), 20 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
index 8c1f9bd..57c1723 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
@@ -31,14 +31,16 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.DynamicRelationship;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SideEffectFree;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.DynamicRelationship;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
@@ -71,6 +73,9 @@ import org.apache.nifi.processor.util.StandardValidators;
+ "'5' will be receive 10 FlowFiles in each iteration instead of 1.")
@DynamicRelationship(name = "A number 1..<Number Of Relationships>", description = "FlowFiles are sent to this relationship per the "
+ "<Distribution Strategy>")
+@WritesAttributes(
+ @WritesAttribute(attribute = "distribute.load.relationship", description = "The name of the specific relationship the flow file has been routed through")
+)
public class DistributeLoad extends AbstractProcessor {
public static final String STRATEGY_ROUND_ROBIN = "round robin";
@@ -93,31 +98,26 @@ public class DistributeLoad extends AbstractProcessor {
.allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE)
.defaultValue(STRATEGY_ROUND_ROBIN)
.build();
-
public static final PropertyDescriptor HOSTNAMES = new PropertyDescriptor.Builder()
.name("Hostnames")
.description("List of remote servers to distribute across. Each server must be FQDN and use either ',', ';', or [space] as a delimiter")
.required(true)
- .addValidator(new Validator() {
-
- @Override
- public ValidationResult validate(String subject, String input, ValidationContext context) {
- ValidationResult result = new ValidationResult.Builder().subject(subject).valid(true).input(input).explanation("Good FQDNs").build();
- if (null == input) {
+ .addValidator((subject, input, context) -> {
+ ValidationResult result = new ValidationResult.Builder().subject(subject).valid(true).input(input).explanation("Good FQDNs").build();
+ if (null == input) {
+ result = new ValidationResult.Builder().subject(subject).input(input).valid(false)
+ .explanation("Need to specify delimited list of FQDNs").build();
+ return result;
+ }
+ String[] hostNames = input.split("(?:,+|;+|\\s+)");
+ for (String hostName : hostNames) {
+ if (StringUtils.isNotBlank(hostName) && !hostName.contains(".")) {
result = new ValidationResult.Builder().subject(subject).input(input).valid(false)
- .explanation("Need to specify delimited list of FQDNs").build();
+ .explanation("Need a FQDN rather than a simple host name.").build();
return result;
}
- String[] hostNames = input.split("(?:,+|;+|\\s+)");
- for (String hostName : hostNames) {
- if (StringUtils.isNotBlank(hostName) && !hostName.contains(".")) {
- result = new ValidationResult.Builder().subject(subject).input(input).valid(false)
- .explanation("Need a FQDN rather than a simple host name.").build();
- return result;
- }
- }
- return result;
}
+ return result;
}).build();
public static final PropertyDescriptor LOAD_DISTRIBUTION_SERVICE_TEMPLATE = new PropertyDescriptor.Builder()
.name("Load Distribution Service ID")
@@ -125,6 +125,7 @@ public class DistributeLoad extends AbstractProcessor {
.required(true)
.identifiesControllerService(LoadDistributionService.class)
.build();
+ public static final String RELATIONSHIP_ATTRIBUTE = "distribute.load.relationship";
private List<PropertyDescriptor> properties;
private final AtomicReference<Set<Relationship>> relationshipsRef = new AtomicReference<>();
@@ -365,6 +366,9 @@ public class DistributeLoad extends AbstractProcessor {
return;
}
+ // add an attribute capturing which relationship a flowfile was routed through
+ session.putAttribute(flowFile, RELATIONSHIP_ATTRIBUTE, relationship.getName());
+
session.transfer(flowFile, relationship);
session.getProvenanceReporter().route(flowFile, relationship);
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
index 1965314..975858a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
@@ -16,9 +16,13 @@
*/
package org.apache.nifi.processors.standard;
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -135,4 +139,27 @@ public class TestDistributeLoad {
testRunner.assertTransferCount(String.valueOf(i), (i == 50) ? 0 : 1);
}
}
+
+ @Test
+ public void testFlowFileAttributesAdded() {
+ final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
+
+ testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS, "100");
+ testRunner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY, DistributeLoad.STRATEGY_NEXT_AVAILABLE);
+
+ for (int i = 0; i < 100; i++) {
+ testRunner.enqueue(new byte[0]);
+ }
+
+ testRunner.run(101);
+ testRunner.assertQueueEmpty();
+
+ for (int i = 1; i <= 100; i++) {
+ testRunner.assertTransferCount(String.valueOf(i), 1);
+ final List<MockFlowFile> flowFilesForRelationship = testRunner.getFlowFilesForRelationship(String.valueOf(i));
+ assertEquals(1, flowFilesForRelationship.size());
+ final MockFlowFile mockFlowFile = flowFilesForRelationship.get(0);
+ assertEquals(String.valueOf(i), mockFlowFile.getAttribute(DistributeLoad.RELATIONSHIP_ATTRIBUTE));
+ }
+ }
}