You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/01/14 21:15:14 UTC

[nifi] branch master updated: NIFI-6919: Added relationship attribute to DistributeLoad

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 421bfdd  NIFI-6919: Added relationship attribute to DistributeLoad
421bfdd is described below

commit 421bfdd5fff87e477a91450f8414d7a989bd083f
Author: Michael Hogue <ho...@gmail.com>
AuthorDate: Thu Dec 12 21:06:15 2019 -0500

    NIFI-6919: Added relationship attribute to DistributeLoad
    
    NIFI-6919: Cleaned up docs
    
    NIFI-6919: Cleanup
    
    NIFI-6919: Cleanup
    
    NIFI-6919: added negative unit test
    
    NIFI-6919: Removed unnecessary feature flag
    
    Updated attribute description
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #3939
---
 .../nifi/processors/standard/DistributeLoad.java   | 42 ++++++++++++----------
 .../processors/standard/TestDistributeLoad.java    | 29 ++++++++++++++-
 2 files changed, 51 insertions(+), 20 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
index 8c1f9bd..57c1723 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
@@ -31,14 +31,16 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
+import org.apache.nifi.annotation.behavior.DynamicRelationship;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.TriggerWhenAnyDestinationAvailable;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.behavior.DynamicRelationship;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
@@ -71,6 +73,9 @@ import org.apache.nifi.processor.util.StandardValidators;
         + "'5' will be receive 10 FlowFiles in each iteration instead of 1.")
 @DynamicRelationship(name = "A number 1..<Number Of Relationships>", description = "FlowFiles are sent to this relationship per the "
         + "<Distribution Strategy>")
+@WritesAttributes(
+        @WritesAttribute(attribute = "distribute.load.relationship", description = "The name of the specific relationship the flow file has been routed through")
+)
 public class DistributeLoad extends AbstractProcessor {
 
     public static final String STRATEGY_ROUND_ROBIN = "round robin";
@@ -93,31 +98,26 @@ public class DistributeLoad extends AbstractProcessor {
             .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE)
             .defaultValue(STRATEGY_ROUND_ROBIN)
             .build();
-
     public static final PropertyDescriptor HOSTNAMES = new PropertyDescriptor.Builder()
             .name("Hostnames")
             .description("List of remote servers to distribute across. Each server must be FQDN and use either ',', ';', or [space] as a delimiter")
             .required(true)
-            .addValidator(new Validator() {
-
-                @Override
-                public ValidationResult validate(String subject, String input, ValidationContext context) {
-                    ValidationResult result = new ValidationResult.Builder().subject(subject).valid(true).input(input).explanation("Good FQDNs").build();
-                    if (null == input) {
+            .addValidator((subject, input, context) -> {
+                ValidationResult result = new ValidationResult.Builder().subject(subject).valid(true).input(input).explanation("Good FQDNs").build();
+                if (null == input) {
+                    result = new ValidationResult.Builder().subject(subject).input(input).valid(false)
+                    .explanation("Need to specify delimited list of FQDNs").build();
+                    return result;
+                }
+                String[] hostNames = input.split("(?:,+|;+|\\s+)");
+                for (String hostName : hostNames) {
+                    if (StringUtils.isNotBlank(hostName) && !hostName.contains(".")) {
                         result = new ValidationResult.Builder().subject(subject).input(input).valid(false)
-                        .explanation("Need to specify delimited list of FQDNs").build();
+                        .explanation("Need a FQDN rather than a simple host name.").build();
                         return result;
                     }
-                    String[] hostNames = input.split("(?:,+|;+|\\s+)");
-                    for (String hostName : hostNames) {
-                        if (StringUtils.isNotBlank(hostName) && !hostName.contains(".")) {
-                            result = new ValidationResult.Builder().subject(subject).input(input).valid(false)
-                            .explanation("Need a FQDN rather than a simple host name.").build();
-                            return result;
-                        }
-                    }
-                    return result;
                 }
+                return result;
             }).build();
     public static final PropertyDescriptor LOAD_DISTRIBUTION_SERVICE_TEMPLATE = new PropertyDescriptor.Builder()
             .name("Load Distribution Service ID")
@@ -125,6 +125,7 @@ public class DistributeLoad extends AbstractProcessor {
             .required(true)
             .identifiesControllerService(LoadDistributionService.class)
             .build();
+    public static final String RELATIONSHIP_ATTRIBUTE = "distribute.load.relationship";
 
     private List<PropertyDescriptor> properties;
     private final AtomicReference<Set<Relationship>> relationshipsRef = new AtomicReference<>();
@@ -365,6 +366,9 @@ public class DistributeLoad extends AbstractProcessor {
             return;
         }
 
+        // add an attribute capturing which relationship a flowfile was routed through
+        session.putAttribute(flowFile, RELATIONSHIP_ATTRIBUTE, relationship.getName());
+
         session.transfer(flowFile, relationship);
         session.getProvenanceReporter().route(flowFile, relationship);
     }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
index 1965314..975858a 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
@@ -16,9 +16,13 @@
  */
 package org.apache.nifi.processors.standard;
 
+import static org.junit.Assert.assertEquals;
+
+import java.util.List;
+
+import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -135,4 +139,27 @@ public class TestDistributeLoad {
             testRunner.assertTransferCount(String.valueOf(i), (i == 50) ? 0 : 1);
         }
     }
+
+    @Test
+    public void testFlowFileAttributesAdded() {
+        final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
+
+        testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS, "100");
+        testRunner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY, DistributeLoad.STRATEGY_NEXT_AVAILABLE);
+
+        for (int i = 0; i < 100; i++) {
+            testRunner.enqueue(new byte[0]);
+        }
+
+        testRunner.run(101);
+        testRunner.assertQueueEmpty();
+
+        for (int i = 1; i <= 100; i++) {
+            testRunner.assertTransferCount(String.valueOf(i), 1);
+            final List<MockFlowFile> flowFilesForRelationship = testRunner.getFlowFilesForRelationship(String.valueOf(i));
+            assertEquals(1, flowFilesForRelationship.size());
+            final MockFlowFile mockFlowFile = flowFilesForRelationship.get(0);
+            assertEquals(String.valueOf(i), mockFlowFile.getAttribute(DistributeLoad.RELATIONSHIP_ATTRIBUTE));
+        }
+    }
 }