You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ak...@apache.org on 2012/02/28 16:33:10 UTC
svn commit: r1294693 -
/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java
Author: akarpe
Date: Tue Feb 28 15:33:10 2012
New Revision: 1294693
URL: http://svn.apache.org/viewvc?rev=1294693&view=rev
Log:
Fixed CAMEL-5039 Make WeightedRandomLoadBalancer really random. Many Thanks to Xavier Fournet for the bug identification, submission of this patch and well documented test cases.
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java?rev=1294693&r1=1294692&r2=1294693&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/loadbalancer/WeightedRandomLoadBalancer.java Tue Feb 28 15:33:10 2012
@@ -23,34 +23,49 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
public class WeightedRandomLoadBalancer extends WeightedLoadBalancer {
- private int randomCounter;
+ private final Random rnd = new Random();
+ private final int distributionRatioSum;
+ private int runtimeRatioSum;
public WeightedRandomLoadBalancer(List<Integer> distributionRatioList) {
super(distributionRatioList);
+ int sum = 0;
+ for (Integer distributionRatio : distributionRatioList) {
+ sum += distributionRatio;
+ }
+ distributionRatioSum = sum;
+ runtimeRatioSum = distributionRatioSum;
}
@Override
- protected Processor chooseProcessor(List<Processor> processors, Exchange exchange) {
- boolean found = false;
- while (!found) {
- if (getRuntimeRatios().isEmpty()) {
- loadRuntimeRatios(getDistributionRatioList());
+ protected Processor chooseProcessor(List<Processor> processors, Exchange exchange) {
+ int selectedProcessorIndex = selectProcessIndex();
+ return processors.get(selectedProcessorIndex);
+ }
+
+ public int selectProcessIndex() {
+ if (runtimeRatioSum == 0) { // every processor is exhausted, reload for a new distribution round
+ for (DistributionRatio distributionRatio : getRuntimeRatios()) {
+ int weight = distributionRatio.getDistributionWeight();
+ distributionRatio.setRuntimeWeight(weight);
}
-
- randomCounter = 0;
- if (getRuntimeRatios().size() > 0) {
- randomCounter = new Random().nextInt(getRuntimeRatios().size());
- }
-
- if (getRuntimeRatios().get(randomCounter).getRuntimeWeight() > 0) {
- getRuntimeRatios().get(randomCounter).setRuntimeWeight((getRuntimeRatios().get(randomCounter).getRuntimeWeight()) - 1);
- found = true;
- } else {
- getRuntimeRatios().remove(randomCounter);
+ runtimeRatioSum = distributionRatioSum;
+ }
+
+ DistributionRatio selected = null;
+ int randomWeight = rnd.nextInt(runtimeRatioSum);
+ int choiceWeight = 0;
+ for (DistributionRatio distributionRatio : getRuntimeRatios()) {
+ choiceWeight += distributionRatio.getRuntimeWeight();
+ if (randomWeight < choiceWeight) {
+ selected = distributionRatio;
+ break;
}
}
+
+ selected.setRuntimeWeight(selected.getRuntimeWeight() - 1);
+ runtimeRatioSum--;
- return processors.get(getRuntimeRatios().get(randomCounter).getProcessorPosition());
+ return selected.getProcessorPosition();
}
-
}