You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2023/02/14 00:19:27 UTC
[nifi] branch main updated: NIFI-10950 DistributeLoad processor - this closes #6924. removed Load Distribution Service
This is an automated email from the ASF dual-hosted git repository.
joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new dbef536ebd NIFI-10950 DistributeLoad processor - this closes #6924. removed Load Distribution Service
dbef536ebd is described below
commit dbef536ebd2b8883de93fc2e0bf7574f7cc92006
Author: Nissim Shiman <ns...@yahoo.com>
AuthorDate: Fri Jan 20 20:06:42 2023 +0000
NIFI-10950 DistributeLoad processor - this closes #6924. removed Load Distribution
Service
Signed-off-by: Joe Witt <jo...@apache.org>
---
.../nifi-standard-processors/pom.xml | 5 -
.../nifi/processors/standard/DistributeLoad.java | 197 ++-------------------
.../nifi-load-distribution-service-api/pom.xml | 30 ----
.../nifi/loading/LoadDistributionListener.java | 24 ---
.../nifi/loading/LoadDistributionService.java | 33 ----
.../nifi-standard-services-api-nar/pom.xml | 5 -
nifi-nar-bundles/nifi-standard-services/pom.xml | 1 -
nifi-nar-bundles/pom.xml | 6 -
8 files changed, 12 insertions(+), 289 deletions(-)
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index b6a751eabf..51d63fd09f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -265,11 +265,6 @@
<artifactId>nifi-socket-utils</artifactId>
<version>2.0.0-SNAPSHOT</version>
</dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-load-distribution-service-api</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
index 1009846373..fd4ac28b4d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/DistributeLoad.java
@@ -17,19 +17,16 @@
package org.apache.nifi.processors.standard;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
-import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.DefaultRunDuration;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.DynamicRelationship;
@@ -46,13 +43,10 @@ import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.loading.LoadDistributionListener;
-import org.apache.nifi.loading.LoadDistributionService;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -82,15 +76,12 @@ public class DistributeLoad extends AbstractProcessor {
public static final String ROUND_ROBIN = "round robin";
public static final String NEXT_AVAILABLE = "next available";
- public static final String LOAD_DISTRIBUTION_SERVICE = "load distribution service";
public static final String OVERFLOW = "overflow";
public static final AllowableValue STRATEGY_ROUND_ROBIN = new AllowableValue(ROUND_ROBIN, ROUND_ROBIN,
"Relationship selection is evenly distributed in a round robin fashion; all relationships must be available.");
public static final AllowableValue STRATEGY_NEXT_AVAILABLE = new AllowableValue(NEXT_AVAILABLE, NEXT_AVAILABLE,
"Relationship selection is distributed across all available relationships in order of their weight; at least one relationship must be available.");
- public static final AllowableValue STRATEGY_LOAD_DISTRIBUTION_SERVICE = new AllowableValue(LOAD_DISTRIBUTION_SERVICE, LOAD_DISTRIBUTION_SERVICE,
- "Relationship selection is distributed by supplied LoadDistributionService Controller Service; at least one relationship must be available.");
public static final AllowableValue STRATEGY_OVERFLOW = new AllowableValue(OVERFLOW, OVERFLOW,
"Relationship selection is the first available relationship without further distribution among all relationships; at least one relationship must be available.");
@@ -107,44 +98,15 @@ public class DistributeLoad extends AbstractProcessor {
.name("Distribution Strategy")
.description("Determines how the load will be distributed. Relationship weight is in numeric order where '1' has the greatest weight.")
.required(true)
- .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_LOAD_DISTRIBUTION_SERVICE, STRATEGY_OVERFLOW)
+ .allowableValues(STRATEGY_ROUND_ROBIN, STRATEGY_NEXT_AVAILABLE, STRATEGY_OVERFLOW)
.defaultValue(ROUND_ROBIN)
.build();
- public static final PropertyDescriptor HOSTNAMES = new PropertyDescriptor.Builder()
- .name("Hostnames")
- .description("List of remote servers to distribute across. Each server must be FQDN and use either ',', ';', or [space] as a delimiter")
- .required(true)
- .addValidator((subject, input, context) -> {
- ValidationResult result = new ValidationResult.Builder().subject(subject).valid(true).input(input).explanation("Good FQDNs").build();
- if (null == input) {
- result = new ValidationResult.Builder().subject(subject).input(input).valid(false)
- .explanation("Need to specify delimited list of FQDNs").build();
- return result;
- }
- String[] hostNames = input.split("(?:,+|;+|\\s+)");
- for (String hostName : hostNames) {
- if (StringUtils.isNotBlank(hostName) && !hostName.contains(".")) {
- result = new ValidationResult.Builder().subject(subject).input(input).valid(false)
- .explanation("Need a FQDN rather than a simple host name.").build();
- return result;
- }
- }
- return result;
- }).build();
- public static final PropertyDescriptor LOAD_DISTRIBUTION_SERVICE_TEMPLATE = new PropertyDescriptor.Builder()
- .name("Load Distribution Service ID")
- .description("The identifier of the Load Distribution Service")
- .required(true)
- .identifiesControllerService(LoadDistributionService.class)
- .build();
public static final String RELATIONSHIP_ATTRIBUTE = "distribute.load.relationship";
private List<PropertyDescriptor> properties;
private final AtomicReference<Set<Relationship>> relationshipsRef = new AtomicReference<>();
private final AtomicReference<DistributionStrategy> strategyRef = new AtomicReference<>(new RoundRobinStrategy());
private final AtomicReference<List<Relationship>> weightedRelationshipListRef = new AtomicReference<>();
- private final AtomicBoolean doCustomValidate = new AtomicBoolean(false);
- private volatile LoadDistributionListener myListener;
private final AtomicBoolean doSetProps = new AtomicBoolean(true);
@Override
@@ -185,9 +147,6 @@ public class DistributeLoad extends AbstractProcessor {
case NEXT_AVAILABLE:
strategyRef.set(new NextAvailableStrategy());
break;
- case LOAD_DISTRIBUTION_SERVICE:
- strategyRef.set(new LoadDistributionStrategy());
- break;
case OVERFLOW:
strategyRef.set(new OverflowStrategy());
break;
@@ -195,18 +154,12 @@ public class DistributeLoad extends AbstractProcessor {
throw new IllegalStateException("Invalid distribution strategy");
}
doSetProps.set(true);
- doCustomValidate.set(true);
}
}
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- if (strategyRef.get() instanceof LoadDistributionStrategy && doSetProps.getAndSet(false)) {
- final List<PropertyDescriptor> props = new ArrayList<>(properties);
- props.add(LOAD_DISTRIBUTION_SERVICE_TEMPLATE);
- props.add(HOSTNAMES);
- this.properties = Collections.unmodifiableList(props);
- } else if (doSetProps.getAndSet(false)) {
+ if (doSetProps.getAndSet(false)) {
final List<PropertyDescriptor> props = new ArrayList<>();
props.add(NUM_RELATIONSHIPS);
props.add(DISTRIBUTION_STRATEGY);
@@ -235,113 +188,22 @@ public class DistributeLoad extends AbstractProcessor {
.name(propertyDescriptorName).dynamic(true).build();
}
- @Override
- protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
- Collection<ValidationResult> results = new ArrayList<>();
- if (doCustomValidate.getAndSet(false)) {
- String distStrat = validationContext.getProperty(DISTRIBUTION_STRATEGY).getValue();
- if (distStrat.equals(LOAD_DISTRIBUTION_SERVICE)) {
- // make sure Hostnames and Controller service are set
- PropertyValue propDesc = validationContext.getProperty(HOSTNAMES);
- if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
- results.add(new ValidationResult.Builder().subject(HOSTNAMES.getName())
- .explanation("Must specify Hostnames when using 'Load Distribution Strategy'").valid(false).build());
- }
- propDesc = validationContext.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE);
- if (null == propDesc || null == propDesc.getValue() || propDesc.getValue().isEmpty()) {
- results.add(new ValidationResult.Builder()
- .subject(LOAD_DISTRIBUTION_SERVICE_TEMPLATE.getName())
- .explanation("Must specify 'Load Distribution Service ID' when using 'Load Distribution Service' strategy")
- .valid(false).build());
- }
- if (results.isEmpty()) {
- int numRels = validationContext.getProperty(NUM_RELATIONSHIPS).asInteger();
- String hostNamesValue = validationContext.getProperty(HOSTNAMES).getValue();
- String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)");
- int numHosts = 0;
- for (String hostName : hostNames) {
- if (StringUtils.isNotBlank(hostName)) {
- hostNames[numHosts++] = hostName;
- }
- }
- if (numHosts > numRels) {
- results.add(new ValidationResult.Builder()
- .subject("Number of Relationships and Hostnames")
- .explanation("Number of Relationships must be equal to, or greater than, the number of host names")
- .valid(false).build());
- } else {
- // create new relationships with descriptions of hostname
- Set<Relationship> relsWithDesc = new TreeSet<>();
- for (int i = 0; i < numHosts; i++) {
- relsWithDesc.add(new Relationship.Builder().name(String.valueOf(i + 1))
- .description(hostNames[i]).build());
- }
- // add add'l rels if configuration requires it...it probably shouldn't
- for (int i = numHosts + 1; i <= numRels; i++) {
- relsWithDesc.add(createRelationship(i));
- }
- relationshipsRef.set(Collections.unmodifiableSet(relsWithDesc));
- }
- }
- }
- }
- return results;
- }
-
@OnScheduled
public void createWeightedList(final ProcessContext context) {
final Map<Integer, Integer> weightings = new LinkedHashMap<>();
- String distStrat = context.getProperty(DISTRIBUTION_STRATEGY).getValue();
- if (distStrat.equals(LOAD_DISTRIBUTION_SERVICE)) {
- String hostNamesValue = context.getProperty(HOSTNAMES).getValue();
- String[] hostNames = hostNamesValue.split("(?:,+|;+|\\s+)");
- Set<String> hostNameSet = new HashSet<>();
- for (String hostName : hostNames) {
- if (StringUtils.isNotBlank(hostName)) {
- hostNameSet.add(hostName);
- }
- }
- LoadDistributionService svc = context.getProperty(LOAD_DISTRIBUTION_SERVICE_TEMPLATE).asControllerService(LoadDistributionService.class);
- myListener = new LoadDistributionListener() {
-
- @Override
- public void update(Map<String, Integer> loadInfo) {
- for (Relationship rel : relationshipsRef.get()) {
- String hostname = rel.getDescription();
- Integer weight = 1;
- if (loadInfo.containsKey(hostname)) {
- weight = loadInfo.get(hostname);
- }
- weightings.put(Integer.decode(rel.getName()), weight);
- }
- updateWeightedRelationships(weightings);
- }
- };
-
- Map<String, Integer> loadInfo = svc.getLoadDistribution(hostNameSet, myListener);
- for (Relationship rel : relationshipsRef.get()) {
- String hostname = rel.getDescription();
- Integer weight = 1;
- if (loadInfo.containsKey(hostname)) {
- weight = loadInfo.get(hostname);
- }
- weightings.put(Integer.decode(rel.getName()), weight);
- }
-
- } else {
- final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger();
- for (int i = 1; i <= numRelationships; i++) {
- weightings.put(i, 1);
- }
- for (final PropertyDescriptor propDesc : context.getProperties().keySet()) {
- if (!this.properties.contains(propDesc)) {
- final int relationship = Integer.parseInt(propDesc.getName());
- final int weighting = context.getProperty(propDesc).asInteger();
- weightings.put(relationship, weighting);
- }
+ final int numRelationships = context.getProperty(NUM_RELATIONSHIPS).asInteger();
+ for (int i = 1; i <= numRelationships; i++) {
+ weightings.put(i, 1);
+ }
+ for (final PropertyDescriptor propDesc : context.getProperties().keySet()) {
+ if (!this.properties.contains(propDesc)) {
+ final int relationship = Integer.parseInt(propDesc.getName());
+ final int weighting = context.getProperty(propDesc).asInteger();
+ weightings.put(relationship, weighting);
}
}
+
updateWeightedRelationships(weightings);
}
@@ -422,41 +284,6 @@ public class DistributeLoad extends AbstractProcessor {
boolean requiresAllDestinationsAvailable();
}
- private class LoadDistributionStrategy implements DistributionStrategy {
-
- private final AtomicLong counter = new AtomicLong(0L);
-
- @Override
- public Relationship mapToRelationship(final ProcessContext context, final FlowFile flowFile) {
- final List<Relationship> relationshipList = DistributeLoad.this.weightedRelationshipListRef.get();
- final int numRelationships = relationshipList.size();
-
- // create a HashSet that contains all of the available relationships, as calling #contains on HashSet
- // is much faster than calling it on a List
- boolean foundFreeRelationship = false;
- Relationship relationship = null;
-
- int attempts = 0;
- while (!foundFreeRelationship) {
- final long counterValue = counter.getAndIncrement();
- final int idx = (int) (counterValue % numRelationships);
- relationship = relationshipList.get(idx);
- foundFreeRelationship = context.getAvailableRelationships().contains(relationship);
- if (++attempts % numRelationships == 0 && !foundFreeRelationship) {
- return null;
- }
- }
-
- return relationship;
- }
-
- @Override
- public boolean requiresAllDestinationsAvailable() {
- return false;
- }
-
- }
-
private class RoundRobinStrategy implements DistributionStrategy {
private final AtomicLong counter = new AtomicLong(0L);
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/pom.xml
deleted file mode 100644
index 2dcca6b49a..0000000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/pom.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
- <!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- -->
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-standard-services</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- </parent>
- <artifactId>nifi-load-distribution-service-api</artifactId>
- <packaging>jar</packaging>
- <dependencies>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-api</artifactId>
- </dependency>
- </dependencies>
-</project>
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionListener.java b/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionListener.java
deleted file mode 100644
index 656bf99f1b..0000000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionListener.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.loading;
-
-import java.util.Map;
-
-public interface LoadDistributionListener {
-
- public void update(Map<String, Integer> loadInfo);
-}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionService.java b/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionService.java
deleted file mode 100644
index c413975e93..0000000000
--- a/nifi-nar-bundles/nifi-standard-services/nifi-load-distribution-service-api/src/main/java/org/apache/nifi/loading/LoadDistributionService.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.nifi.loading;
-
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.nifi.controller.ControllerService;
-
-/**
- * A service that will provide a Map of Fully Qualified Domain Names (fqdn) with
- * their respective weights (scale of 1 - 100).
- */
-public interface LoadDistributionService extends ControllerService {
-
- public Map<String, Integer> getLoadDistribution(Set<String> fqdns);
-
- public Map<String, Integer> getLoadDistribution(Set<String> fqdns, LoadDistributionListener listener);
-}
diff --git a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
index 13021001fb..b672f52661 100644
--- a/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/nifi-standard-services-api-nar/pom.xml
@@ -47,11 +47,6 @@
<artifactId>nifi-distributed-cache-client-service-api</artifactId>
<scope>compile</scope>
</dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-load-distribution-service-api</artifactId>
- <scope>compile</scope>
- </dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-http-context-map-api</artifactId>
diff --git a/nifi-nar-bundles/nifi-standard-services/pom.xml b/nifi-nar-bundles/nifi-standard-services/pom.xml
index fdbb3c38ff..260da7422c 100644
--- a/nifi-nar-bundles/nifi-standard-services/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-services/pom.xml
@@ -27,7 +27,6 @@
<module>nifi-oauth2-provider-bundle</module>
<module>nifi-distributed-cache-client-service-api</module>
<module>nifi-distributed-cache-services-bundle</module>
- <module>nifi-load-distribution-service-api</module>
<module>nifi-http-context-map-api</module>
<module>nifi-lookup-service-api</module>
<module>nifi-lookup-services-bundle</module>
diff --git a/nifi-nar-bundles/pom.xml b/nifi-nar-bundles/pom.xml
index 365bda49a6..68bfaee164 100755
--- a/nifi-nar-bundles/pom.xml
+++ b/nifi-nar-bundles/pom.xml
@@ -225,12 +225,6 @@
<artifactId>nifi-security-utils-api</artifactId>
<version>2.0.0-SNAPSHOT</version>
<scope>provided</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.nifi</groupId>
- <artifactId>nifi-load-distribution-service-api</artifactId>
- <version>2.0.0-SNAPSHOT</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>