You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/01/07 19:45:32 UTC

[nifi] branch main updated: NIFI-8962: Add overflow strategy to DistributeLoad (#5267)

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 42626ad  NIFI-8962: Add overflow strategy to DistributeLoad (#5267)
42626ad is described below

commit 42626adab8e4d3bf32f5addef310c927228cdd7e
Author: markobean <ma...@gmail.com>
AuthorDate: Fri Jan 7 14:45:18 2022 -0500

    NIFI-8962: Add overflow strategy to DistributeLoad (#5267)
---
 .../nifi/processors/standard/DistributeLoad.java   | 86 +++++++++++++++++-----
 .../processors/standard/TestDistributeLoad.java    | 46 +++++++++++-
 2 files changed, 112 insertions(+), 20 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
index 57c1723..d82ea5f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
@@ -43,6 +43,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
@@ -68,19 +69,31 @@ import org.apache.nifi.processor.util.StandardValidators;
         + "strategy, the default is to assign each destination a weighting of 1 (evenly distributed). However, optional properties"
         + "can be added to the change this; adding a property with the name '5' and value '10' means that the relationship with name "
         + "'5' will be receive 10 FlowFiles in each iteration instead of 1.")
-@DynamicProperty(name = "The relationship name(positive number)", value = "The relationship Weight(positive number)", description = "adding a "
+@DynamicProperty(name = "The relationship name (positive number)", value = "The relationship Weight (positive number)", description = "Adding a "
         + "property with the name '5' and value '10' means that the relationship with name "
-        + "'5' will be receive 10 FlowFiles in each iteration instead of 1.")
+        + "'5' will receive 10 FlowFiles in each iteration instead of 1.")
 @DynamicRelationship(name = "A number 1..<Number Of Relationships>", description = "FlowFiles are sent to this relationship per the "
         + "<Distribution Strategy>")
 @WritesAttributes(
-        @WritesAttribute(attribute = "distribute.load.relationship", description = "The name of the specific relationship the flow file has been routed through")
+        @WritesAttribute(attribute = "distribute.load.relationship", description = "The name of the specific relationship the FlowFile has been routed through")
 )
 public class DistributeLoad extends AbstractProcessor {
 
-    public static final String STRATEGY_ROUND_ROBIN = "round robin";
-    public static final String STRATEGY_NEXT_AVAILABLE = "next available";
-    public static final String STRATEGY_LOAD_DISTRIBUTION_SERVICE = "load distribution service";
+    public static final String ROUND_ROBIN = "round robin";
+    public static final String NEXT_AVAILABLE = "next available";
+    public static final String LOAD_DISTRIBUTION_SERVICE = "load distribution service";
+    public static final String OVERFLOW = "overflow";
+
+    public static final AllowableValue STRATEGY_ROUND_ROBIN = new AllowableValue(ROUND_ROBIN, ROUND_ROBIN,
+            "Relationship selection is evenly distributed in a round robin fashion; all relationships must be available.");
+    public static final AllowableValue STRATEGY_NEXT_AVAILABLE = new AllowableValue(NEXT_AVAILABLE, NEXT_AVAILABLE,
+            "Relationship selection is distributed across all available relationships in order of their weight; at least one relationship must be available.");
+    public static final AllowableValue STRATEGY_LOAD_DISTRIBUTION_SERVICE = new AllowableValue(LOAD_DISTRIBUTION_SERVICE, LOAD_DISTRIBUTION_SERVICE,
+            "Relationship selection is distributed by supplied LoadDistributionService Controller Service; at least one relationship must be available.");
+    public static final AllowableValue STRATEGY_OVERFLOW = new AllowableValue(OVERFLOW, OVERFLOW,
+            "Relationship selection is the first available relationship without further distribution among all relationships; at least one relationship must be available.");
+
+
 
     public static final PropertyDescriptor NUM_RELATIONSHIPS = new PropertyDescriptor.Builder()
             .name("Number of Relationships")
@@ -91,12 +104,10 @@ public class DistributeLoad extends AbstractProcessor {
             .build();
     public static final PropertyDescriptor DISTRIBUTION_STRATEGY = new PropertyDescriptor.Builder()
             .name("Distribution Strategy")
-            .description("Determines how the load will be distributed. If using Round Robin, will not distribute any FlowFiles unless all "
-                    + "destinations can accept FlowFiles; when using Next Available, will distribute FlowFiles as long as at least 1 "
-                    + "destination can accept FlowFiles.")
+            .description("Determines how the load will be distributed. Relationship weight is in numeric order where '1' has the greatest weight.")
             .required(true)
-            .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE)
-            .defaultValue(STRATEGY_ROUND_ROBIN)
+            .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE, STRATEGY_OVERFLOW)
+            .defaultValue(ROUND_ROBIN)
             .build();
     public static final PropertyDescriptor HOSTNAMES = new PropertyDescriptor.Builder()
             .name("Hostnames")
@@ -129,7 +140,7 @@ public class DistributeLoad extends AbstractProcessor {
 
     private List<PropertyDescriptor> properties;
     private final AtomicReference<Set<Relationship>> relationshipsRef = new AtomicReference<>();
-    private final AtomicReference<DistributionStrategy> strategyRef = new AtomicReference<DistributionStrategy>(new RoundRobinStrategy());
+    private final AtomicReference<DistributionStrategy> strategyRef = new AtomicReference<>(new RoundRobinStrategy());
     private final AtomicReference<List<Relationship>> weightedRelationshipListRef = new AtomicReference<>();
     private final AtomicBoolean doCustomValidate = new AtomicBoolean(false);
     private volatile LoadDistributionListener myListener;
@@ -167,14 +178,20 @@ public class DistributeLoad extends AbstractProcessor {
             this.relationshipsRef.set(Collections.unmodifiableSet(relationships));
         } else if (descriptor.equals(DISTRIBUTION_STRATEGY)) {
             switch (newValue.toLowerCase()) {
-                case STRATEGY_ROUND_ROBIN:
+                case ROUND_ROBIN:
                     strategyRef.set(new RoundRobinStrategy());
                     break;
-                case STRATEGY_NEXT_AVAILABLE:
+                case NEXT_AVAILABLE:
                     strategyRef.set(new NextAvailableStrategy());
                     break;
-                case STRATEGY_LOAD_DISTRIBUTION_SERVICE:
+                case LOAD_DISTRIBUTION_SERVICE:
                     strategyRef.set(new LoadDistributionStrategy());
+                    break;
+                case OVERFLOW:
+                    strategyRef.set(new OverflowStrategy());
+                    break;
+                default:
+                    throw new IllegalStateException("Invalid distribution strategy");
             }
             doSetProps.set(true);
             doCustomValidate.set(true);
@@ -222,7 +239,7 @@ public class DistributeLoad extends AbstractProcessor {
         Collection<ValidationResult> results = new ArrayList<>();
         if (doCustomValidate.getAndSet(false)) {
             String distStrat = validationContext.getProperty(DISTRIBUTION_STRATEGY).getValue();
-            if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) {
+            if (distStrat.equals(LOAD_DISTRIBUTION_SERVICE)) {
                 // make sure Hostnames and Controller service are set
                 PropertyValue propDesc = validationContext.getProperty(HOSTNAMES);
                 if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
@@ -233,7 +250,7 @@ public class DistributeLoad extends AbstractProcessor {
                 if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
                     results.add(new ValidationResult.Builder()
                             .subject(LOAD_DISTRIBUTION_SERVICE_TEMPLATE.getName())
-                            .explanation("Must specify 'Load Distribution Service ID' when using 'Load Distribution Strategy'")
+                            .explanation("Must specify 'Load Distribution Service ID' when using 'Load Distribution Service' strategy")
                             .valid(false).build());
                 }
                 if (results.isEmpty()) {
@@ -275,7 +292,7 @@ public class DistributeLoad extends AbstractProcessor {
         final Map<Integer, Integer> weightings = new LinkedHashMap<>();
 
         String distStrat = context.getProperty(DISTRIBUTION_STRATEGY).getValue();
-        if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) {
+        if (distStrat.equals(LOAD_DISTRIBUTION_SERVICE)) {
             String hostNamesValue = context.getProperty(HOSTNAMES).getValue();
             String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)");
             Set<String> hostNameSet = new HashSet<>();
@@ -392,7 +409,7 @@ public class DistributeLoad extends AbstractProcessor {
     /**
      * Implementations must be thread-safe.
      */
-    private static interface DistributionStrategy {
+    private interface DistributionStrategy {
 
         /**
          * @param context context
@@ -491,4 +508,35 @@ public class DistributeLoad extends AbstractProcessor {
             return false;
         }
     }
+
+    private class OverflowStrategy implements DistributionStrategy {
+
+        /**
+         * Picks the first relationship, in weighted-list order, that is currently available.
+         *
+         * @param context used to obtain the set of currently available relationships
+         * @param flowFile the FlowFile being routed; not consulted by this strategy
+         * @return the first available relationship, or null if none is available (or the list is empty)
+         */
+        @Override
+        public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) {
+            final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get();
+            // Getting set of available relationships only once. This may miss a relationship that recently became available, but
+            // overall is more efficient than re-calling for every relationship evaluation
+            final Set<Relationship> availableRelationships = context.getAvailableRelationships();
+
+            for (final Relationship candidate : relationshipList) {
+                if (availableRelationships.contains(candidate)) {
+                    return candidate;
+                }
+            }
+            // Nothing is available; an empty list also ends up here instead of failing on get(0)
+            return null;
+        }
+
+        @Override
+        public boolean requiresAllDestinationsAvailable() {
+            return false;
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
index 975858a..8de24e9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
@@ -123,7 +123,7 @@ public class TestDistributeLoad {
         final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
 
         testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS.getName(), "100");
-        testRunner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY.getName(), DistributeLoad.STRATEGY_NEXT_AVAILABLE);
+        testRunner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY.getName(), DistributeLoad.STRATEGY_NEXT_AVAILABLE.getValue());
 
         for (int i = 0; i < 99; i++) {
             testRunner.enqueue(new byte[0]);
@@ -162,4 +162,48 @@ public class TestDistributeLoad {
             assertEquals(String.valueOf(i), mockFlowFile.getAttribute(DistributeLoad.RELATIONSHIP_ATTRIBUTE));
         }
     }
+
+    @Test // Overflow strategy: FlowFiles go to the lowest-numbered relationship that is currently available
+    public void testOverflow() {
+        final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
+
+        testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS.getName(), "3");
+        testRunner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY.getName(), DistributeLoad.STRATEGY_OVERFLOW.getValue());
+
+        // Queue all FlowFiles required for this test (four phases below, two FlowFiles asserted per phase)
+        for (int i = 0; i < 8; i++) {
+            testRunner.enqueue(new byte[0]);
+        }
+
+        // Phase 1: repeatedly send to highest weighted relationship ("1") as long as it is available
+        testRunner.run(2, false); // NOTE(review): second argument is presumably stopOnFinish; confirm against TestRunner
+        testRunner.assertTransferCount("1", 2);
+        testRunner.assertTransferCount("2", 0);
+        testRunner.assertTransferCount("3", 0);
+
+        // Phase 2: when highest weighted relationship becomes unavailable, repeatedly send to next-highest weighted relationship
+        testRunner.clearTransferState(); // so the counts asserted below reflect this phase only
+        testRunner.setRelationshipUnavailable("1");
+        testRunner.run(2, false);
+        testRunner.assertTransferCount("1", 0);
+        testRunner.assertTransferCount("2", 2);
+        testRunner.assertTransferCount("3", 0);
+
+        // Phase 3: return to highest weighted relationship when it becomes available again
+        testRunner.clearTransferState();
+        testRunner.setRelationshipAvailable("1");
+        testRunner.run(2, false);
+        testRunner.assertTransferCount("1", 2);
+        testRunner.assertTransferCount("2", 0);
+        testRunner.assertTransferCount("3", 0);
+
+        // Phase 4: skip ahead and repeatedly send to the first available relationship when multiple relationships are unavailable
+        testRunner.clearTransferState();
+        testRunner.setRelationshipUnavailable("1");
+        testRunner.setRelationshipUnavailable("2");
+        testRunner.run(2, false);
+        testRunner.assertTransferCount("1", 0);
+        testRunner.assertTransferCount("2", 0);
+        testRunner.assertTransferCount("3", 2);
+    }
 }