You are viewing a plain-text version of this content; the hyperlink to its canonical version is not preserved in plain text.
Posted to commits@nifi.apache.org by ma...@apache.org on 2022/01/07 19:45:32 UTC
[nifi] branch main updated: NIFI-8962: Add overflow strategy to DistributeLoad (#5267)
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 42626ad NIFI-8962: Add overflow strategy to DistributeLoad (#5267)
42626ad is described below
commit 42626adab8e4d3bf32f5addef310c927228cdd7e
Author: markobean <ma...@gmail.com>
AuthorDate: Fri Jan 7 14:45:18 2022 -0500
NIFI-8962: Add overflow strategy to DistributeLoad (#5267)
---
.../nifi/processors/standard/DistributeLoad.java | 86 +++++++++++++++++-----
.../processors/standard/TestDistributeLoad.java | 46 +++++++++++-
2 files changed, 112 insertions(+), 20 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
index 57c1723..d82ea5f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
@@ -43,6 +43,7 @@ import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
@@ -68,19 +69,31 @@ import org.apache.nifi.processor.util.StandardValidators;
+ "strategy, the default is to assign each destination a weighting of 1 (evenly distributed). However, optional properties"
+ "can be added to the change this; adding a property with the name '5' and value '10' means that the relationship with name "
+ "'5' will be receive 10 FlowFiles in each iteration instead of 1.")
-@DynamicProperty(name = "The relationship name(positive number)", value = "The relationship Weight(positive number)", description = "adding a "
+@DynamicProperty(name = "The relationship name (positive number)", value = "The relationship Weight (positive number)", description = "Adding a "
+ "property with the name '5' and value '10' means that the relationship with name "
- + "'5' will be receive 10 FlowFiles in each iteration instead of 1.")
+ + "'5' will receive 10 FlowFiles in each iteration instead of 1.")
@DynamicRelationship(name = "A number 1..<Number Of Relationships>", description = "FlowFiles are sent to this relationship per the "
+ "<Distribution Strategy>")
@WritesAttributes(
- @WritesAttribute(attribute = "distribute.load.relationship", description = "The name of the specific relationship the flow file has been routed through")
+ @WritesAttribute(attribute = "distribute.load.relationship", description = "The name of the specific relationship the FlowFile has been routed through")
)
public class DistributeLoad extends AbstractProcessor {
- public static final String STRATEGY_ROUND_ROBIN = "round robin";
- public static final String STRATEGY_NEXT_AVAILABLE = "next available";
- public static final String STRATEGY_LOAD_DISTRIBUTION_SERVICE = "load distribution service";
+ public static final String ROUND_ROBIN = "round robin";
+ public static final String NEXT_AVAILABLE = "next available";
+ public static final String LOAD_DISTRIBUTION_SERVICE = "load distribution service";
+ public static final String OVERFLOW = "overflow";
+
+ public static final AllowableValue STRATEGY_ROUND_ROBIN = new AllowableValue(ROUND_ROBIN, ROUND_ROBIN,
+ "Relationship selection is evenly distributed in a round robin fashion; all relationships must be available.");
+ public static final AllowableValue STRATEGY_NEXT_AVAILABLE = new AllowableValue(NEXT_AVAILABLE, NEXT_AVAILABLE,
+ "Relationship selection is distributed across all available relationships in order of their weight; at least one relationship must be available.");
+ public static final AllowableValue STRATEGY_LOAD_DISTRIBUTION_SERVICE = new AllowableValue(LOAD_DISTRIBUTION_SERVICE, LOAD_DISTRIBUTION_SERVICE,
+ "Relationship selection is distributed by supplied LoadDistributionService Controller Service; at least one relationship must be available.");
+ public static final AllowableValue STRATEGY_OVERFLOW = new AllowableValue(OVERFLOW, OVERFLOW,
+ "Relationship selection is the first available relationship without further distribution among all relationships; at least one relationship must be available.");
+
+
public static final PropertyDescriptor NUM_RELATIONSHIPS = new PropertyDescriptor.Builder()
.name("Number of Relationships")
@@ -91,12 +104,10 @@ public class DistributeLoad extends AbstractProcessor {
.build();
public static final PropertyDescriptor DISTRIBUTION_STRATEGY = new PropertyDescriptor.Builder()
.name("Distribution Strategy")
- .description("Determines how the load will be distributed. If using Round Robin, will not distribute any FlowFiles unless all "
- + "destinations can accept FlowFiles; when using Next Available, will distribute FlowFiles as long as at least 1 "
- + "destination can accept FlowFiles.")
+ .description("Determines how the load will be distributed. Relationship weight is in numeric order where '1' has the greatest weight.")
.required(true)
- .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE)
- .defaultValue(STRATEGY_ROUND_ROBIN)
+ .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE, STRATEGY_OVERFLOW)
+ .defaultValue(ROUND_ROBIN)
.build();
public static final PropertyDescriptor HOSTNAMES = new PropertyDescriptor.Builder()
.name("Hostnames")
@@ -129,7 +140,7 @@ public class DistributeLoad extends AbstractProcessor {
private List<PropertyDescriptor> properties;
private final AtomicReference<Set<Relationship>> relationshipsRef = new AtomicReference<>();
- private final AtomicReference<DistributionStrategy> strategyRef = new AtomicReference<DistributionStrategy>(new RoundRobinStrategy());
+ private final AtomicReference<DistributionStrategy> strategyRef = new AtomicReference<>(new RoundRobinStrategy());
private final AtomicReference<List<Relationship>> weightedRelationshipListRef = new AtomicReference<>();
private final AtomicBoolean doCustomValidate = new AtomicBoolean(false);
private volatile LoadDistributionListener myListener;
@@ -167,14 +178,20 @@ public class DistributeLoad extends AbstractProcessor {
this.relationshipsRef.set(Collections.unmodifiableSet(relationships));
} else if (descriptor.equals(DISTRIBUTION_STRATEGY)) {
switch (newValue.toLowerCase()) {
- case STRATEGY_ROUND_ROBIN:
+ case ROUND_ROBIN:
strategyRef.set(new RoundRobinStrategy());
break;
- case STRATEGY_NEXT_AVAILABLE:
+ case NEXT_AVAILABLE:
strategyRef.set(new NextAvailableStrategy());
break;
- case STRATEGY_LOAD_DISTRIBUTION_SERVICE:
+ case LOAD_DISTRIBUTION_SERVICE:
strategyRef.set(new LoadDistributionStrategy());
+ break;
+ case OVERFLOW:
+ strategyRef.set(new OverflowStrategy());
+ break;
+ default:
+ throw new IllegalStateException("Invalid distribution strategy");
}
doSetProps.set(true);
doCustomValidate.set(true);
@@ -222,7 +239,7 @@ public class DistributeLoad extends AbstractProcessor {
Collection<ValidationResult> results = new ArrayList<>();
if (doCustomValidate.getAndSet(false)) {
String distStrat = validationContext.getProperty(DISTRIBUTION_STRATEGY).getValue();
- if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) {
+ if (distStrat.equals(LOAD_DISTRIBUTION_SERVICE)) {
// make sure Hostnames and Controller service are set
PropertyValue propDesc = validationContext.getProperty(HOSTNAMES);
if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
@@ -233,7 +250,7 @@ public class DistributeLoad extends AbstractProcessor {
if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
results.add(new ValidationResult.Builder()
.subject(LOAD_DISTRIBUTION_SERVICE_TEMPLATE.getName())
- .explanation("Must specify 'Load Distribution Service ID' when using 'Load Distribution Strategy'")
+ .explanation("Must specify 'Load Distribution Service ID' when using 'Load Distribution Service' strategy")
.valid(false).build());
}
if (results.isEmpty()) {
@@ -275,7 +292,7 @@ public class DistributeLoad extends AbstractProcessor {
final Map<Integer, Integer> weightings = new LinkedHashMap<>();
String distStrat = context.getProperty(DISTRIBUTION_STRATEGY).getValue();
- if (distStrat.equals(STRATEGY_LOAD_DISTRIBUTION_SERVICE)) {
+ if (distStrat.equals(LOAD_DISTRIBUTION_SERVICE)) {
String hostNamesValue = context.getProperty(HOSTNAMES).getValue();
String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)");
Set<String> hostNameSet = new HashSet<>();
@@ -392,7 +409,7 @@ public class DistributeLoad extends AbstractProcessor {
/**
* Implementations must be thread-safe.
*/
- private static interface DistributionStrategy {
+ private interface DistributionStrategy {
/**
* @param context context
@@ -491,4 +508,35 @@ public class DistributeLoad extends AbstractProcessor {
return false;
}
}
+
+    private class OverflowStrategy implements DistributionStrategy {
+
+        /**
+         * Selects the first relationship, in the order of the weighted relationship list, that can currently
+         * accept FlowFiles. No rotation state is kept, so traffic always returns to the earliest available
+         * relationship as soon as it frees up.
+         *
+         * @return the first available relationship, or null if none is available (including an empty list)
+         */
+        @Override
+        public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) {
+            final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get();
+            // Getting set of available relationships only once. This may miss a relationship that recently became available, but
+            // overall is more efficient than re-calling for every relationship evaluation
+            final Set<Relationship> availableRelationships = context.getAvailableRelationships();
+
+            for (final Relationship relationship : relationshipList) {
+                if (availableRelationships.contains(relationship)) {
+                    return relationship;
+                }
+            }
+            // No relationship can accept FlowFiles right now
+            return null;
+        }
+
+        @Override
+        public boolean requiresAllDestinationsAvailable() {
+            return false;
+        }
+    }
}
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
index 975858a..8de24e9 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestDistributeLoad.java
@@ -123,7 +123,7 @@ public class TestDistributeLoad {
final TestRunner testRunner = TestRunners.newTestRunner(new DistributeLoad());
testRunner.setProperty(DistributeLoad.NUM_RELATIONSHIPS.getName(), "100");
- testRunner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY.getName(), DistributeLoad.STRATEGY_NEXT_AVAILABLE);
+ testRunner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY.getName(), DistributeLoad.STRATEGY_NEXT_AVAILABLE.getValue());
for (int i = 0; i < 99; i++) {
testRunner.enqueue(new byte[0]);
@@ -162,4 +162,48 @@ public class TestDistributeLoad {
assertEquals(String.valueOf(i), mockFlowFile.getAttribute(DistributeLoad.RELATIONSHIP_ATTRIBUTE));
}
}
+
+    @Test
+    public void testOverflow() {
+        final TestRunner runner = TestRunners.newTestRunner(new DistributeLoad());
+        runner.setProperty(DistributeLoad.NUM_RELATIONSHIPS.getName(), "3");
+        runner.setProperty(DistributeLoad.DISTRIBUTION_STRATEGY.getName(), DistributeLoad.STRATEGY_OVERFLOW.getValue());
+
+        // Four phases of two FlowFiles each: queue all eight up front
+        for (int queued = 0; queued < 8; queued++) {
+            runner.enqueue(new byte[0]);
+        }
+
+        // Phase 1: every relationship is available, so the highest weighted one ("1") receives everything
+        runOverflowPhase(runner, 2, 0, 0);
+
+        // Phase 2: "1" is unavailable, so FlowFiles overflow to the next-highest weighted relationship ("2")
+        runner.clearTransferState();
+        runner.setRelationshipUnavailable("1");
+        runOverflowPhase(runner, 0, 2, 0);
+
+        // Phase 3: "1" is available again, so traffic returns to it instead of staying on "2"
+        runner.clearTransferState();
+        runner.setRelationshipAvailable("1");
+        runOverflowPhase(runner, 2, 0, 0);
+
+        // Phase 4: "1" and "2" are both unavailable, so selection skips ahead to "3"
+        runner.clearTransferState();
+        runner.setRelationshipUnavailable("1");
+        runner.setRelationshipUnavailable("2");
+        runOverflowPhase(runner, 0, 0, 2);
+    }
+
+    /**
+     * Runs two onTrigger iterations without stopping the processor (stopOnFinish = false), then checks
+     * the transfer counts currently recorded for relationships "1", "2" and "3".
+     */
+    private static void runOverflowPhase(final TestRunner runner, final int toFirst, final int toSecond, final int toThird) {
+        runner.run(2, false);
+        runner.assertTransferCount("1", toFirst);
+        runner.assertTransferCount("2", toSecond);
+        runner.assertTransferCount("3", toThird);
+    }
}